summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--LICENSE2
-rw-r--r--Makefile15
-rw-r--r--docs/html-to-website-xml.xsl2
-rw-r--r--docs/rabbitmqctl.1.xml115
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control7
-rw-r--r--packaging/generic-unix/Makefile1
-rw-r--r--packaging/windows/Makefile1
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmqctl1
-rw-r--r--src/file_handle_cache.erl111
-rw-r--r--src/gen_server2.erl923
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_access_control.erl146
-rw-r--r--src/rabbit_amqqueue.erl65
-rw-r--r--src/rabbit_amqqueue_process.erl71
-rw-r--r--src/rabbit_binding.erl377
-rw-r--r--src/rabbit_channel.erl71
-rw-r--r--src/rabbit_channel_sup_sup.erl4
-rw-r--r--src/rabbit_connection_sup.erl4
-rw-r--r--src/rabbit_control.erl71
-rw-r--r--src/rabbit_dialyzer.erl25
-rw-r--r--src/rabbit_event.erl71
-rw-r--r--src/rabbit_exchange.erl417
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_limiter.erl7
-rw-r--r--src/rabbit_load.erl78
-rw-r--r--src/rabbit_msg_store.erl169
-rw-r--r--src/rabbit_msg_store_gc.erl7
-rw-r--r--src/rabbit_multi.erl14
-rw-r--r--src/rabbit_net.erl44
-rw-r--r--src/rabbit_networking.erl35
-rw-r--r--src/rabbit_plugin_activator.erl69
-rw-r--r--src/rabbit_reader.erl36
-rw-r--r--src/rabbit_router.erl16
-rw-r--r--src/rabbit_tests.erl18
-rw-r--r--src/rabbit_tracer.erl50
-rw-r--r--src/rabbit_types.erl10
-rw-r--r--src/rabbit_variable_queue.erl8
-rw-r--r--src/rabbit_writer.erl63
-rw-r--r--src/supervisor2.erl7
-rw-r--r--src/worker_pool_worker.erl7
45 files changed, 1693 insertions, 1471 deletions
diff --git a/LICENSE b/LICENSE
index d7042b92..89640485 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,5 +1,5 @@
This package, the RabbitMQ server is licensed under the MPL. For the
MPL, please see LICENSE-MPL-RabbitMQ.
-If you have any questions regarding licensing, please contact us at
+If you have any questions regarding licensing, please contact us at
info@rabbitmq.com.
diff --git a/Makefile b/Makefile
index 7ec8009e..ca61c47b 100644
--- a/Makefile
+++ b/Makefile
@@ -96,7 +96,7 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
escript generate_app $(EBIN_DIR) $@ < $<
-$(EBIN_DIR)/%.beam:
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(DEPS_FILE)
erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8)
@@ -110,7 +110,11 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
$(ERL_EBIN) -eval \
- "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))."
+ "rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\")." \
+ -eval \
+ "init:stop()."
+
+
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
create-plt: $(RABBIT_PLT)
@@ -307,11 +311,6 @@ else
TESTABLEGOALS:=$(MAKECMDGOALS)
endif
-ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)"
ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
-ifeq "$(strip $(wildcard $(DEPS_FILE)))" ""
-$(info $(shell $(MAKE) $(DEPS_FILE)))
-endif
-include $(DEPS_FILE)
-endif
+-include $(DEPS_FILE)
endif
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 662dbea0..c325bb5a 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -30,7 +30,7 @@
<code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
</p>
<p>
- <a href="manpages.html">See a list of all manual pages</a>.
+ <a href="../manpages.html">See a list of all manual pages</a>.
</p>
</xsl:when>
<xsl:otherwise>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 33552e17..5179eb25 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -417,7 +417,8 @@
<screen role="example">rabbitmqctl add_user tonyg changeit</screen>
<para role="example">
This command instructs the RabbitMQ broker to create a
- user named <command>tonyg</command> with (initial) password
+ (non-administrative) user named <command>tonyg</command> with
+ (initial) password
<command>changeit</command>.
</para>
</listitem>
@@ -465,13 +466,57 @@
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose administrative
+ status is to be set.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl set_admin tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to ensure the user
+ named <command>tonyg</command> is an administrator. This has no
+ effect when the user logs in via AMQP, but can be used to permit
+ the user to manage users, virtual hosts and permissions when the
+ user logs in via some other means (for example with the
+ management plugin).
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>clear_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>username</term>
+ <listitem><para>The name of the user whose administrative
+ status is to be cleared.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl clear_admin tonyg</screen>
+ <para role="example">
+ This command instructs the RabbitMQ broker to ensure the user
+ named <command>tonyg</command> is not an administrator.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><cmdsynopsis><command>list_users</command></cmdsynopsis></term>
<listitem>
<para>Lists users</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl list_users</screen>
<para role="example">
- This command instructs the RabbitMQ broker to list all users.
+ This command instructs the RabbitMQ broker to list all
+ users. Each result row will contain the user name and
+ the administrator status of the user, in that order.
</para>
</listitem>
</varlistentry>
@@ -704,7 +749,7 @@
<variablelist>
<varlistentry>
<term>name</term>
- <listitem><para>The name of the queue with non-ASCII characters URL-escaped.</para></listitem>
+ <listitem><para>The name of the queue with non-ASCII characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>durable</term>
@@ -795,7 +840,7 @@
<variablelist>
<varlistentry>
<term>name</term>
- <listitem><para>The name of the exchange with non-ASCII characters URL-escaped.</para></listitem>
+ <listitem><para>The name of the exchange with non-ASCII characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>type</term>
@@ -830,22 +875,58 @@
</para>
</listitem>
</varlistentry>
- </variablelist>
- <variablelist>
- <varlistentry>
- <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term>
+ <varlistentry role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>bindinginfoitem</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<para>
- By default the bindings for the <command>/</command> virtual
- host are returned. The "-p" flag can be used to override
- this default. Each result row will contain an exchange
- name, queue name, routing key and binding arguments, in
- that order. Non-ASCII characters will be URL-encoded.
+ Returns binding details. By default the bindings for
+ the <command>/</command> virtual host are returned. The
+ "-p" flag can be used to override this default.
</para>
- <para role="usage">
- The output format for "list_bindings" is a list of rows containing
- exchange name, queue name, routing key and arguments, in that order.
+ <para>
+ The <command>bindinginfoitem</command> parameter is used
+ to indicate which binding information items to include
+ in the results. The column order in the results will
+ match the order of the parameters.
+ <command>bindinginfoitem</command> can take any value
+ from the list that follows:
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>exchange_name</term>
+ <listitem><para>The name of the exchange to which the
+ binding is attached. with non-ASCII characters
+ escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>queue_name</term>
+ <listitem><para>The name of the queue to which the
+ binding is attached. with non-ASCII characters
+ escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>routing_key</term>
+ <listitem><para>The binding's routing key, with
+ non-ASCII characters escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>arguments</term>
+ <listitem><para>The binding's arguments.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ If no <command>bindinginfoitem</command>s are specified then
+ all above items are displayed.
+ </para>
+ <para role="example-prefix">
+ For example:
+ </para>
+ <screen role="example">rabbitmqctl list_bindings -p /myvhost exchange_name queue_name</screen>
+ <para role="example">
+ This command displays the exchange name and queue name
+ of the bindings in the virtual host
+ named <command>/myvhost</command>.
</para>
</listitem>
</varlistentry>
@@ -904,7 +985,7 @@
</varlistentry>
<varlistentry>
<term>vhost</term>
- <listitem><para>Virtual host name with non-ASCII characters URL-escaped.</para></listitem>
+ <listitem><para>Virtual host name with non-ASCII characters escaped as in C.</para></listitem>
</varlistentry>
<varlistentry>
<term>timeout</term>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 48e19ff8..4be09c5a 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -26,6 +26,7 @@
{queue_index_max_journal_entries, 262144},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
+ {default_user_is_admin, true},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
{collect_statistics, none}]}]}.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 161b9b0b..700523d7 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--record(user, {username, password}).
+-record(user, {username, password, is_admin}).
-record(permission, {scope, configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 17518ddf..eb0a2a51 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -127,6 +127,9 @@ done
rm -rf %{buildroot}
%changelog
+* Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1
+- New Upstream Release
+
* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 7ee60016..9927cfbc 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.1.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Marek Majkowski <marek@rabbitmq.com> Tue, 14 Sep 2010 14:20:17 +0100
+
rabbitmq-server (2.0.0-1) karmic; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index a44f49a0..02da0cc6 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -1,13 +1,16 @@
Source: rabbitmq-server
Section: net
Priority: extra
-Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
+Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
+# erlang-inets is not a strict dependency, but it's needed to allow
+# the installation of plugins that use mochiweb. Ideally it would be a
+# "Recommends" instead, but gdebi does not install those.
+Depends: erlang-base (>= 1:12.b.3) | erlang-base-hipe (>= 1:12.b.3), erlang-ssl | erlang-nox (<< 1:13.b-dfsg1-1), erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), erlang-mnesia | erlang-nox (<< 1:13.b-dfsg1-1), erlang-inets | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile
index 4eade6c7..c4e01f4a 100644
--- a/packaging/generic-unix/Makefile
+++ b/packaging/generic-unix/Makefile
@@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION)
dist:
- $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR) \
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index f47b5340..abe174e0 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_ZIP=rabbitmq-server-windows-$(VERSION)
dist:
- $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
$(MAKE) -C $(SOURCE_DIR)
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 9310752f..8e26663a 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -82,7 +82,7 @@ fi
[ -f "${RABBITMQ_SASL_LOGS}" ] && cat "${RABBITMQ_SASL_LOGS}" >> "${RABBITMQ_SASL_LOGS}${RABBITMQ_BACKUP_EXTENSION}"
RABBITMQ_START_RABBIT=
-[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
+[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 92e5312b..76ce25fd 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -47,4 +47,3 @@ exec erl \
-s rabbit_control \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
-
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f83fa0bc..d2830a25 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -34,13 +34,15 @@
%% A File Handle Cache
%%
%% This extends a subset of the functionality of the Erlang file
-%% module.
+%% module. In the below, we use "file handle" to specifically refer to
+%% file handles, and "file descriptor" to refer to descriptors which
+%% are not file handles, e.g. sockets.
%%
%% Some constraints
%% 1) This supports one writer, multiple readers per file. Nothing
%% else.
%% 2) Do not open the same file from different processes. Bad things
-%% may happen.
+%% may happen, especially for writes.
%% 3) Writes are all appends. You cannot write to the middle of a
%% file, although you can truncate and then append if you want.
%% 4) Although there is a write buffer, there is no read buffer. Feel
@@ -49,10 +51,10 @@
%%
%% Some benefits
%% 1) You do not have to remember to call sync before close
-%% 2) Buffering is much more flexible than with plain file module, and
-%% you can control when the buffer gets flushed out. This means that
-%% you can rely on reads-after-writes working, without having to call
-%% the expensive sync.
+%% 2) Buffering is much more flexible than with the plain file module,
+%% and you can control when the buffer gets flushed out. This means
+%% that you can rely on reads-after-writes working, without having to
+%% call the expensive sync.
%% 3) Unnecessary calls to position and sync get optimised out.
%% 4) You can find out what your 'real' offset is, and what your
%% 'virtual' offset is (i.e. where the hdl really is, and where it
@@ -60,14 +62,19 @@
%% 5) You can find out what the offset was when you last sync'd.
%%
%% There is also a server component which serves to limit the number
-%% of open file handles in a "soft" way - the server will never
-%% prevent a client from opening a handle, but may immediately tell it
-%% to close the handle. Thus you can set the limit to zero and it will
-%% still all work correctly, it is just that effectively no caching
-%% will take place. The operation of limiting is as follows:
+%% of open file descriptors. This is a hard limit: the server
+%% component will ensure that clients do not have more file
+%% descriptors open than it's configured to allow.
%%
-%% On open and close, the client sends messages to the server
-%% informing it of opens and closes. This allows the server to keep
+%% On open, the client requests permission from the server to open the
+%% required number of file handles. The server may ask the client to
+%% close other file handles that it has open, or it may queue the
+%% request and ask other clients to close file handles they have open
+%% in order to satisfy the request. Requests are always satisfied in
+%% the order they arrive, even if a latter request (for a small number
+%% of file handles) can be satisfied before an earlier request (for a
+%% larger number of file handles). On close, the client sends a
+%% message to the server. These messages allow the server to keep
%% track of the number of open handles. The client also keeps a
%% gb_tree which is updated on every use of a file handle, mapping the
%% time at which the file handle was last used (timestamp) to the
@@ -81,21 +88,38 @@
%% Note that this data can go very out of date, by the client using
%% the least recently used handle.
%%
-%% When the limit is reached, the server calculates the average age of
-%% the last reported least recently used file handle of all the
-%% clients. It then tells all the clients to close any handles not
-%% used for longer than this average, by invoking the callback the
-%% client registered. The client should receive this message and pass
-%% it into set_maximum_since_use/1. However, it is highly possible
-%% this age will be greater than the ages of all the handles the
-%% client knows of because the client has used its file handles in the
-%% mean time. Thus at this point the client reports to the server the
+%% When the limit is exceeded (i.e. the number of open file handles is
+%% at the limit and there are pending 'open' requests), the server
+%% calculates the average age of the last reported least recently used
+%% file handle of all the clients. It then tells all the clients to
+%% close any handles not used for longer than this average, by
+%% invoking the callback the client registered. The client should
+%% receive this message and pass it into
+%% set_maximum_since_use/1. However, it is highly possible this age
+%% will be greater than the ages of all the handles the client knows
+%% of because the client has used its file handles in the mean
+%% time. Thus at this point the client reports to the server the
%% current timestamp at which its least recently used file handle was
%% last used. The server will check two seconds later that either it
%% is back under the limit, in which case all is well again, or if
%% not, it will calculate a new average age. Its data will be much
%% more recent now, and so it is very likely that when this is
%% communicated to the clients, the clients will close file handles.
+%% (In extreme cases, where it's very likely that all clients have
+%% used their open handles since they last sent in an update, which
+%% would mean that the average will never cause any file handles to
+%% be closed, the server can send out an average age of 0, resulting
+%% in all available clients closing all their file handles.)
+%%
+%% Care is taken to ensure that (a) processes which are blocked
+%% waiting for file descriptors to become available are not sent
+%% requests to close file handles; and (b) given it is known how many
+%% file handles a process has open, when the average age is forced to
+%% 0, close messages are only sent to enough processes to release the
+%% correct number of file handles and the list of processes is
+%% randomly shuffled. This ensures we don't cause processes to
+%% needlessly close file handles, and ensures that we don't always
+%% make such requests of the same processes.
%%
%% The advantage of this scheme is that there is only communication
%% from the client to the server on open, close, and when in the
@@ -103,11 +127,7 @@
%% communication from the client to the server on normal file handle
%% operations. This scheme forms a feed-back loop - the server does
%% not care which file handles are closed, just that some are, and it
-%% checks this repeatedly when over the limit. Given the guarantees of
-%% now(), even if there is just one file handle open, a limit of 1,
-%% and one client, it is certain that when the client calculates the
-%% age of the handle, it will be greater than when the server
-%% calculated it, hence it should be closed.
+%% checks this repeatedly when over the limit.
%%
%% Handles which are closed as a result of the server are put into a
%% "soft-closed" state in which the handle is closed (data flushed out
@@ -117,8 +137,19 @@
%% - reopening them when necessary is handled transparently.
%%
%% The server also supports obtain and transfer. obtain/0 blocks until
-%% a file descriptor is available. transfer/1 is transfers ownership
-%% of a file descriptor between processes. It is non-blocking.
+%% a file descriptor is available, at which point the requesting
+%% process is considered to 'own' one more descriptor. transfer/1
+%% transfers ownership of a file descriptor between processes. It is
+%% non-blocking. Obtain is used to obtain permission to accept file
+%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1
+%% macro. File handles can use the entire limit, but will be evicted
+%% by obtain calls up to the point at which no more obtain calls can
+%% be satisfied by the obtains limit. Thus there will always be some
+%% capacity available for file handles. Processes that use obtain are
+%% never asked to return them, and they are not managed in any way by
+%% the server. It is simply a mechanism to ensure that processes that
+%% need file descriptors such as sockets can do so in such a way that
+%% the overall number of open file descriptors is managed.
%%
%% The callers of register_callback/3, obtain/0, and the argument of
%% transfer/1 are monitored, reducing the count of handles in use
@@ -131,6 +162,7 @@
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -242,6 +274,7 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
@@ -781,7 +814,11 @@ init([]) ->
Watermark > 0) ->
Watermark;
_ ->
- ulimit()
+ case ulimit() of
+ infinity -> infinity;
+ unknown -> ?FILE_HANDLES_LIMIT_OTHER;
+ Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
+ end
end,
ObtainLimit = obtain_limit(Limit),
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
@@ -1131,7 +1168,7 @@ track_client(Pid, Clients) ->
ulimit() ->
case os:type() of
{win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS;
+ ?FILE_HANDLES_LIMIT_WINDOWS;
{unix, _OsName} ->
%% Under Linux, Solaris and FreeBSD, ulimit is a shell
%% builtin, not a command. In OS X, it's a command.
@@ -1141,16 +1178,14 @@ ulimit() ->
"unlimited" ->
infinity;
String = [C|_] when $0 =< C andalso C =< $9 ->
- Num = list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String)) -
- ?RESERVED_FOR_OTHERS,
- lists:max([1, Num]);
+ list_to_integer(
+ lists:takewhile(
+ fun (D) -> $0 =< D andalso D =< $9 end, String));
_ ->
%% probably a variant of
%% "/bin/sh: line 1: ulimit: command not found\n"
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end;
_ ->
- ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS
+ unknown
end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 9fb9e2fe..b0379b95 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -16,10 +16,12 @@
%% The original code could reorder messages when communicating with a
%% process on a remote node that was not currently connected.
%%
-%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
-%% allow callers to attach priorities to requests. Requests with
-%% higher priorities are processed before requests with lower
-%% priorities. The default priority is 0.
+%% 4) The callback module can optionally implement prioritise_call/3,
+%% prioritise_cast/2 and prioritise_info/2. These functions take
+%% Message, From and State or just Message and State and return a
+%% single integer representing the priority attached to the message.
+%% Messages with higher priorities are processed before requests with
+%% lower priorities. The default priority is 0.
%%
%% 5) The callback module can optionally implement
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
@@ -64,16 +66,16 @@
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
%% AB. All Rights Reserved.''
-%%
+%%
%% $Id$
%%
-module(gen_server2).
@@ -82,13 +84,13 @@
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
-%%% kind of inputs.
+%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%
%%% The user module should export:
%%%
-%%% init(Args)
+%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% {ok, State, Timeout, Backoff}
@@ -101,21 +103,21 @@
%%% {reply, Reply, State, Timeout}
%%% {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, Reply, State}
+%%% {stop, Reason, Reply, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_cast(Msg, State)
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, State}
+%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
-%%% {stop, Reason, State}
+%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
@@ -159,37 +161,41 @@
%% API
-export([start/3, start/4,
- start_link/3, start_link/4,
- call/2, call/3, pcall/3, pcall/4,
- cast/2, pcast/3, reply/2,
- abcast/2, abcast/3,
- multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
+ start_link/3, start_link/4,
+ call/2, call/3,
+ cast/2, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
-export([behaviour_info/1]).
%% System exports
-export([system_continue/3,
- system_terminate/4,
- system_code_change/4,
- format_status/2]).
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
%% Internal exports
-export([init_it/6, print_event/3]).
-import(error_logger, [format/2]).
+%% State record
+-record(gs2_state, {parent, name, state, mod, time,
+ timeout_state, queue, debug, prioritise_call,
+ prioritise_cast, prioritise_info}).
+
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
%%%=========================================================================
-ifdef(use_specs).
--spec(handle_common_termination/6 ::
- (any(), any(), any(), atom(), any(), any()) -> no_return()).
+-spec(handle_common_termination/3 ::
+ (any(), atom(), #gs2_state{}) -> no_return()).
--spec(hibernate/7 ::
- (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
+-spec(hibernate/1 :: (#gs2_state{}) -> no_return()).
-endif.
@@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) ->
%% be monitored.
%% If the client is trapping exits and is linked server termination
%% is handled here (? Shall we do that here (or rely on timeouts) ?).
-%% -----------------------------------------------------------------
+%% -----------------------------------------------------------------
call(Name, Request) ->
case catch gen:call(Name, '$gen_call', Request) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, call, [Name, Request]}})
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
end.
call(Name, Request, Timeout) ->
case catch gen:call(Name, '$gen_call', Request, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
- end.
-
-pcall(Name, Priority, Request) ->
- case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
- end.
-
-pcall(Name, Priority, Request, Timeout) ->
- case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
- {ok,Res} ->
- Res;
- {'EXIT',Reason} ->
- exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
end.
%% -----------------------------------------------------------------
@@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) ->
cast({global,Name}, Request) ->
catch global:send(Name, cast_msg(Request)),
ok;
-cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_atom(Dest) ->
do_cast(Dest, Request);
cast(Dest, Request) when is_pid(Dest) ->
do_cast(Dest, Request).
-do_cast(Dest, Request) ->
+do_cast(Dest, Request) ->
do_send(Dest, cast_msg(Request)),
ok.
-
-cast_msg(Request) -> {'$gen_cast',Request}.
-pcast({global,Name}, Priority, Request) ->
- catch global:send(Name, cast_msg(Priority, Request)),
- ok;
-pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
- do_cast(Dest, Priority, Request);
-pcast(Dest, Priority, Request) when is_atom(Dest) ->
- do_cast(Dest, Priority, Request);
-pcast(Dest, Priority, Request) when is_pid(Dest) ->
- do_cast(Dest, Priority, Request).
-
-do_cast(Dest, Priority, Request) ->
- do_send(Dest, cast_msg(Priority, Request)),
- ok.
-
-cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+cast_msg(Request) -> {'$gen_cast',Request}.
%% -----------------------------------------------------------------
%% Send a reply to the client.
@@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
reply({To, Tag}, Reply) ->
catch To ! {Tag, Reply}.
-%% -----------------------------------------------------------------
-%% Asyncronous broadcast, returns nothing, it's just send'n prey
-%%-----------------------------------------------------------------
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n pray
+%% -----------------------------------------------------------------
abcast(Name, Request) when is_atom(Name) ->
do_abcast([node() | nodes()], Name, cast_msg(Request)).
@@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast.
%%% Make a call to servers at several nodes.
%%% Returns: {[Replies],[BadNodes]}
%%% A Timeout can be given
-%%%
+%%%
%%% A middleman process is used in case late answers arrives after
%%% the timeout. If they would be allowed to glog the callers message
-%%% queue, it would probably become confused. Late answers will
+%%% queue, it would probably become confused. Late answers will
%%% now arrive to the terminated middleman and so be discarded.
%%% -----------------------------------------------------------------
multi_call(Name, Req)
when is_atom(Name) ->
do_multi_call([node() | nodes()], Name, Req, infinity).
-multi_call(Nodes, Name, Req)
+multi_call(Nodes, Name, Req)
when is_list(Nodes), is_atom(Name) ->
do_multi_call(Nodes, Name, Req, infinity).
multi_call(Nodes, Name, Req, infinity) ->
do_multi_call(Nodes, Name, Req, infinity);
-multi_call(Nodes, Name, Req, Timeout)
+multi_call(Nodes, Name, Req, Timeout)
when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
do_multi_call(Nodes, Name, Req, Timeout).
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
-%%
-%% Description: Makes an existing process into a gen_server.
-%% The calling process will enter the gen_server receive
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
%% loop and become a gen_server process.
-%% The process *must* have been started using one of the
-%% start functions in proc_lib, see proc_lib(3).
-%% The user is responsible for any initialization of the
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
@@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
Backoff1 = extend_backoff(Backoff),
- loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
+ loop(find_prioritisers(
+ #gs2_state { parent = Parent, name = Name, state = State,
+ mod = Mod, time = Timeout, timeout_state = Backoff1,
+ queue = Queue, debug = Debug })).
%%%========================================================================
%%% Gen-callback functions
@@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
+ GS2State = find_prioritisers(
+ #gs2_state { parent = Parent,
+ name = Name,
+ mod = Mod,
+ queue = Queue,
+ debug = Debug }),
case catch Mod:init(Args) of
- {ok, State} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
- {ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
- {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = infinity,
+ timeout_state = undefined });
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = Timeout,
+ timeout_state = undefined });
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
Backoff1 = extend_backoff(Backoff),
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
- {stop, Reason} ->
- %% For consistency, we must make sure that the
- %% registered name (if any) is unregistered before
- %% the parent process is notified about the failure.
- %% (Otherwise, the parent process could get
- %% an 'already_started' error if it immediately
- %% tried starting the process again.)
- unregister_name(Name0),
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- ignore ->
- unregister_name(Name0),
- proc_lib:init_ack(Starter, ignore),
- exit(normal);
- {'EXIT', Reason} ->
- unregister_name(Name0),
- proc_lib:init_ack(Starter, {error, Reason}),
- exit(Reason);
- Else ->
- Error = {bad_return_value, Else},
- proc_lib:init_ack(Starter, {error, Error}),
- exit(Error)
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(GS2State #gs2_state { state = State,
+ time = Timeout,
+ timeout_state = Backoff1 });
+ {stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ unregister_name(Name0),
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
end.
name({local,Name}) -> Name;
@@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
- pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
-loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
- process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- drain(Queue), Debug).
+loop(GS2State = #gs2_state { time = hibernate,
+ timeout_state = undefined }) ->
+ pre_hibernate(GS2State);
+loop(GS2State) ->
+ process_next_msg(drain(GS2State)).
-drain(Queue) ->
+drain(GS2State) ->
receive
- Input -> drain(in(Input, Queue))
- after 0 -> Queue
+ Input -> drain(in(Input, GS2State))
+ after 0 -> GS2State
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+process_next_msg(GS2State = #gs2_state { time = Time,
+ timeout_state = TimeoutState,
+ queue = Queue }) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
- process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Msg);
+ process_msg(Msg, GS2State #gs2_state { queue = Queue1 });
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
@@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
Input ->
%% Time could be 'hibernate' here, so *don't* call loop
process_next_msg(
- Parent, Name, State, Mod, Time, TimeoutState,
- drain(in(Input, Queue1)), Debug)
+ drain(in(Input, GS2State #gs2_state { queue = Queue1 })))
after Time1 ->
case HibOnTimeout of
true ->
pre_hibernate(
- Parent, Name, State, Mod, TimeoutState, Queue1,
- Debug);
+ GS2State #gs2_state { queue = Queue1 });
false ->
- process_msg(
- Parent, Name, State, Mod, Time, TimeoutState,
- Queue1, Debug, timeout)
+ process_msg(timeout,
+ GS2State #gs2_state { queue = Queue1 })
end
end
end.
-wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
TimeoutState1 = case TS of
undefined ->
undefined;
{SleptAt, TimeoutState} ->
adjust_timeout_state(SleptAt, now(), TimeoutState)
end,
- post_hibernate(Parent, Name, State, Mod, TimeoutState1,
- drain(Queue), Debug).
+ post_hibernate(
+ drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
-hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
undefined -> undefined;
{backoff, _, _, _, _} -> {now(), TimeoutState}
end,
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
- TS, Queue, Debug]).
+ proc_lib:hibernate(?MODULE, wake_hib,
+ [GS2State #gs2_state { timeout_state = TS }]).
-pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+pre_hibernate(GS2State = #gs2_state { state = State,
+ mod = Mod }) ->
case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
true ->
case catch Mod:handle_pre_hibernate(State) of
{hibernate, NState} ->
- hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
- Debug);
+ hibernate(GS2State #gs2_state { state = NState } );
Reply ->
- handle_common_termination(Reply, Name, pre_hibernate,
- Mod, State, Debug)
+ handle_common_termination(Reply, pre_hibernate, GS2State)
end;
false ->
- hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ hibernate(GS2State)
end.
-post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+post_hibernate(GS2State = #gs2_state { state = State,
+ mod = Mod }) ->
case erlang:function_exported(Mod, handle_post_hibernate, 1) of
true ->
case catch Mod:handle_post_hibernate(State) of
{noreply, NState} ->
- process_next_msg(Parent, Name, NState, Mod, infinity,
- TimeoutState, Queue, Debug);
+ process_next_msg(GS2State #gs2_state { state = NState,
+ time = infinity });
{noreply, NState, Time} ->
- process_next_msg(Parent, Name, NState, Mod, Time,
- TimeoutState, Queue, Debug);
+ process_next_msg(GS2State #gs2_state { state = NState,
+ time = Time });
Reply ->
- handle_common_termination(Reply, Name, post_hibernate,
- Mod, State, Debug)
+ handle_common_termination(Reply, post_hibernate, GS2State)
end;
false ->
%% use hibernate here, not infinity. This matches
@@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
%% still set to hibernate, iff that msg is the very msg
%% that woke us up (or the first msg we receive after
%% waking up).
- process_next_msg(Parent, Name, State, Mod, hibernate,
- TimeoutState, Queue, Debug)
+ process_next_msg(GS2State #gs2_state { time = hibernate })
end.
adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
@@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-in({'$gen_pcast', {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
-in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
- priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
-in(Input, Queue) ->
- priority_queue:in(Input, Queue).
-
-process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Msg) ->
+in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC,
+ queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ {'$gen_cast', Msg},
+ PC(Msg, GS2State), Queue) };
+in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC,
+ queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ {'$gen_call', From, Msg},
+ PC(Msg, From, GS2State), Queue) };
+in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) ->
+ GS2State #gs2_state { queue = priority_queue:in(
+ Input, PI(Input, GS2State), Queue) }.
+
+process_msg(Msg,
+ GS2State = #gs2_state { parent = Parent,
+ name = Name,
+ debug = Debug }) ->
case Msg of
- {system, From, Req} ->
- sys:handle_system_msg(
+ {system, From, Req} ->
+ sys:handle_system_msg(
Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, TimeoutState, Queue]);
+ GS2State);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
- {'EXIT', Parent, Reason} ->
- terminate(Reason, Name, Msg, Mod, State, Debug);
- _Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
- _Msg ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
- Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
- Debug1)
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Msg, GS2State);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, GS2State);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
end.
%%% ---------------------------------------------------
@@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) ->
Tag = make_ref(),
Caller = self(),
Receiver =
- spawn(
- fun () ->
- %% Middleman process. Should be unsensitive to regular
- %% exit signals. The sychronization is needed in case
- %% the receiver would exit before the caller started
- %% the monitor.
- process_flag(trap_exit, true),
- Mref = erlang:monitor(process, Caller),
- receive
- {Caller,Tag} ->
- Monitors = send_nodes(Nodes, Name, Tag, Req),
- TimerId = erlang:start_timer(Timeout, self(), ok),
- Result = rec_nodes(Tag, Monitors, Name, TimerId),
- exit({self(),Tag,Result});
- {'DOWN',Mref,_,_,_} ->
- %% Caller died before sending us the go-ahead.
- %% Give up silently.
- exit(normal)
- end
- end),
+ spawn(
+ fun () ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
Mref = erlang:monitor(process, Receiver),
Receiver ! {self(),Tag},
receive
- {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
- Result;
- {'DOWN',Mref,_,_,Reason} ->
- %% The middleman code failed. Or someone did
- %% exit(_, kill) on the middleman process => Reason==killed
- exit(Reason)
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
end.
send_nodes(Nodes, Name, Tag, Req) ->
@@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors)
send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
%% Skip non-atom Node
send_nodes(Tail, Name, Tag, Req, Monitors);
-send_nodes([], _Name, _Tag, _Req, Monitors) ->
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
Monitors.
%% Against old nodes:
@@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) ->
%% Against contemporary nodes:
%% Wait for reply, server 'DOWN', or timeout from TimerId.
-rec_nodes(Tag, Nodes, Name, TimerId) ->
+rec_nodes(Tag, Nodes, Name, TimerId) ->
rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
receive
- {'DOWN', R, _, _, _} ->
- rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- unmonitor(R),
- rec_nodes(Tag, Tail, Name, Badnodes,
- [{N,Reply}|Replies], Time, TimerId);
- {timeout, TimerId, _} ->
- unmonitor(R),
- %% Collect all replies that already have arrived
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
%% R6 node
receive
- {nodedown, N} ->
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, Badnodes,
- [{N,Reply}|Replies], 2000, TimerId);
- {timeout, TimerId, _} ->
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- %% Collect all replies that already have arrived
- rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
after Time ->
- case rpc:call(N, erlang, whereis, [Name]) of
- Pid when is_pid(Pid) -> % It exists try again.
- rec_nodes(Tag, [N|Tail], Name, Badnodes,
- Replies, infinity, TimerId);
- _ -> % badnode
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes(Tag, Tail, Name, [N|Badnodes],
- Replies, 2000, TimerId)
- end
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
end;
rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
case catch erlang:cancel_timer(TimerId) of
- false -> % It has already sent it's message
- receive
- {timeout, TimerId, _} -> ok
- after 0 ->
- ok
- end;
- _ -> % Timer was cancelled, or TimerId was 'undefined'
- ok
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
end,
{Replies, Badnodes}.
%% Collect all replies that already have arrived
rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
receive
- {'DOWN', R, _, _, _} ->
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- unmonitor(R),
- rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
after 0 ->
- unmonitor(R),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
%% R6 node
receive
- {nodedown, N} ->
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
- {{Tag, N}, Reply} -> %% Tag is bound !!!
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
after 0 ->
- receive {nodedown, N} -> ok after 0 -> ok end,
- monitor_node(N, false),
- rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
end;
rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
{Replies, Badnodes}.
@@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
if node() =:= nonode@nohost, Node =/= nonode@nohost ->
- Ref = make_ref(),
- self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
- {Node, Ref};
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
true ->
- case catch erlang:monitor(process, {Name, Node}) of
- {'EXIT', _} ->
- %% Remote node is R6
- monitor_node(Node, true),
- Node;
- Ref when is_reference(Ref) ->
- {Node, Ref}
- end
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
end.
%% Cancels a monitor started with Ref=erlang:monitor(_, _).
unmonitor(Ref) when is_reference(Ref) ->
erlang:demonitor(Ref),
receive
- {'DOWN', Ref, _, _, _} ->
- true
+ {'DOWN', Ref, _, _, _} ->
+ true
after 0 ->
- true
+ true
end.
%%% ---------------------------------------------------
@@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) ->
dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
-handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, TimeoutState, Queue) ->
- case catch Mod:handle_call(Msg, From, State) of
- {reply, Reply, NState} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {reply, Reply, NState, Time1} ->
- reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Name, Msg, Mod, NState, [])),
- reply(From, Reply),
- exit(R);
- Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue)
- end;
-handle_msg(Msg,
- Parent, Name, State, Mod, TimeoutState, Queue) ->
- Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue).
-
-handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
+ reply(From, Reply),
+ [];
+common_reply(Name, From, Reply, NState, Debug) ->
+ reply(Name, From, Reply, NState, Debug).
+
+common_debug([] = _Debug, _Func, _Info, _Event) ->
+ [];
+common_debug(Debug, Func, Info, Event) ->
+ sys:handle_debug(Debug, Func, Info, Event).
+
+handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
+ state = State,
+ name = Name,
+ debug = Debug }) ->
case catch Mod:handle_call(Msg, From, State) of
- {reply, Reply, NState} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {reply, Reply, NState, Time1} ->
- Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
- {noreply, NState} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {noreply, NState, Time1} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
- {stop, Reason, Reply, NState} ->
- {'EXIT', R} =
- (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
- reply(Name, From, Reply, NState, Debug),
- exit(R);
- Other ->
- handle_common_reply(Other, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue, Debug)
+ {reply, Reply, NState} ->
+ Debug1 = common_reply(Name, From, Reply, NState, Debug),
+ loop(GS2State #gs2_state { state = NState,
+ time = infinity,
+ debug = Debug1 });
+ {reply, Reply, NState, Time1} ->
+ Debug1 = common_reply(Name, From, Reply, NState, Debug),
+ loop(GS2State #gs2_state { state = NState,
+ time = Time1,
+ debug = Debug1});
+ {noreply, NState} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state {state = NState,
+ time = infinity,
+ debug = Debug1});
+ {noreply, NState, Time1} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state {state = NState,
+ time = Time1,
+ debug = Debug1});
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Msg,
+ GS2State #gs2_state { state = NState })),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other, Msg, GS2State)
end;
-handle_msg(Msg,
- Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue, Debug).
+ handle_common_reply(Reply, Msg, GS2State).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
- TimeoutState, Queue) ->
+handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
+ debug = Debug}) ->
case Reply of
- {noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
- {noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ {noreply, NState} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state { state = NState,
+ time = infinity,
+ debug = Debug1 });
+ {noreply, NState, Time1} ->
+ Debug1 = common_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(GS2State #gs2_state { state = NState,
+ time = Time1,
+ debug = Debug1 });
_ ->
- handle_common_termination(Reply, Name, Msg, Mod, State, [])
+ handle_common_termination(Reply, Msg, GS2State)
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
- Debug) ->
+handle_common_termination(Reply, Msg, GS2State) ->
case Reply of
- {noreply, NState} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
- Debug1);
- {noreply, NState, Time1} ->
- Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ {stop, Reason, NState} ->
+ terminate(Reason, Msg, GS2State #gs2_state { state = NState });
+ {'EXIT', What} ->
+ terminate(What, Msg, GS2State);
_ ->
- handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
- end.
-
-handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
- case Reply of
- {stop, Reason, NState} ->
- terminate(Reason, Name, Msg, Mod, NState, Debug);
- {'EXIT', What} ->
- terminate(What, Name, Msg, Mod, State, Debug);
- _ ->
- terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ terminate({bad_return_value, Reply}, Msg, GS2State)
end.
reply(Name, {To, Tag}, Reply, State, Debug) ->
reply({To, Tag}, Reply),
- sys:handle_debug(Debug, {?MODULE, print_event}, Name,
- {out, Reply, To, State} ).
+ sys:handle_debug(
+ Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}).
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
- loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
+system_continue(Parent, Debug, GS2State) ->
+ loop(GS2State #gs2_state { parent = Parent, debug = Debug }).
-ifdef(use_specs).
-spec system_terminate(_, _, _, [_]) -> no_return().
-endif.
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
- _TimeoutState, _Queue]) ->
- terminate(Reason, Name, [], Mod, State, Debug).
+system_terminate(Reason, _Parent, Debug, GS2State) ->
+ terminate(Reason, [], GS2State #gs2_state { debug = Debug }).
-system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
- OldVsn, Extra) ->
+system_code_change(GS2State = #gs2_state { mod = Mod,
+ state = State },
+ _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} ->
- {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
- Else ->
+ {ok, NewState} ->
+ NewGS2State = find_prioritisers(
+ GS2State #gs2_state { state = NewState }),
+ {ok, [NewGS2State]};
+ Else ->
Else
end.
@@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
%%-----------------------------------------------------------------
print_event(Dev, {in, Msg}, Name) ->
case Msg of
- {'$gen_call', {From, _Tag}, Call} ->
- io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
- [Name, Call, From]);
- {'$gen_cast', Cast} ->
- io:format(Dev, "*DBG* ~p got cast ~p~n",
- [Name, Cast]);
- _ ->
- io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
end;
print_event(Dev, {out, Msg, To, State}, Name) ->
- io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
- [Name, Msg, To, State]);
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
print_event(Dev, {noreply, State}, Name) ->
io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
print_event(Dev, Event, Name) ->
@@ -973,23 +950,26 @@ print_event(Dev, Event, Name) ->
%%% Terminate the server.
%%% ---------------------------------------------------
-terminate(Reason, Name, Msg, Mod, State, Debug) ->
+terminate(Reason, Msg, #gs2_state { name = Name,
+ mod = Mod,
+ state = State,
+ debug = Debug }) ->
case catch Mod:terminate(Reason, State) of
- {'EXIT', R} ->
- error_info(R, Reason, Name, Msg, State, Debug),
- exit(R);
- _ ->
- case Reason of
- normal ->
- exit(normal);
- shutdown ->
- exit(shutdown);
- {shutdown,_}=Shutdown ->
- exit(Shutdown);
- _ ->
- error_info(Reason, undefined, Name, Msg, State, Debug),
- exit(Reason)
- end
+ {'EXIT', R} ->
+ error_info(R, Reason, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
+ _ ->
+ error_info(Reason, undefined, Name, Msg, State, Debug),
+ exit(Reason)
+ end
end.
error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) ->
@@ -1038,74 +1018,109 @@ opt(_, []) ->
debug_options(Name, Opts) ->
case opt(debug, Opts) of
- {ok, Options} -> dbg_options(Name, Options);
- _ -> dbg_options(Name, [])
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
end.
dbg_options(Name, []) ->
- Opts =
- case init:get_argument(generic_debug) of
- error ->
- [];
- _ ->
- [log, statistics]
- end,
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
dbg_opts(Name, Opts);
dbg_options(Name, Opts) ->
dbg_opts(Name, Opts).
dbg_opts(Name, Opts) ->
case catch sys:debug_options(Opts) of
- {'EXIT',_} ->
- format("~p: ignoring erroneous debug options - ~p~n",
- [Name, Opts]),
- [];
- Dbg ->
- Dbg
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
end.
get_proc_name(Pid) when is_pid(Pid) ->
Pid;
get_proc_name({local, Name}) ->
case process_info(self(), registered_name) of
- {registered_name, Name} ->
- Name;
- {registered_name, _Name} ->
- exit(process_not_registered);
- [] ->
- exit(process_not_registered)
- end;
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
get_proc_name({global, Name}) ->
case global:safe_whereis_name(Name) of
- undefined ->
- exit(process_not_registered_globally);
- Pid when Pid =:= self() ->
- Name;
- _Pid ->
- exit(process_not_registered_globally)
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
end.
get_parent() ->
case get('$ancestors') of
- [Parent | _] when is_pid(Parent)->
+ [Parent | _] when is_pid(Parent)->
Parent;
[Parent | _] when is_atom(Parent)->
name_to_pid(Parent);
- _ ->
- exit(process_was_not_started_by_proc_lib)
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
end.
name_to_pid(Name) ->
case whereis(Name) of
- undefined ->
- case global:safe_whereis_name(Name) of
- undefined ->
- exit(could_not_find_registerd_name);
- Pid ->
- Pid
- end;
- Pid ->
- Pid
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
+ PrioriCall = function_exported_or_default(
+ Mod, 'prioritise_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritise_call = PrioriCall,
+ prioritise_cast = PrioriCast,
+ prioritise_info = PrioriInfo }.
+
+function_exported_or_default(Mod, Fun, Arity, Default) ->
+ case erlang:function_exported(Mod, Fun, Arity) of
+ true -> case Arity of
+ 2 -> fun (Msg, GS2State = #gs2_state { state = State }) ->
+ case catch Mod:Fun(Msg, State) of
+ Res when is_integer(Res) ->
+ Res;
+ Err ->
+ handle_common_termination(Err, Msg, GS2State)
+ end
+ end;
+ 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) ->
+ case catch Mod:Fun(Msg, From, State) of
+ Res when is_integer(Res) ->
+ Res;
+ Err ->
+ handle_common_termination(Err, Msg, GS2State)
+ end
+ end
+ end;
+ false -> Default
end.
%%-----------------------------------------------------------------
@@ -1115,25 +1130,23 @@ format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug,
[Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
NameTag = if is_pid(Name) ->
- pid_to_list(Name);
- is_atom(Name) ->
- Name
- end,
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
Header = lists:concat(["Status for generic server ", NameTag]),
Log = sys:get_debug(log, Debug, []),
- Specfic =
- case erlang:function_exported(Mod, format_status, 2) of
- true ->
- case catch Mod:format_status(Opt, [PDict, State]) of
- {'EXIT', _} -> [{data, [{"State", State}]}];
- Else -> Else
- end;
- _ ->
- [{data, [{"State", State}]}]
- end,
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true -> case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ -> [{data, [{"State", State}]}]
+ end,
[{header, Header},
{data, [{"Status", SysState},
- {"Parent", Parent},
- {"Logged events", Log},
+ {"Parent", Parent},
+ {"Logged events", Log},
{"Queued messages", priority_queue:to_list(Queue)}]} |
Specfic].
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 303d1f3a..8c36a9f0 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -205,8 +205,7 @@
%%----------------------------------------------------------------------------
prepare() ->
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir().
+ ok = ensure_working_log_handlers().
start() ->
try
@@ -490,11 +489,16 @@ maybe_insert_default_data() ->
insert_default_data() ->
{ok, DefaultUser} = application:get_env(default_user),
{ok, DefaultPass} = application:get_env(default_pass),
+ {ok, DefaultAdmin} = application:get_env(default_user_is_admin),
{ok, DefaultVHost} = application:get_env(default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
application:get_env(default_permissions),
ok = rabbit_access_control:add_vhost(DefaultVHost),
ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ case DefaultAdmin of
+ true -> rabbit_access_control:set_admin(DefaultUser);
+ _ -> ok
+ end,
ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
DefaultConfigurePerm,
DefaultWritePerm,
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 8d00f591..73fd6f0e 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -35,11 +35,12 @@
-export([check_login/2, user_pass_login/2,
check_vhost_access/2, check_resource_access/3]).
--export([add_user/2, delete_user/1, change_password/2, list_users/0,
- lookup_user/1]).
--export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
+-export([add_user/2, delete_user/1, change_password/2, set_admin/1,
+ clear_admin/1, list_users/0, lookup_user/1]).
+-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]).
-export([set_permissions/5, set_permissions/6, clear_permissions/2,
- list_vhost_permissions/1, list_user_permissions/1]).
+ list_permissions/0, list_vhost_permissions/1, list_user_permissions/1,
+ list_user_vhost_permissions/2]).
%%----------------------------------------------------------------------------
@@ -52,6 +53,7 @@
-type(password() :: binary()).
-type(regexp() :: binary()).
-type(scope() :: binary()).
+-type(scope_atom() :: 'client' | 'all').
-spec(check_login/2 ::
(binary(), binary()) -> rabbit_types:user() |
@@ -68,26 +70,33 @@
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
+-spec(set_admin/1 :: (username()) -> 'ok').
+-spec(clear_admin/1 :: (username()) -> 'ok').
-spec(list_users/0 :: () -> [username()]).
-spec(lookup_user/1 ::
(username()) -> rabbit_types:ok(rabbit_types:user())
| rabbit_types:error('not_found')).
--spec(add_vhost/1 ::
- (rabbit_types:vhost()) -> 'ok').
--spec(delete_vhost/1 ::
- (rabbit_types:vhost()) -> 'ok').
+-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()).
-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
regexp(), regexp()) -> 'ok').
-spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
+-spec(list_permissions/0 ::
+ () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(),
+ scope_atom()}]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost())
- -> [{username(), regexp(), regexp(), regexp()}]).
+ (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(),
+ scope_atom()}]).
-spec(list_user_permissions/1 ::
- (username())
- -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
+ (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(),
+ scope_atom()}]).
+-spec(list_user_vhost_permissions/2 ::
+ (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(),
+ scope_atom()}]).
-endif.
@@ -142,7 +151,7 @@ internal_lookup_vhost_access(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
+ #user_vhost{username = Username,
virtual_host = VHostPath}}) of
[] -> not_found;
[R] -> {ok, R}
@@ -160,7 +169,6 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
-permission_index(scope) -> #permission.scope;
permission_index(configure) -> #permission.configure;
permission_index(write) -> #permission.write;
permission_index(read) -> #permission.read.
@@ -171,27 +179,29 @@ check_resource_access(Username,
check_resource_access(Username,
R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(_Username,
- #resource{name = <<"amq.gen",_/binary>>},
- #permission{scope = client}) ->
- ok;
check_resource_access(Username,
R = #resource{virtual_host = VHostPath, name = Name},
Permission) ->
Res = case mnesia:dirty_read({rabbit_user_permission,
- #user_vhost{username = Username,
+ #user_vhost{username = Username,
virtual_host = VHostPath}}) of
[] ->
false;
[#user_permission{permission = P}] ->
- PermRegexp = case element(permission_index(Permission), P) of
- %% <<"^$">> breaks Emacs' erlang mode
- <<"">> -> <<$^, $$>>;
- RE -> RE
- end,
- case re:run(Name, PermRegexp, [{capture, none}]) of
- match -> true;
- nomatch -> false
+ case {Name, P} of
+ {<<"amq.gen",_/binary>>, #permission{scope = client}} ->
+ true;
+ _ ->
+ PermRegexp =
+ case element(permission_index(Permission), P) of
+ %% <<"^$">> breaks Emacs' erlang mode
+ <<"">> -> <<$^, $$>>;
+ RE -> RE
+ end,
+ case re:run(Name, PermRegexp, [{capture, none}]) of
+ match -> true;
+ nomatch -> false
+ end
end
end,
if Res -> ok;
@@ -207,7 +217,8 @@ add_user(Username, Password) ->
[] ->
ok = mnesia:write(rabbit_user,
#user{username = Username,
- password = Password},
+ password = Password,
+ is_admin = false},
write);
_ ->
mnesia:abort({user_already_exists, Username})
@@ -237,20 +248,39 @@ delete_user(Username) ->
R.
change_password(Username, Password) ->
- R = rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () ->
- ok = mnesia:write(rabbit_user,
- #user{username = Username,
- password = Password},
- write)
- end)),
+ R = update_user(Username, fun(User) ->
+ User#user{password = Password}
+ end),
rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
+set_admin(Username) ->
+ set_admin(Username, true).
+
+clear_admin(Username) ->
+ set_admin(Username, false).
+
+set_admin(Username, IsAdmin) ->
+ R = update_user(Username, fun(User) ->
+ User#user{is_admin = IsAdmin}
+ end),
+ rabbit_log:info("Set user admin flag for user ~p to ~p~n",
+ [Username, IsAdmin]),
+ R.
+
+update_user(Username, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:with_user(
+ Username,
+ fun () ->
+ {ok, User} = lookup_user(Username),
+ ok = mnesia:write(rabbit_user, Fun(User), write)
+ end)).
+
list_users() ->
- mnesia:dirty_all_keys(rabbit_user).
+ [{Username, IsAdmin} ||
+ #user{username = Username, is_admin = IsAdmin} <-
+ mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})].
lookup_user(Username) ->
rabbit_misc:dirty_read({rabbit_user, Username}).
@@ -300,7 +330,7 @@ delete_vhost(VHostPath) ->
R.
internal_delete_vhost(VHostPath) ->
- lists:foreach(fun (#exchange{name=Name}) ->
+ lists:foreach(fun (#exchange{name = Name}) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
@@ -311,6 +341,9 @@ internal_delete_vhost(VHostPath) ->
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
+vhost_exists(VHostPath) ->
+ mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
+
list_vhosts() ->
mnesia:dirty_all_keys(rabbit_vhost).
@@ -338,13 +371,13 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer
fun () -> ok = mnesia:write(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
- username = Username,
+ username = Username,
virtual_host = VHostPath},
permission = #permission{
- scope = Scope,
+ scope = Scope,
configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}},
+ write = WritePerm,
+ read = ReadPerm}},
write)
end)).
@@ -355,10 +388,15 @@ clear_permissions(Username, VHostPath) ->
Username, VHostPath,
fun () ->
ok = mnesia:delete({rabbit_user_permission,
- #user_vhost{username = Username,
+ #user_vhost{username = Username,
virtual_host = VHostPath}})
end)).
+list_permissions() ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ list_permissions(match_user_vhost('_', '_'))].
+
list_vhost_permissions(VHostPath) ->
[{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
{Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
@@ -371,15 +409,21 @@ list_user_permissions(Username) ->
list_permissions(rabbit_misc:with_user(
Username, match_user_vhost(Username, '_')))].
+list_user_vhost_permissions(Username, VHostPath) ->
+ [{ConfigurePerm, WritePerm, ReadPerm, Scope} ||
+ {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <-
+ list_permissions(rabbit_misc:with_user_and_vhost(
+ Username, VHostPath,
+ match_user_vhost(Username, VHostPath)))].
+
list_permissions(QueryThunk) ->
[{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} ||
- #user_permission{user_vhost = #user_vhost{username = Username,
+ #user_permission{user_vhost = #user_vhost{username = Username,
virtual_host = VHostPath},
- permission = #permission{
- scope = Scope,
- configure = ConfigurePerm,
- write = WritePerm,
- read = ReadPerm}} <-
+ permission = #permission{ scope = Scope,
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(QueryThunk)].
@@ -387,7 +431,7 @@ match_user_vhost(Username, VHostPath) ->
fun () -> mnesia:match_object(
rabbit_user_permission,
#user_permission{user_vhost = #user_vhost{
- username = Username,
+ username = Username,
virtual_host = VHostPath},
permission = '_'},
read)
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 99faa511..c6bbd5a3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -56,7 +56,7 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(EXPIRES_TYPE, long).
+-define(EXPIRES_TYPES, [byte, short, signedint, long]).
%%----------------------------------------------------------------------------
@@ -249,11 +249,12 @@ start_queue_process(Q) ->
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
- Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
+ ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
- fun (_X, _Q) -> ok end),
- ok.
+ rabbit_binding:add(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = []}).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -313,13 +314,13 @@ check_declare_arguments(QueueName, Args) ->
check_expires_argument(undefined) ->
ok;
-check_expires_argument({?EXPIRES_TYPE, Expires})
- when is_integer(Expires) andalso Expires > 0 ->
- ok;
-check_expires_argument({?EXPIRES_TYPE, _Expires}) ->
- {error, expires_zero_or_less};
-check_expires_argument(_) ->
- {error, expires_not_of_type_long}.
+check_expires_argument({Type, Expires}) when Expires > 0 ->
+ case lists:member(Type, ?EXPIRES_TYPES) of
+ true -> ok;
+ false -> {error, {expires_not_of_acceptable_type, Type, Expires}}
+ end;
+check_expires_argument({_Type, _Expires}) ->
+ {error, expires_zero_or_less}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -331,10 +332,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, info, infinity).
+ delegate_call(QPid, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -344,7 +345,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_pcall(QPid, 9, consumers, infinity).
+ delegate_call(QPid, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -356,7 +357,7 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
- delegate_pcast(QPid, 7, emit_stats).
+ delegate_cast(QPid, emit_stats).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -382,10 +383,10 @@ requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
- delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+ delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
@@ -421,10 +422,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- delegate_pcast(QPid, 7, {unblock, ChPid}).
+ delegate_cast(QPid, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_no_result(
@@ -436,7 +437,7 @@ internal_delete1(QueueName) ->
%% we want to execute some things, as
%% decided by rabbit_exchange, after the
%% transaction.
- rabbit_exchange:delete_queue_bindings(QueueName).
+ rabbit_binding:remove_for_queue(QueueName).
internal_delete(QueueName) ->
case
@@ -454,20 +455,19 @@ internal_delete(QueueName) ->
end.
maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun},
- infinity).
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
update_ram_duration(QPid) ->
- gen_server2:pcast(QPid, 8, update_ram_duration).
+ gen_server2:cast(QPid, update_ram_duration).
set_ram_duration_target(QPid, Duration) ->
- gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}).
+ gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
- gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
maybe_expire(QPid) ->
- gen_server2:pcast(QPid, 8, maybe_expire).
+ gen_server2:cast(QPid, maybe_expire).
on_node_down(Node) ->
[Hook() ||
@@ -481,7 +481,7 @@ on_node_down(Node) ->
ok.
delete_queue(QueueName) ->
- Post = rabbit_exchange:delete_transient_queue_bindings(QueueName),
+ Post = rabbit_binding:remove_transient_for_queue(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
Post.
@@ -507,11 +507,6 @@ safe_delegate_call_ok(F, Pids) ->
delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
-delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
- fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-
-delegate_pcast(Pid, Pri, Msg) ->
- delegate:invoke_no_result(Pid,
- fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
+delegate_cast(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6eeb397c..8a9a293b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,8 @@
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-import(queue).
-import(erlang).
@@ -148,13 +149,14 @@ code_change(_OldVsn, State, _Extra) ->
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
- {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
- undefined -> State
+ {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
+ undefined -> State
end.
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined}) ->
+ backing_queue = BQ, backing_queue_state = undefined,
+ stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -165,11 +167,12 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- rabbit_event:notify(
- queue_created,
- [{Item, i(Item, State)} ||
- Item <- ?CREATION_EVENT_KEYS]),
- noreply(init_expires(State#q{backing_queue_state = BQS}));
+ State1 = init_expires(State#q{backing_queue_state = BQS}),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -205,7 +208,7 @@ next_state(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
ensure_rate_timer(State),
State2 = ensure_stats_timer(State1),
- case BQ:needs_idle_timeout(BQS)of
+ case BQ:needs_idle_timeout(BQS) of
true -> {ensure_sync_timer(State2), 0};
false -> {stop_sync_timer(State2), hibernate}
end.
@@ -269,14 +272,8 @@ ensure_stats_timer(State = #q{stats_timer = StatsTimer,
q = Q}) ->
State#q{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(State) end,
fun() -> rabbit_amqqueue:emit_stats(Q) end)}.
-stop_stats_timer(State = #q{stats_timer = StatsTimer}) ->
- State#q{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> emit_stats(State) end)}.
-
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -618,11 +615,33 @@ i(Item, _) ->
throw({bad_argument, Item}).
emit_stats(State) ->
- rabbit_event:notify(queue_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+ rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)).
%---------------------------------------------------------------------------
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ _ -> 0
+ end.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -938,9 +957,12 @@ handle_cast(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+ %% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- noreply(State).
+ State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ assert_invariant(State1),
+ {noreply, State1}.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -971,11 +993,14 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ stats_timer = StatsTimer}) ->
BQS1 = BQ:handle_pre_hibernate(BQS),
%% no activity for a while == 0 egress and ingress rates
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), infinity),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- {hibernate, stop_stats_timer(
- stop_rate_timer(State#q{backing_queue_state = BQS2}))}.
+ rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State) end),
+ State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
+ backing_queue_state = BQS2},
+ {hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
new file mode 100644
index 00000000..19150fa9
--- /dev/null
+++ b/src/rabbit_binding.erl
@@ -0,0 +1,377 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_binding).
+-include("rabbit.hrl").
+
+-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]).
+-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]).
+-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+%% these must all be run inside a mnesia tx
+-export([has_for_exchange/1, remove_for_exchange/1,
+ remove_for_queue/1, remove_transient_for_queue/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-export_type([key/0]).
+
+-type(key() :: binary()).
+
+-type(bind_errors() :: rabbit_types:error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(bind_res() :: 'ok' | bind_errors()).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
+-type(bindings() :: [rabbit_types:binding()]).
+
+-spec(recover/0 :: () -> [rabbit_types:binding()]).
+-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
+-spec(add/1 :: (rabbit_types:binding()) -> bind_res()).
+-spec(remove/1 :: (rabbit_types:binding()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()).
+-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) ->
+ bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list/1 :: (rabbit_types:vhost()) -> bindings()).
+-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()).
+-spec(list_for_exchange_and_queue/2 ::
+ (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]).
+-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) ->
+ [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()).
+-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()).
+-spec(remove_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+-spec(remove_transient_for_queue/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]).
+
+recover() ->
+ rabbit_misc:table_fold(
+ fun (Route = #route{binding = B}, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write),
+ [B | Acc]
+ end, [], rabbit_durable_route).
+
+exists(Binding) ->
+ binding_action(
+ Binding,
+ fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+
+add(Binding) -> add(Binding, fun (_X, _Q) -> ok end).
+
+remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end).
+
+add(Binding, InnerFun) ->
+ case binding_action(
+ Binding,
+ fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_] -> {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
+ end
+ end) of
+ {new, X = #exchange{ type = Type }, B} ->
+ ok = (type_to_module(Type)):add_binding(X, B),
+ rabbit_event:notify(binding_created, info(B));
+ {existing, _, _} ->
+ ok;
+ {error, _} = Err ->
+ Err
+ end.
+
+remove(Binding, InnerFun) ->
+ case binding_action(
+ Binding,
+ fun (X, Q, B) ->
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] -> {error, binding_not_found};
+ [_] -> case InnerFun(X, Q) of
+ ok ->
+ Durable = (X#exchange.durable andalso
+ Q#amqqueue.durable),
+ ok = sync_binding(
+ B, Durable,
+ fun mnesia:delete_object/3),
+ Deleted =
+ rabbit_exchange:maybe_auto_delete(X),
+ {{Deleted, X}, B};
+ {error, _} = E ->
+ E
+ end
+ end
+ end) of
+ {error, _} = Err ->
+ Err;
+ {{IsDeleted, X = #exchange{ type = Type }}, B} ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> ok = Module:delete(X, [B]);
+ not_deleted -> ok = Module:remove_bindings(X, [B])
+ end,
+ rabbit_event:notify(binding_deleted, info(B)),
+ ok
+ end.
+
+list(VHostPath) ->
+ Route = #route{binding = #binding{
+ exchange_name = rabbit_misc:r(VHostPath, exchange),
+ queue_name = rabbit_misc:r(VHostPath, queue),
+ _ = '_'},
+ _ = '_'},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+list_for_exchange(XName) ->
+ Route = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+list_for_queue(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}},
+ [reverse_binding(B) || #reverse_route{reverse_binding = B} <-
+ mnesia:dirty_match_object(rabbit_reverse_route,
+ reverse_route(Route))].
+
+list_for_exchange_and_queue(XName, QueueName) ->
+ Route = #route{binding = #binding{exchange_name = XName,
+ queue_name = QueueName,
+ _ = '_'}},
+ [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route,
+ Route)].
+
+info_keys() -> ?INFO_KEYS.
+
+map(VHostPath, F) ->
+ %% TODO: there is scope for optimisation here, e.g. using a
+ %% cursor, parallelising the function invocation
+ lists:map(F, list(VHostPath)).
+
+infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items].
+
+i(exchange_name, #binding{exchange_name = XName}) -> XName;
+i(queue_name, #binding{queue_name = QName}) -> QName;
+i(routing_key, #binding{key = RoutingKey}) -> RoutingKey;
+i(arguments, #binding{args = Arguments}) -> Arguments;
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(B = #binding{}) -> infos(?INFO_KEYS, B).
+
+info(B = #binding{}, Items) -> infos(Items, B).
+
+info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end).
+
+info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end).
+
+has_for_exchange(XName) ->
+ Match = #route{binding = #binding{exchange_name = XName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match).
+
+remove_for_exchange(XName) ->
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route),
+ Route#route.binding
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = XName,
+ _ = '_'}},
+ write)].
+
+remove_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_forward_routes/1).
+
+remove_transient_for_queue(QueueName) ->
+ remove_for_queue(QueueName, fun delete_transient_forward_routes/1).
+
+%%----------------------------------------------------------------------------
+
+binding_action(Binding = #binding{exchange_name = XName,
+ queue_name = QueueName,
+ args = Arguments}, Fun) ->
+ call_with_exchange_and_queue(
+ XName, QueueName,
+ fun (X, Q) ->
+ SortedArgs = rabbit_misc:sort_field_table(Arguments),
+ Fun(X, Q, Binding#binding{args = SortedArgs})
+ end).
+
+sync_binding(Binding, Durable, Fun) ->
+ ok = case Durable of
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
+ false -> ok
+ end,
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
+ ok.
+
+call_with_exchange_and_queue(XName, QueueName, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> case {mnesia:read({rabbit_exchange, XName}),
+ mnesia:read({rabbit_queue, QueueName})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
+ end).
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_exchange_type_registry:lookup_module(T),
+ Module.
+
+contains(Table, MatchHead) ->
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+remove_for_queue(QueueName, FwdDeleteFun) ->
+ DeletedBindings =
+ [begin
+ Route = reverse_route(ReverseRoute),
+ ok = FwdDeleteFun(Route),
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ ReverseRoute, write),
+ Route#route.binding
+ end || ReverseRoute
+ <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(#route{binding = #binding{
+ queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ Grouped = group_bindings_and_auto_delete(
+ lists:keysort(#binding.exchange_name, DeletedBindings), []),
+ fun () ->
+ lists:foreach(
+ fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
+ Module = type_to_module(Type),
+ case IsDeleted of
+ auto_deleted -> Module:delete(X, Bs);
+ not_deleted -> Module:remove_bindings(X, Bs)
+ end
+ end, Grouped)
+ end.
+
+%% Requires that its input binding list is sorted in exchange-name
+%% order, so that the grouping of bindings (for passing to
+%% group_bindings_and_auto_delete1) works properly.
+group_bindings_and_auto_delete([], Acc) ->
+ Acc;
+group_bindings_and_auto_delete(
+ [B = #binding{exchange_name = XName} | Bs], Acc) ->
+ group_bindings_and_auto_delete(XName, Bs, [B], Acc).
+
+group_bindings_and_auto_delete(
+ XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) ->
+ group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc);
+group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) ->
+ %% either Removed is [], or its head has a non-matching XName
+ [X] = mnesia:read({rabbit_exchange, XName}),
+ NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc],
+ group_bindings_and_auto_delete(Removed, NewAcc).
+
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
+
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(rabbit_route, Route, write).
+
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = XName,
+ queue_name = QueueName,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = XName,
+ queue_name = QueueName,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = XName,
+ queue_name = QueueName,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = XName,
+ queue_name = QueueName,
+ key = Key,
+ args = Args}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 917188d4..4bb1f13b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,7 +41,8 @@
-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
start_limiter_fun, transaction_id, tx_participants, next_tag,
@@ -137,10 +138,10 @@ list() ->
info_keys() -> ?INFO_KEYS.
info(Pid) ->
- gen_server2:pcall(Pid, 9, info, infinity).
+ gen_server2:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of
+ case gen_server2:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -152,7 +153,7 @@ info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
emit_stats(Pid) ->
- gen_server2:pcast(Pid, 7, emit_stats).
+ gen_server2:cast(Pid, emit_stats).
flush(Pid) ->
gen_server2:call(Pid, flush).
@@ -170,6 +171,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
+ StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{ state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -187,19 +189,32 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = rabbit_event:init_stats_timer(),
+ stats_timer = StatsTimer,
confirm_enabled = false,
published_count = 0,
confirm_multiple = false,
held_confirms = gb_sets:new(),
need_confirming = gb_sets:new(),
qpid_to_msgs = dict:new() },
- rabbit_event:notify(
- channel_created,
- [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ info -> 9;
+ {info, _Items} -> 9;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ emit_stats -> 7;
+ _ -> 0
+ end.
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -258,9 +273,10 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
end, State),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State) ->
+handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
- {noreply, State};
+ {noreply,
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}};
handle_cast(flush_multiple_acks,
State = #ch{writer_pid = WriterPid,
@@ -295,12 +311,16 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
handle_pre_hibernate(State = #ch { writer_pid = WriterPid,
held_confirms = As,
+ stats_timer = StatsTimer,
need_confirming = NA }) ->
ok = clear_permission_cache(),
handle_multiple_flush(WriterPid, As, NA),
- {hibernate, stop_stats_timer(
- State #ch { held_confirms = gb_sets:new(),
- confirm_tref = undefined })}.
+ rabbit_event:if_enabled(StatsTimer, fun() ->
+ internal_emit_stats(State)
+ end),
+ {hibernate, State #ch { held_confirms = gb_sets:new(),
+ stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
+ confirm_tref = undefined }}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -330,14 +350,8 @@ ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> internal_emit_stats(State) end,
fun() -> emit_stats(ChPid) end)}.
-stop_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:stop_stats_timer(
- StatsTimer,
- fun() -> internal_emit_stats(State) end)}.
-
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -937,17 +951,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
- NoWait, State);
+ binding_action(fun rabbit_binding:add/2,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.bind_ok'{}, NoWait, State);
handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
- QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
- false, State);
+ binding_action(fun rabbit_binding:remove/2,
+ ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
+ #'queue.unbind_ok'{}, false, State);
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
@@ -1064,7 +1078,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ case Fun(#binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = ActualRoutingKey,
+ args = Arguments},
fun (_X, Q) ->
try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
catch exit:Reason -> {error, Reason}
@@ -1318,7 +1335,7 @@ update_measures(Type, QX, Inc, Measure) ->
orddict:store(Measure, Cur + Inc, Measures)).
internal_emit_stats(State = #ch{stats_timer = StatsTimer}) ->
- CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS],
+ CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
rabbit_event:notify(channel_stats, CoarseStats);
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index d1938805..21c39780 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -53,9 +53,7 @@ start_link() ->
supervisor2:start_link(?MODULE, []).
start_channel(Pid, Args) ->
- {ok, ChSupPid, _} = Result = supervisor2:start_child(Pid, [Args]),
- link(ChSupPid),
- Result.
+ supervisor2:start_child(Pid, [Args]).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 69e21d73..b3821d3b 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -88,12 +88,12 @@ start_heartbeat_fun(SupPid) ->
SupPid, {heartbeat_sender,
{rabbit_heartbeat, start_heartbeat_sender,
[Parent, Sock, TimeoutSec]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
{ok, Receiver} =
supervisor2:start_child(
SupPid, {heartbeat_receiver,
{rabbit_heartbeat, start_heartbeat_receiver,
[Parent, Sock, TimeoutSec]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
{Sender, Receiver}
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index f0b623c2..a3b6f369 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -209,6 +209,14 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
Inform("Changing password for user ~p", [Username]),
call(Node, {rabbit_access_control, change_password, Args});
+action(set_admin, Node, [Username], _Opts, Inform) ->
+ Inform("Setting administrative status for user ~p", [Username]),
+ call(Node, {rabbit_access_control, set_admin, [Username]});
+
+action(clear_admin, Node, [Username], _Opts, Inform) ->
+ Inform("Clearing administrative status for user ~p", [Username]),
+ call(Node, {rabbit_access_control, clear_admin, [Username]});
+
action(list_users, Node, [], _Opts, Inform) ->
Inform("Listing users", []),
display_list(call(Node, {rabbit_access_control, list_users, []}));
@@ -246,14 +254,14 @@ action(list_exchanges, Node, Args, Opts, Inform) ->
[VHostArg, ArgAtoms]),
ArgAtoms);
-action(list_bindings, Node, _Args, Opts, Inform) ->
+action(list_bindings, Node, Args, Opts, Inform) ->
Inform("Listing bindings", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- InfoKeys = [exchange_name, queue_name, routing_key, args],
- display_info_list(
- [lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys);
+ ArgAtoms = default_if_empty(Args, [exchange_name, queue_name,
+ routing_key, arguments]),
+ display_info_list(rpc_call(Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms]),
+ ArgAtoms);
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
@@ -304,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
- X <- InfoItemKeys])
- end, Results),
+ lists:foreach(
+ fun (Result) -> display_row(
+ [format_info_item(proplists:get_value(X, Result)) ||
+ X <- InfoItemKeys])
+ end, Results),
ok;
display_info_list(Other, _) ->
Other.
@@ -315,25 +325,30 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
-format_info_item(Key, Items) ->
- case proplists:get_value(Key, Items) of
- #resource{name = Name} ->
- escape(Name);
- Value when Key =:= address; Key =:= peer_address andalso
- is_tuple(Value) ->
- inet_parse:ntoa(Value);
- Value when is_pid(Value) ->
- rabbit_misc:pid_to_string(Value);
- Value when is_binary(Value) ->
- escape(Value);
- Value when is_atom(Value) ->
- escape(atom_to_list(Value));
- Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
- when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
- io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
- Value ->
- io_lib:format("~w", [Value])
- end.
+-define(IS_U8(X), (X >= 0 andalso X =< 255)).
+-define(IS_U16(X), (X >= 0 andalso X =< 65535)).
+
+format_info_item(#resource{name = Name}) ->
+ escape(Name);
+format_info_item({N1, N2, N3, N4} = Value) when
+ ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
+ inet_parse:ntoa(Value);
+format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
+ ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
+ ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
+ inet_parse:ntoa(Value);
+format_info_item(Value) when is_pid(Value) ->
+ rabbit_misc:pid_to_string(Value);
+format_info_item(Value) when is_binary(Value) ->
+ escape(Value);
+format_info_item(Value) when is_atom(Value) ->
+ escape(atom_to_list(Value));
+format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
+ Value) when is_binary(TableEntryKey) andalso
+ is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item(Value) ->
+ io_lib:format("~w", [Value]).
display_list(L) when is_list(L) ->
lists:foreach(fun (I) when is_binary(I) ->
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index 51bd6b1f..a9806305 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -61,26 +61,27 @@ add_to_plt(PltPath, FilesString) ->
{init_plt, PltPath},
{output_plt, PltPath},
{files, Files}]),
- print_warnings(DialyzerWarnings),
+ print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1),
ok.
dialyze_files(PltPath, ModifiedFiles) ->
Files = string:tokens(ModifiedFiles, " "),
DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
- {files, Files}]),
+ {files, Files},
+ {warnings, [behaviours,
+ race_conditions]}]),
case DialyzerWarnings of
- [] -> io:format("~nOk~n"),
- ok;
- _ -> io:format("~nFAILED with the following warnings:~n"),
- print_warnings(DialyzerWarnings),
- fail
- end.
-
-print_warnings(Warnings) ->
- [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings],
- io:format("~n"),
+ [] -> io:format("~nOk~n");
+ _ -> io:format("~n~nFAILED with the following ~p warnings:~n~n",
+ [length(DialyzerWarnings)]),
+ print_warnings(DialyzerWarnings, fun dialyzer:format_warning/1)
+ end,
ok.
+print_warnings(Warnings, FormatFun) ->
+ [io:format("~s~n", [FormatFun(W)]) || W <- Warnings],
+ io:format("~n").
+
otp_apps_dependencies_paths() ->
[code:lib_dir(App, ebin) ||
App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 0f00537a..2b236531 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -34,9 +34,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/2]).
--export([ensure_stats_timer_after/2, reset_stats_timer_after/1]).
--export([stats_level/1]).
+-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
+-export([reset_stats_timer/1]).
+-export([stats_level/1, if_enabled/2]).
-export([notify/2]).
%%----------------------------------------------------------------------------
@@ -71,11 +71,11 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), timer_fun(), timer_fun()) -> state()).
--spec(stop_stats_timer/2 :: (state(), timer_fun()) -> state()).
--spec(ensure_stats_timer_after/2 :: (state(), timer_fun()) -> state()).
--spec(reset_stats_timer_after/1 :: (state()) -> state()).
+-spec(ensure_stats_timer/2 :: (state(), timer_fun()) -> state()).
+-spec(stop_stats_timer/1 :: (state()) -> state()).
+-spec(reset_stats_timer/1 :: (state()) -> state()).
-spec(stats_level/1 :: (state()) -> level()).
+-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-endif.
@@ -85,44 +85,61 @@
start_link() ->
gen_event:start_link({local, ?MODULE}).
+%% The idea is, for each stat-emitting object:
+%%
+%% On startup:
+%% Timer = init_stats_timer()
+%% notify(created event)
+%% if_enabled(internal_emit_stats) - so we immediately send something
+%%
+%% On wakeup:
+%% ensure_stats_timer(Timer, emit_stats)
+%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
+%%
+%% emit_stats:
+%% if_enabled(internal_emit_stats)
+%% reset_stats_timer(Timer) - just bookkeeping
+%%
+%% Pre-hibernation:
+%% if_enabled(internal_emit_stats)
+%% stop_stats_timer(Timer)
+%%
+%% internal_emit_stats:
+%% notify(stats)
+
init_stats_timer() ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
#state{level = StatsLevel, timer = undefined}.
-ensure_stats_timer(State = #state{level = none}, _NowFun, _TimerFun) ->
+ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
-ensure_stats_timer(State = #state{timer = undefined}, NowFun, TimerFun) ->
- NowFun(),
- {ok, TRef} = timer:apply_interval(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
+ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
+ {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
-ensure_stats_timer(State, _NowFun, _TimerFun) ->
+ensure_stats_timer(State, _Fun) ->
State.
-stop_stats_timer(State = #state{level = none}, _NowFun) ->
+stop_stats_timer(State = #state{level = none}) ->
State;
-stop_stats_timer(State = #state{timer = undefined}, _NowFun) ->
+stop_stats_timer(State = #state{timer = undefined}) ->
State;
-stop_stats_timer(State = #state{timer = TRef}, NowFun) ->
+stop_stats_timer(State = #state{timer = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
- NowFun(),
State#state{timer = undefined}.
-ensure_stats_timer_after(State = #state{level = none}, _TimerFun) ->
- State;
-ensure_stats_timer_after(State = #state{timer = undefined}, TimerFun) ->
- {ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [TimerFun, []]),
- State#state{timer = TRef};
-ensure_stats_timer_after(State, _TimerFun) ->
- State.
-
-reset_stats_timer_after(State) ->
+reset_stats_timer(State) ->
State#state{timer = undefined}.
stats_level(#state{level = Level}) ->
Level.
+if_enabled(#state{level = none}, _Fun) ->
+ ok;
+if_enabled(_State, Fun) ->
+ Fun(),
+ ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index af4eb1bd..2a19d5b1 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -34,38 +34,19 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2]).
--export([add_binding/5, delete_binding/5, list_bindings/1]).
--export([delete/2]).
--export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([assert_equivalence/5]).
--export([assert_args_equivalence/2]).
--export([check_type/1]).
-
-%% EXTENDED API
--export([list_exchange_bindings/1]).
--export([list_queue_bindings/1]).
-
--import(mnesia).
--import(sets).
--import(lists).
+ info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+%% this must be run inside a mnesia tx
+-export([maybe_auto_delete/1]).
+-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, type/0, binding_key/0]).
+-export_type([name/0, type/0]).
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
--type(binding_key() :: binary()).
-
--type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found')).
--type(inner_fun() ::
- fun((rabbit_types:exchange(), queue()) ->
- rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 ::
@@ -97,32 +78,12 @@
-> [[rabbit_types:info()]]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
--spec(add_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun()) -> bind_res()).
--spec(delete_binding/5 ::
- (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table(), inner_fun())
- -> bind_res() | rabbit_types:error('binding_not_found')).
--spec(list_bindings/1 ::
- (rabbit_types:vhost())
- -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(delete_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
--spec(delete_transient_queue_bindings/1 ::
- (rabbit_amqqueue:name()) -> fun (() -> any())).
-spec(delete/2 ::
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
--spec(list_queue_bindings/1 ::
- (rabbit_amqqueue:name())
- -> [{name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
--spec(list_exchange_bindings/1 ::
- (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
- rabbit_framing:amqp_table()}]).
+-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) ->
+ 'not_deleted' | 'auto_deleted').
-endif.
@@ -131,27 +92,15 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments]).
recover() ->
- Exs = rabbit_misc:table_fold(
- fun (Exchange, Acc) ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
- [Exchange | Acc]
- end, [], rabbit_durable_exchange),
- Bs = rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route,
- Route, write),
- ok = mnesia:write(rabbit_reverse_route,
- ReverseRoute, write),
- [B | Acc]
- end, [], rabbit_durable_route),
- recover_with_bindings(Bs, Exs),
- ok.
-
-recover_with_bindings(Bs, Exs) ->
+ Xs = rabbit_misc:table_fold(
+ fun (X, Acc) ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ [X | Acc]
+ end, [], rabbit_durable_exchange),
+ Bs = rabbit_binding:recover(),
recover_with_bindings(
lists:keysort(#binding.exchange_name, Bs),
- lists:keysort(#exchange.name, Exs), []).
+ lists:keysort(#exchange.name, Xs), []).
recover_with_bindings([B = #binding{exchange_name = Name} | Rest],
Xs = [#exchange{name = Name} | _],
@@ -163,38 +112,36 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args},
+declare(XName, Type, Durable, AutoDelete, Args) ->
+ X = #exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
TypeModule = type_to_module(Type),
- ok = TypeModule:validate(Exchange),
+ ok = TypeModule:validate(X),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
+ ok = mnesia:write(rabbit_exchange, X, write),
ok = case Durable of
true ->
mnesia:write(rabbit_durable_exchange,
- Exchange, write);
+ X, write);
false ->
ok
end,
- {new, Exchange};
+ {new, X};
[ExistingX] ->
{existing, ExistingX}
end
end) of
{new, X} -> TypeModule:create(X),
- rabbit_event:notify(
- exchange_created,
- [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]),
+ rabbit_event:notify(exchange_created, info(X)),
X;
{existing, X} -> X;
Err -> Err
@@ -220,9 +167,9 @@ check_type(TypeBin) ->
end
end.
-assert_equivalence(X = #exchange{ durable = Durable,
+assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
- type = Type},
+ type = Type},
Type, Durable, AutoDelete, RequiredArgs) ->
(type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
@@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_args_equivalence(#exchange{ name = Name,
- arguments = Args },
+assert_args_equivalence(#exchange{ name = Name, arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
%% equivalence". The only arg we care about is
@@ -311,256 +257,20 @@ publish(X = #exchange{type = Type}, Seen, Delivery) ->
R
end.
-%% TODO: Should all of the route and binding management not be
-%% refactored to its own module, especially seeing as unbind will have
-%% to be implemented for 0.91 ?
-
-delete_exchange_bindings(ExchangeName) ->
- [begin
- ok = mnesia:delete_object(rabbit_reverse_route,
- reverse_route(Route), write),
- ok = delete_forward_routes(Route),
- Route#route.binding
- end || Route <- mnesia:match_object(
- rabbit_route,
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- write)].
-
-delete_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_forward_routes/1).
-
-delete_transient_queue_bindings(QueueName) ->
- delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
-
-delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- Route = reverse_route(ReverseRoute),
- ok = FwdDeleteFun(Route),
- ok = mnesia:delete_object(rabbit_reverse_route,
- ReverseRoute, write),
- Route#route.binding
- end || ReverseRoute
- <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(#route{binding = #binding{
- queue_name = QueueName,
- _ = '_'}}),
- write)],
- Cleanup = cleanup_deleted_queue_bindings(
- lists:keysort(#binding.exchange_name, DeletedBindings), []),
- fun () ->
- lists:foreach(
- fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, Bs);
- not_deleted -> Module:remove_bindings(X, Bs)
- end
- end, Cleanup)
- end.
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], Acc) ->
- Acc;
-cleanup_deleted_queue_bindings(
- [B = #binding{exchange_name = ExchangeName} | Bs], Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc).
-
-cleanup_deleted_queue_bindings(
- ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs],
- Bindings, Acc) ->
- cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc);
-cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) ->
- %% either Deleted is [], or its head has a non-matching ExchangeName
- NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc],
- cleanup_deleted_queue_bindings(Deleted, NewAcc).
-
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X] = mnesia:read({rabbit_exchange, ExchangeName}),
- {maybe_auto_delete(X), Bindings}.
-
-delete_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write),
- ok = mnesia:delete_object(rabbit_durable_route, Route, write).
-
-delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(rabbit_route, Route, write).
-
-contains(Table, MatchHead) ->
- continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
-
-continue('$end_of_table') -> false;
-continue({[_|_], _}) -> true;
-continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
-
-call_with_exchange(Exchange, Fun) ->
+call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
+ fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
end).
-call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({rabbit_exchange, Exchange}),
- mnesia:read({rabbit_queue, Queue})} of
- {[X], [Q]} -> Fun(X, Q);
- {[ ], [_]} -> {error, exchange_not_found};
- {[_], [ ]} -> {error, queue_not_found};
- {[ ], [ ]} -> {error, exchange_and_queue_not_found}
- end
- end).
-
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(X, Q) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- ok = sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- rabbit_event:notify(
- binding_created,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName},
- {routing_key, RoutingKey},
- {arguments, Arguments}]),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end;
- {error, _} = E ->
- E
- end
- end) of
- {new, Exchange = #exchange{ type = Type }, Binding} ->
- (type_to_module(Type)):add_binding(Exchange, Binding);
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
-
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
- case binding_action(
- ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X, Q, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- _ ->
- case InnerFun(X, Q) of
- ok ->
- ok =
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- rabbit_event:notify(
- binding_deleted,
- [{exchange_name, ExchangeName},
- {queue_name, QueueName}]),
- {maybe_auto_delete(X), B};
- {error, _} = E ->
- E
- end
- end
- end) of
- {error, _} = Err ->
- Err;
- {{IsDeleted, X = #exchange{ type = Type }}, B} ->
- Module = type_to_module(Type),
- case IsDeleted of
- auto_deleted -> Module:delete(X, [B]);
- not_deleted -> Module:remove_bindings(X, [B])
- end
- end.
-
-binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) ->
- call_with_exchange_and_queue(
- ExchangeName, QueueName,
- fun (X, Q) ->
- Fun(X, Q, #binding{
- exchange_name = ExchangeName,
- queue_name = QueueName,
- key = RoutingKey,
- args = rabbit_misc:sort_field_table(Arguments)})
- end).
-
-sync_binding(Binding, Durable, Fun) ->
- ok = case Durable of
- true -> Fun(rabbit_durable_route,
- #route{binding = Binding}, write);
- false -> ok
- end,
- {Route, ReverseRoute} = route_with_reverse(Binding),
- ok = Fun(rabbit_route, Route, write),
- ok = Fun(rabbit_reverse_route, ReverseRoute, write),
- ok.
-
-list_bindings(VHostPath) ->
- [{ExchangeName, QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- key = RoutingKey,
- queue_name = QueueName,
- args = Arguments}}
- <- mnesia:dirty_match_object(
- rabbit_route,
- #route{binding = #binding{
- exchange_name = rabbit_misc:r(VHostPath, exchange),
- _ = '_'},
- _ = '_'})].
-
-route_with_reverse(#route{binding = Binding}) ->
- route_with_reverse(Binding);
-route_with_reverse(Binding = #binding{}) ->
- Route = #route{binding = Binding},
- {Route, reverse_route(Route)}.
-
-reverse_route(#route{binding = Binding}) ->
- #reverse_route{reverse_binding = reverse_binding(Binding)};
-
-reverse_route(#reverse_route{reverse_binding = Binding}) ->
- #route{binding = reverse_binding(Binding)}.
-
-reverse_binding(#reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args};
-
-reverse_binding(#binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}) ->
- #reverse_binding{exchange_name = Exchange,
- queue_name = Queue,
- key = Key,
- args = Args}.
-
-delete(ExchangeName, IfUnused) ->
+delete(XName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
false -> fun unconditional_delete/1
end,
- case call_with_exchange(ExchangeName, Fun) of
+ case call_with_exchange(XName, Fun) of
{deleted, X = #exchange{type = Type}, Bs} ->
(type_to_module(Type)):delete(X, Bs),
ok;
@@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) ->
Error
end.
-maybe_auto_delete(Exchange = #exchange{auto_delete = false}) ->
- {not_deleted, Exchange};
-maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
- case conditional_delete(Exchange) of
- {error, in_use} -> {not_deleted, Exchange};
- {deleted, Exchange, []} -> {auto_deleted, Exchange}
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ not_deleted;
+maybe_auto_delete(#exchange{auto_delete = true} = X) ->
+ case conditional_delete(X) of
+ {error, in_use} -> not_deleted;
+ {deleted, X, []} -> auto_deleted
end.
-conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
- %% we need to check for durable routes here too in case a bunch of
- %% routes to durable queues have been removed temporarily as a
- %% result of a node failure
- case contains(rabbit_route, Match) orelse
- contains(rabbit_durable_route, Match) of
- false -> unconditional_delete(Exchange);
+conditional_delete(X = #exchange{name = XName}) ->
+ case rabbit_binding:has_for_exchange(XName) of
+ false -> unconditional_delete(X);
true -> {error, in_use}
end.
-unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = delete_exchange_bindings(ExchangeName),
- ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
- {deleted, Exchange, Bindings}.
-
-%%----------------------------------------------------------------------------
-%% EXTENDED API
-%% These are API calls that are not used by the server internally,
-%% they are exported for embedded clients to use
-
-%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
-%% return {QueueName, RoutingKey, Arguments} tuples
-list_exchange_bindings(ExchangeName) ->
- Route = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- [{QueueName, RoutingKey, Arguments} ||
- #route{binding = #binding{queue_name = QueueName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
-
-% Refactoring is left as an exercise for the reader
-list_queue_bindings(QueueName) ->
- Route = #route{binding = #binding{queue_name = QueueName,
- _ = '_'}},
- [{ExchangeName, RoutingKey, Arguments} ||
- #route{binding = #binding{exchange_name = ExchangeName,
- key = RoutingKey,
- args = Arguments}}
- <- mnesia:dirty_match_object(rabbit_route, Route)].
+unconditional_delete(X = #exchange{name = XName}) ->
+ Bindings = rabbit_binding:remove_for_exchange(XName),
+ ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange, XName}),
+ rabbit_event:notify(exchange_deleted, [{name, XName}]),
+ {deleted, X, Bindings}.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 44607398..0a59a175 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -79,8 +79,8 @@ parse_x_match(Other) ->
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
-%% (rabbit_misc:sort_field_table) that route/3 and
-%% rabbit_exchange:{add,delete}_binding/4 do.
+%% (rabbit_misc:sort_field_table) that publish/1 and
+%% rabbit_binding:{add,remove}/2 do.
%%
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index da7078f1..c323d7ce 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2]).
+ handle_info/2, prioritise_call/3]).
-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1]).
@@ -107,7 +107,7 @@ get_limit(undefined) ->
get_limit(Pid) ->
rabbit_misc:with_exit_handler(
fun () -> 0 end,
- fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+ fun () -> gen_server2:call(Pid, get_limit, infinity) end).
block(undefined) ->
ok;
@@ -126,6 +126,9 @@ unblock(LimiterPid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+prioritise_call(get_limit, _From, _State) -> 9;
+prioritise_call(_Msg, _From, _State) -> 0.
+
handle_call({can_send, _QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, State};
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
deleted file mode 100644
index e0457b1e..00000000
--- a/src/rabbit_load.erl
+++ /dev/null
@@ -1,78 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_load).
-
--export([local_load/0, remote_loads/0, pick/0]).
-
--define(FUDGE_FACTOR, 0.98).
--define(TIMEOUT, 100).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}).
--spec(local_load/0 :: () -> load()).
--spec(remote_loads/0 :: () -> [load()]).
--spec(pick/0 :: () -> node()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-local_load() ->
- LoadAvg = case whereis(cpu_sup) of
- undefined -> unknown;
- _ -> case cpu_sup:avg1() of
- L when is_integer(L) -> L;
- {error, timeout} -> unknown
- end
- end,
- {{statistics(run_queue), LoadAvg}, node()}.
-
-remote_loads() ->
- {ResL, _BadNodes} =
- rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT),
- ResL.
-
-pick() ->
- RemoteLoads = remote_loads(),
- {{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node; we rely on Erlang's term order
- %% of SomeFloat < local_unknown < unknown.
- AdjustedLoadAvg = case LoadAvg of
- unknown -> local_unknown;
- _ -> LoadAvg * ?FUDGE_FACTOR
- end,
- Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
- {_, SelectedNode} = lists:min(Loads),
- SelectedNode.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b2768a13..208f71f0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,14 +34,14 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/1,
+ sync/3, client_init/2, client_terminate/2,
client_delete_and_terminate/3, successfully_recovered_state/1,
register_sync_callback/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -102,8 +102,7 @@
}).
-record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size,
- locked, readers}).
+ {file, valid_total_size, left, right, file_size, locked, readers}).
%%----------------------------------------------------------------------------
@@ -142,7 +141,7 @@
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), rabbit_guid:guid()) -> client_msstate()).
--spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), rabbit_guid:guid()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
@@ -165,8 +164,7 @@
%% {Guid, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which maps File to #file_summary{}:
-%% {File, ValidTotalSize, ContiguousTop, Left, Right,
-%% FileSize, Locked, Readers}
+%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
%%
%% The basic idea is that messages are appended to the current file up
%% until that file becomes too big (> file_size_limit). At that point,
@@ -182,9 +180,7 @@
%%
%% As messages are removed from files, holes appear in these
%% files. The field ValidTotalSize contains the total amount of useful
-%% data left in the file, whilst ContiguousTop contains the amount of
-%% valid data right at the start of each file. These are needed for
-%% garbage collection.
+%% data left in the file. This is needed for garbage collection.
%%
%% When we discover that a file is now empty, we delete it. When we
%% discover that it can be combined with the useful data in either its
@@ -230,9 +226,7 @@
%% above B (i.e. truncate to the limit of the good contiguous region
%% at the start of the file), then write C and D on top and then write
%% E, F and G from the right file on top. Thus contiguous blocks of
-%% good data at the bottom of files are not rewritten (yes, this is
-%% the data the size of which is tracked by the ContiguousTop
-%% variable. Judicious use of a mirror is required).
+%% good data at the bottom of files are not rewritten.
%%
%% +-------+ +-------+ +-------+
%% | X | | G | | G |
@@ -335,8 +329,8 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:pcall(
- Server, 2, {read, Guid}, infinity),
+ Defer = fun() -> {gen_server2:call(
+ Server, {read, Guid}, infinity),
CState} end,
case index_lookup(Guid, CState) of
not_found -> Defer();
@@ -358,18 +352,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
+sync(Server) -> gen_server2:cast(Server, sync). %% internal
gc_done(Server, Reclaimed, Source, Destination) ->
- gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
+ gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
set_maximum_since_use(Server, Age) ->
- gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Server, {set_maximum_since_use, Age}).
client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -381,19 +375,15 @@ client_init(Server, Ref) ->
cur_file_cache_ets = CurFileCacheEts,
client_ref = Ref}.
-client_terminate(CState) ->
- Self = self(),
- spawn(fun() ->
- gen_server2:call(Self, {client_terminate, CState})
- end),
- ok.
+client_terminate(CState, Server) ->
+ ok = gen_server2:call(Server, {client_terminate, CState}, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
- ok = client_terminate(CState),
- ok = gen_server2:cast(Server, {delete_client, Ref}).
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
- gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
+ gen_server2:call(Server, successfully_recovered_state, infinity).
register_sync_callback(Server, ClientRef, Fun) ->
gen_server2:call(Server, {register_sync_callback, ClientRef, Fun}, infinity).
@@ -597,6 +587,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {new_client_state, _Ref} -> 7;
+ successfully_recovered_state -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ sync -> 8;
+ {gc_done, _Reclaimed, _Source, _Destination} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ _ -> 0
+ end.
+
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
noreply(State1);
@@ -626,6 +632,7 @@ handle_call({register_sync_callback, ClientRef, Fun}, _From,
State = #msstate { client_ondisk_callback = CODC }) ->
reply(ok, State #msstate { client_ondisk_callback =
dict:store(ClientRef, Fun, CODC) });
+
handle_call({client_terminate, CState = #client_msstate { client_ref = CRef }},
_From,
State = #msstate { client_ondisk_callback = CODC,
@@ -657,20 +664,14 @@ handle_cast({write, CRef, Guid},
offset = CurOffset, total_size = TotalSize },
State),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
right = undefined,
locked = false,
file_size = FileSize }] =
ets:lookup(FileSummaryEts, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
- ContiguousTop1 = case CurOffset =:= ContiguousTop of
- true -> ValidTotalSize1;
- false -> ContiguousTop
- end,
true = ets:update_element(
FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1},
{#file_summary.file_size, FileSize + TotalSize}]),
NextOffset = CurOffset + TotalSize,
noreply(
@@ -758,7 +759,7 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({delete_client, CRef},
+handle_cast({client_delete, CRef},
State = #msstate { client_refs = ClientRefs }) ->
noreply(
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
@@ -951,8 +952,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize } =
- index_lookup(Guid, State),
+ total_size = TotalSize } = index_lookup(Guid, State),
case RefCount of
1 ->
%% don't remove from CUR_FILE_CACHE_ETS_NAME here because
@@ -960,7 +960,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
%% msg.
ok = remove_cache_entry(DedupCacheEts, Guid),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
@@ -968,12 +967,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
add_to_pending_gc_completion({remove, Guid}, State);
false ->
ok = index_delete(Guid, State),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:update_element(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1}]),
+ true =
+ ets:update_element(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size, ValidTotalSize1}]),
State1 = delete_file_if_empty(File, State),
State1 #msstate { sum_valid_data = SumValid - TotalSize }
end;
@@ -1320,16 +1318,17 @@ scan_file_for_valid_messages(Dir, FileName) ->
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
-find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []).
+drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0).
-find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
- {ExpectedOffset, Guids};
-find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
- ExpectedOffset, Guids) ->
+drop_contiguous_block_prefix([], ExpectedOffset) ->
+ {ExpectedOffset, []};
+drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset,
+ total_size = TotalSize } | Tail],
+ ExpectedOffset) ->
ExpectedOffset1 = ExpectedOffset + TotalSize,
- find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]);
-find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
- {ExpectedOffset, Guids}.
+ drop_contiguous_block_prefix(Tail, ExpectedOffset1);
+drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) ->
+ {ExpectedOffset, MsgsAfterGap}.
build_index(true, _StartupFunState,
State = #msstate { file_summary_ets = FileSummaryEts }) ->
@@ -1405,9 +1404,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
{VMAcc, VTSAcc}
end
end, {[], 0}, Messages),
- %% foldl reverses lists, find_contiguous_block_prefix needs
- %% msgs eldest first, so, ValidMessages is the right way round
- {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages),
{Right, FileSize1} =
case Files of
%% if it's the last file, we'll truncate to remove any
@@ -1424,7 +1420,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
ok = gatherer:in(Gatherer, #file_summary {
file = File,
valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
left = Left,
right = Right,
file_size = FileSize1,
@@ -1452,7 +1447,6 @@ maybe_roll_to_new_file(
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
- contiguous_top = 0,
left = CurFile,
right = undefined,
file_size = 0,
@@ -1579,7 +1573,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
true = ets:update_element(
FileSummaryEts, DstFile,
[{#file_summary.valid_total_size, TotalValidData},
- {#file_summary.contiguous_top, TotalValidData},
{#file_summary.file_size, TotalValidData}]),
SrcFileSize + DstFileSize - TotalValidData;
false -> concurrent_readers
@@ -1590,7 +1583,6 @@ combine_files(#file_summary { file = Source,
left = Destination },
#file_summary { file = Destination,
valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
@@ -1606,41 +1598,32 @@ combine_files(#file_summary { file = Source,
%% the DestinationContiguousTop to a tmp file then truncate,
%% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
- case DestinationContiguousTop =:= DestinationValid of
- true ->
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize);
- false ->
- {DestinationWorkList, DestinationValid} =
- find_unremoved_messages_in_file(Destination, State),
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset =/= DestinationContiguousTop ->
- %% it cannot be that Offset =:=
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- end, DestinationWorkList),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and index_state has been updated to
- %% reflect the compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:delete(TmpHdl)
+ {DestinationWorkList, DestinationValid} =
+ find_unremoved_messages_in_file(Destination, State),
+ {DestinationContiguousTop, DestinationWorkListTail} =
+ drop_contiguous_block_prefix(DestinationWorkList),
+ case DestinationWorkListTail of
+ [] -> ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize);
+ _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE),
+ ok = copy_messages(
+ DestinationWorkListTail, DestinationContiguousTop,
+ DestinationValid, DestinationHdl, TmpHdl, Destination,
+ State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage
+ %% from Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:delete(TmpHdl)
end,
{SourceWorkList, SourceValid} =
find_unremoved_messages_in_file(Source, State),
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index c7948b7e..a7855bbf 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -38,7 +38,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
-record(gcstate,
{dir,
@@ -81,7 +81,7 @@ stop(Server) ->
gen_server2:call(Server, stop, infinity).
set_maximum_since_use(Pid, Age) ->
- gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Pid, {set_maximum_since_use, Age}).
%%----------------------------------------------------------------------------
@@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
+prioritise_cast(_Msg, _State) -> 0.
+
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index c7a5a600..5cfd6a5c 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -310,8 +310,8 @@ kill_wait(Pid, TimeLeft, Forceful) ->
is_dead(Pid) ->
PidS = integer_to_list(Pid),
with_os([{unix, fun () ->
- Res = os:cmd("ps --no-headers --pid " ++ PidS),
- Res == ""
+ system("kill -0 " ++ PidS
+ ++ " >/dev/null 2>&1") /= 0
end},
{win32, fun () ->
Res = os:cmd("tasklist /nh /fi \"pid eq " ++
@@ -322,6 +322,16 @@ is_dead(Pid) ->
end
end}]).
+% Like system(3)
+system(Cmd) ->
+ ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
+ Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
+ receive {Port, {exit_status, Status}} -> Status end.
+
+% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
+escape_quotes(Cmd) ->
+ lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
+
call_all_nodes(Func) ->
case read_pids_file() of
[] -> throw(no_nodes_running);
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 6baa4b88..2286896b 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -46,7 +46,7 @@
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
-type(error() :: rabbit_types:error(any())).
--type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()).
+-type(socket() :: port() | #ssl_socket{}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
@@ -72,72 +72,58 @@
%%---------------------------------------------------------------------------
+-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
-async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
+async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
spawn(fun () -> Pid ! {inet_async, Sock, Ref,
- ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
- end),
+ ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
+ end),
{ok, Ref};
-
async_recv(Sock, Length, infinity) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, -1);
-
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
-close(Sock) when is_record(Sock, ssl_socket) ->
+close(Sock) when ?IS_SSL(Sock) ->
ssl:close(Sock#ssl_socket.ssl);
-
close(Sock) when is_port(Sock) ->
gen_tcp:close(Sock).
-
-controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
+controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
-
controlling_process(Sock, Pid) when is_port(Sock) ->
gen_tcp:controlling_process(Sock, Pid).
-
-getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
+getstat(Sock, Stats) when ?IS_SSL(Sock) ->
inet:getstat(Sock#ssl_socket.tcp, Stats);
-
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
-
-peername(Sock) when is_record(Sock, ssl_socket) ->
+peername(Sock) when ?IS_SSL(Sock) ->
ssl:peername(Sock#ssl_socket.ssl);
-
peername(Sock) when is_port(Sock) ->
inet:peername(Sock).
-
-port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
+port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
- ok ->
- self() ! {inet_reply, Sock, ok},
- true;
- {error, Reason} ->
- erlang:error(Reason)
+ ok -> self() ! {inet_reply, Sock, ok},
+ true;
+ {error, Reason} -> erlang:error(Reason)
end;
-
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
-send(Sock, Data) when is_record(Sock, ssl_socket) ->
+send(Sock, Data) when ?IS_SSL(Sock) ->
ssl:send(Sock#ssl_socket.ssl, Data);
-
send(Sock, Data) when is_port(Sock) ->
gen_tcp:send(Sock, Data).
-sockname(Sock) when is_record(Sock, ssl_socket) ->
+sockname(Sock) when ?IS_SSL(Sock) ->
ssl:sockname(Sock#ssl_socket.ssl);
-
sockname(Sock) when is_port(Sock) ->
inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index f656e04c..6dbd54d2 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -46,6 +46,8 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
+-include_lib("ssl/src/ssl_record.hrl").
+
-define(RABBIT_TCP_OPTS, [
binary,
@@ -107,8 +109,37 @@ boot_ssl() ->
ok;
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
- {ok, SslOpts} = application:get_env(ssl_options),
- [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ {ok, SslOptsConfig} = application:get_env(ssl_options),
+ % unknown_ca errors are silently ignored prior to R14B unless we
+ % supply this verify_fun - remove when at least R14B is required
+ SslOpts =
+ case proplists:get_value(verify, SslOptsConfig, verify_none) of
+ verify_none -> SslOptsConfig;
+ verify_peer -> [{verify_fun, fun([]) -> true;
+ ([_|_]) -> false
+ end}
+ | SslOptsConfig]
+ end,
+ % In R13B04 and R14A (at least), rc4 is incorrectly implemented.
+ CipherSuites = proplists:get_value(ciphers,
+ SslOpts,
+ ssl:cipher_suites()),
+ FilteredCipherSuites =
+ [C || C <- CipherSuites,
+ begin
+ SuiteCode =
+ if is_tuple(C) -> ssl_cipher:suite(C);
+ is_list(C) -> ssl_cipher:openssl_suite(C)
+ end,
+ SP = ssl_cipher:security_parameters(
+ SuiteCode,
+ #security_parameters{}),
+ SP#security_parameters.bulk_cipher_algorithm =/= ?RC4
+ end],
+ SslOpts1 = [{ciphers, FilteredCipherSuites}
+ | [{K, V} || {K, V} <- SslOpts, K =/= ciphers]],
+ [start_ssl_listener(Host, Port, SslOpts1)
+ || {Host, Port} <- SslListeners],
ok
end.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 26274a36..b23776cd 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -51,7 +51,7 @@
%%----------------------------------------------------------------------------
start() ->
- io:format("Activating RabbitMQ plugins ..."),
+ io:format("Activating RabbitMQ plugins ...~n"),
%% Ensure Rabbit is loaded so we can access it's environment
application:load(rabbit),
@@ -130,7 +130,7 @@ start() ->
ok -> ok;
error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
- io:format("~n~w plugins activated:~n", [length(PluginApps)]),
+ io:format("~w plugins activated:~n", [length(PluginApps)]),
[io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
|| App <- PluginApps],
io:nl(),
@@ -151,29 +151,33 @@ determine_version(App) ->
{ok, Vsn} = application:get_key(App, vsn),
{App, Vsn}.
-assert_dir(Dir) ->
- case filelib:is_dir(Dir) of
- true -> ok;
- false -> ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
- end.
-
-delete_dir(Dir) ->
- case filelib:is_dir(Dir) of
+delete_recursively(Fn) ->
+ case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
true ->
- case file:list_dir(Dir) of
+ case file:list_dir(Fn) of
{ok, Files} ->
- [case Dir ++ "/" ++ F of
- Fn ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true -> delete_dir(Fn);
- false -> file:delete(Fn)
- end
- end || F <- Files]
- end,
- ok = file:del_dir(Dir);
+ case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
+ Fn ++ "/" ++ Fn1);
+ (_Fn1, Err) -> Err
+ end, ok, Files) of
+ ok -> case file:del_dir(Fn) of
+ ok -> ok;
+ {error, E} -> {error,
+ {cannot_delete, Fn, E}}
+ end;
+ Err -> Err
+ end;
+ {error, E} ->
+ {error, {cannot_list_files, Fn, E}}
+ end;
false ->
- ok
+ case filelib:is_file(Fn) of
+ true -> case file:delete(Fn) of
+ ok -> ok;
+ {error, E} -> {error, {cannot_delete, Fn, E}}
+ end;
+ false -> ok
+ end
end.
is_symlink(Name) ->
@@ -182,13 +186,18 @@ is_symlink(Name) ->
_ -> false
end.
-unpack_ez_plugins(PluginSrcDir, PluginDestDir) ->
+unpack_ez_plugins(SrcDir, DestDir) ->
%% Eliminate the contents of the destination directory
- delete_dir(PluginDestDir),
-
- assert_dir(PluginDestDir),
- [unpack_ez_plugin(PluginName, PluginDestDir) ||
- PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")].
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2])
+ end,
+ [unpack_ez_plugin(PluginName, DestDir) ||
+ PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")].
unpack_ez_plugin(PluginFn, PluginDestDir) ->
zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
@@ -247,8 +256,8 @@ post_process_script(ScriptFile) ->
{error, {failed_to_load_script, Reason}}
end.
-process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) ->
- [Entry, {apply,{rabbit,prepare,[]}}];
+process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
+ [{apply,{rabbit,prepare,[]}}, Entry];
process_entry(Entry) ->
[Entry].
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 09ada1c0..745e0083 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -339,7 +339,7 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw(E);
{channel_exit, ChannelOrFrPid, Reason} ->
mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'EXIT', ChSupPid, Reason} ->
+ {'DOWN', _MRef, process, ChSupPid, Reason} ->
mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
terminate_connection ->
State;
@@ -369,10 +369,8 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
end),
mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
- internal_emit_stats(State),
- mainloop(Deb, State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ State1 = internal_emit_stats(State),
+ mainloop(Deb, State1);
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);
@@ -489,7 +487,7 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'EXIT', ChSupPid, Reason} ->
+ {'DOWN', _MRef, process, ChSupPid, Reason} ->
case channel_cleanup(ChSupPid) of
undefined ->
exit({abnormal_dependent_exit, ChSupPid, Reason});
@@ -690,11 +688,14 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
-ensure_stats_timer(State = #v1{stats_timer = StatsTimer}) ->
+ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
+ connection_state = running}) ->
Self = self(),
- State#v1{stats_timer = rabbit_event:ensure_stats_timer_after(
+ State#v1{stats_timer = rabbit_event:ensure_stats_timer(
StatsTimer,
- fun() -> emit_stats(Self) end)}.
+ fun() -> emit_stats(Self) end)};
+ensure_stats_timer(State) ->
+ State.
%%--------------------------------------------------------------------------
@@ -765,7 +766,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- sock = Sock}) ->
+ sock = Sock,
+ stats_timer = StatsTimer}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -773,9 +775,10 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
connection = NewConnection}),
- rabbit_event:notify(
- connection_created,
- [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]),
+ rabbit_event:notify(connection_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(StatsTimer,
+ fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
@@ -868,6 +871,7 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), Username, VHost, Collector}),
+ erlang:monitor(process, ChSupPid),
put({channel, Channel}, {ch_fr_pid, ChFrPid}),
put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
put({ch_fr_pid, ChFrPid}, {channel, Channel}),
@@ -938,6 +942,6 @@ amqp_exception_explanation(Text, Expl) ->
true -> CompleteTextBin
end.
-internal_emit_stats(State) ->
- rabbit_event:notify(connection_stats,
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
+internal_emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+ rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
+ State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index aaf3ae92..e5ffe863 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -33,9 +33,7 @@
-include_lib("stdlib/include/qlc.hrl").
-include("rabbit.hrl").
--export([deliver/2,
- match_bindings/2,
- match_routing_key/2]).
+-export([deliver/2, match_bindings/2, match_routing_key/2]).
%%----------------------------------------------------------------------------
@@ -45,9 +43,15 @@
-type(routing_key() :: binary()).
-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(qpids() :: [pid()]).
-spec(deliver/2 ::
- ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
+ (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
+-spec(match_bindings/2 :: (rabbit_exchange:name(),
+ fun ((rabbit_types:binding()) -> boolean())) ->
+ qpids()).
+-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') ->
+ qpids()).
-endif.
@@ -89,10 +93,10 @@ deliver(QPids, Delivery = #delivery{msg_seq_no = MsgSeqNo}) ->
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(Name, Match) ->
Query = qlc:q([QName || #route{binding = Binding = #binding{
- exchange_name = ExchangeName,
+ exchange_name = XName,
queue_name = QName}} <-
mnesia:table(rabbit_route),
- ExchangeName == Name,
+ XName == Name,
Match(Binding)]),
lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c8af7c10..27235154 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -972,6 +972,8 @@ test_user_management() ->
{error, {user_already_exists, _}} =
control_action(add_user, ["foo", "bar"]),
ok = control_action(change_password, ["foo", "baz"]),
+ ok = control_action(set_admin, ["foo"]),
+ ok = control_action(clear_admin, ["foo"]),
ok = control_action(list_users, []),
%% vhost creation
@@ -1037,7 +1039,15 @@ test_server_status() ->
ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true),
%% list bindings
- ok = control_action(list_bindings, []),
+ ok = info_action(list_bindings, rabbit_binding:info_keys(), true),
+ %% misc binding listing APIs
+ [_|_] = rabbit_binding:list_for_exchange(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>)),
+ [_] = rabbit_binding:list_for_queue(
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ [_] = rabbit_binding:list_for_exchange_and_queue(
+ rabbit_misc:r(<<"/">>, exchange, <<"">>),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
[#listener{host = H, port = P} | _] =
@@ -1460,7 +1470,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1523,7 +1533,7 @@ test_msg_store() ->
ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
%% read the second half again, just for fun (aka code coverage)
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7),
+ ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1548,7 +1558,7 @@ test_msg_store() ->
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9)),
+ msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl
deleted file mode 100644
index 484249b1..00000000
--- a/src/rabbit_tracer.erl
+++ /dev/null
@@ -1,50 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_tracer).
--export([start/0]).
-
--import(erlang).
-
-start() ->
- spawn(fun mainloop/0),
- ok.
-
-mainloop() ->
- erlang:trace(new, true, [all]),
- mainloop1().
-
-mainloop1() ->
- receive
- Msg ->
- rabbit_log:info("TRACE: ~p~n", [Msg])
- end,
- mainloop1().
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 47e8bb01..0b6a15ec 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -38,7 +38,7 @@
-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
- amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
+ amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2,
ok_pid_or_error/0, channel_exit/0, connection_exit/0]).
@@ -107,8 +107,6 @@
kind :: Kind,
name :: Name}).
--type(ssl_socket() :: #ssl_socket{}).
-
-type(listener() ::
#listener{node :: node(),
protocol :: atom(),
@@ -118,7 +116,8 @@
-type(binding() ::
#binding{exchange_name :: rabbit_exchange:name(),
queue_name :: rabbit_amqqueue:name(),
- key :: rabbit_exchange:binding_key()}).
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: rabbit_amqqueue:name(),
@@ -141,7 +140,8 @@
-type(user() ::
#user{username :: rabbit_access_control:username(),
- password :: rabbit_access_control:password()}).
+ password :: rabbit_access_control:password(),
+ is_admin :: boolean()}).
-type(ok(A) :: {'ok', A}).
-type(error(A) :: {'error', A}).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 62bd38f2..6521c544 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -458,9 +458,10 @@ terminate(State) ->
remove_pending_ack(true, tx_commit_index(State)),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(MSCStateP)
+ _ -> rabbit_msg_store:client_terminate(
+ MSCStateP, ?PERSISTENT_MSG_STORE)
end,
- rabbit_msg_store:client_terminate(MSCStateT),
+ rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -483,8 +484,7 @@ delete_and_terminate(State) ->
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef),
- rabbit_msg_store:client_terminate(MSCStateP)
+ MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
end,
rabbit_msg_store:client_delete_and_terminate(
MSCStateT, ?TRANSIENT_MSG_STORE, TRef),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index fd5b5ba5..aa986e54 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -114,41 +114,24 @@ mainloop1(ReaderPid, State) ->
erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
end.
-handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
+handle_message({send_command, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
State;
-handle_message({send_command, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+handle_message({send_command, MethodRecord, Content}, State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
State;
-handle_message({send_command_sync, From, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
gen_server:reply(From, ok),
State;
-handle_message({send_command_sync, From, {MethodRecord, Content}},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
- State = #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax, Protocol),
+ State) ->
+ ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
@@ -169,10 +152,10 @@ send_command(W, MethodRecord, Content) ->
ok.
send_command_sync(W, MethodRecord) ->
- call(W, send_command_sync, MethodRecord).
+ call(W, {send_command_sync, MethodRecord}).
send_command_sync(W, MethodRecord, Content) ->
- call(W, send_command_sync, {MethodRecord, Content}).
+ call(W, {send_command_sync, MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
@@ -180,16 +163,16 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
%---------------------------------------------------------------------------
-call(Pid, Label, Msg) ->
- {ok, Res} = gen:call(Pid, Label, Msg, infinity),
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
%---------------------------------------------------------------------------
assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
- Protocol).
+ rabbit_binary_generator:build_simple_method_frame(
+ Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
@@ -231,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+internal_send_command_async(MethodRecord,
+ #wstate{sock = Sock,
+ channel = Channel,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
- Protocol) ->
+internal_send_command_async(MethodRecord, Content,
+ #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
Content, FrameMax, Protocol)),
ok.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 4a1c5832..93adfcb1 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -34,7 +34,9 @@
%% 4) Added an 'intrinsic' restart type. Like the transient type, this
%% type means the child should only be restarted if the child exits
%% abnormally. Unlike the transient type, if the child exits
-%% normally, the supervisor itself also exits normally.
+%% normally, the supervisor itself also exits normally. If the
+%% child is a supervisor and it exits normally (i.e. with reason of
+%% 'shutdown') then the child's parent also exits normally.
%%
%% All modifications are (C) 2010 Rabbit Technologies Ltd.
%%
@@ -545,6 +547,9 @@ do_restart(permanent, Reason, Child, State) ->
restart(Child, State);
do_restart(intrinsic, normal, Child, State) ->
{shutdown, state_del_child(Child, State)};
+do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor},
+ State) ->
+ {shutdown, state_del_child(Child, State)};
do_restart(_, normal, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState};
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 42049d50..f461a539 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -38,7 +38,7 @@
-export([set_maximum_since_use/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -71,7 +71,7 @@ submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
set_maximum_since_use(Pid, Age) ->
- gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Pid, {set_maximum_since_use, Age}).
run({M, F, A}) ->
apply(M, F, A);
@@ -88,6 +88,9 @@ init([WId]) ->
{ok, WId, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8;
+prioritise_cast(_Msg, _State) -> 0.
+
handle_call({submit, Fun}, From, WId) ->
gen_server2:reply(From, run(Fun)),
ok = worker_pool:idle(WId),