diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-16 11:06:01 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-16 11:06:01 +0100 |
commit | 50808a297d8acf8f192c436e0152b8f2f95ca3cd (patch) | |
tree | a851d13da3f9b8bd83ba74727644fb5657d18dfe | |
parent | b6c97f183040b7a68bae989677e9f05f2e204d16 (diff) | |
parent | 9df14122325caa6acf00e30de2e2062089a5dff1 (diff) | |
download | rabbitmq-server-50808a297d8acf8f192c436e0152b8f2f95ca3cd.tar.gz |
merge default into bug23047
52 files changed, 2577 insertions, 2047 deletions
@@ -5,7 +5,6 @@ RABBITMQ_NODENAME ?= rabbit RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia RABBITMQ_LOG_BASE ?= $(TMPDIR) -RABBITMQ_PLUGINS_EXPAND_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-plugins-scratch DEPS_FILE=deps.mk SOURCE_DIR=src @@ -153,8 +152,7 @@ BASIC_SCRIPT_ENVIRONMENT_SETTINGS=\ RABBITMQ_NODE_IP_ADDRESS="$(RABBITMQ_NODE_IP_ADDRESS)" \ RABBITMQ_NODE_PORT="$(RABBITMQ_NODE_PORT)" \ RABBITMQ_LOG_BASE="$(RABBITMQ_LOG_BASE)" \ - RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" \ - RABBITMQ_PLUGINS_EXPAND_DIR="$(RABBITMQ_PLUGINS_EXPAND_DIR)" + RABBITMQ_MNESIA_DIR="$(RABBITMQ_MNESIA_DIR)" run: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ @@ -245,7 +243,7 @@ distclean: clean %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \ + xmlto -o $(DOCS_DIR) $$opt man $<.tmp && \ gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 662dbea0..c325bb5a 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -30,7 +30,7 @@ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. </p> <p> - <a href="manpages.html">See a list of all manual pages</a>. + <a href="../manpages.html">See a list of all manual pages</a>. </p> </xsl:when> <xsl:otherwise> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 33552e17..5179eb25 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -417,7 +417,8 @@ <screen role="example">rabbitmqctl add_user tonyg changeit</screen> <para role="example"> This command instructs the RabbitMQ broker to create a - user named <command>tonyg</command> with (initial) password + (non-administrative) user named <command>tonyg</command> with + (initial) password <command>changeit</command>. </para> </listitem> @@ -465,13 +466,57 @@ </varlistentry> <varlistentry> + <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose administrative + status is to be set.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl set_admin tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to ensure the user + named <command>tonyg</command> is an administrator. This has no + effect when the user logs in via AMQP, but can be used to permit + the user to manage users, virtual hosts and permissions when the + user logs in via some other means (for example with the + management plugin). + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>clear_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose administrative + status is to be cleared.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_admin tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to ensure the user + named <command>tonyg</command> is not an administrator. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><cmdsynopsis><command>list_users</command></cmdsynopsis></term> <listitem> <para>Lists users</para> <para role="example-prefix">For example:</para> <screen role="example">rabbitmqctl list_users</screen> <para role="example"> - This command instructs the RabbitMQ broker to list all users. + This command instructs the RabbitMQ broker to list all + users. Each result row will contain the user name and + the administrator status of the user, in that order. </para> </listitem> </varlistentry> @@ -704,7 +749,7 @@ <variablelist> <varlistentry> <term>name</term> - <listitem><para>The name of the queue with non-ASCII characters URL-escaped.</para></listitem> + <listitem><para>The name of the queue with non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>durable</term> @@ -795,7 +840,7 @@ <variablelist> <varlistentry> <term>name</term> - <listitem><para>The name of the exchange with non-ASCII characters URL-escaped.</para></listitem> + <listitem><para>The name of the exchange with non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>type</term> @@ -830,22 +875,58 @@ </para> </listitem> </varlistentry> - </variablelist> - <variablelist> - <varlistentry> - <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg></cmdsynopsis></term> + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_bindings</command> <arg choice="opt">-p <replaceable>vhostpath</replaceable></arg> <arg choice="opt" role="usage-option-list"><replaceable>bindinginfoitem</replaceable> ...</arg></cmdsynopsis></term> <listitem> <para> - By default the bindings for the <command>/</command> virtual - host are returned. The "-p" flag can be used to override - this default. Each result row will contain an exchange - name, queue name, routing key and binding arguments, in - that order. Non-ASCII characters will be URL-encoded. + Returns binding details. By default the bindings for + the <command>/</command> virtual host are returned. The + "-p" flag can be used to override this default. </para> - <para role="usage"> - The output format for "list_bindings" is a list of rows containing - exchange name, queue name, routing key and arguments, in that order. + <para> + The <command>bindinginfoitem</command> parameter is used + to indicate which binding information items to include + in the results. The column order in the results will + match the order of the parameters. + <command>bindinginfoitem</command> can take any value + from the list that follows: + </para> + <variablelist> + <varlistentry> + <term>exchange_name</term> + <listitem><para>The name of the exchange to which the + binding is attached. with non-ASCII characters + escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>queue_name</term> + <listitem><para>The name of the queue to which the + binding is attached. with non-ASCII characters + escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>routing_key</term> + <listitem><para>The binding's routing key, with + non-ASCII characters escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>arguments</term> + <listitem><para>The binding's arguments.</para></listitem> + </varlistentry> + </variablelist> + <para> + If no <command>bindinginfoitem</command>s are specified then + all above items are displayed. + </para> + <para role="example-prefix"> + For example: + </para> + <screen role="example">rabbitmqctl list_bindings -p /myvhost exchange_name queue_name</screen> + <para role="example"> + This command displays the exchange name and queue name + of the bindings in the virtual host + named <command>/myvhost</command>. </para> </listitem> </varlistentry> @@ -904,7 +985,7 @@ </varlistentry> <varlistentry> <term>vhost</term> - <listitem><para>Virtual host name with non-ASCII characters URL-escaped.</para></listitem> + <listitem><para>Virtual host name with non-ASCII characters escaped as in C.</para></listitem> </varlistentry> <varlistentry> <term>timeout</term> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 48e19ff8..4be09c5a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -26,6 +26,7 @@ {queue_index_max_journal_entries, 262144}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, + {default_user_is_admin, true}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {collect_statistics, none}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b9abd788..24aa8d98 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --record(user, {username, password}). +-record(user, {username, password, is_admin}). -record(permission, {scope, configure, write, read}). -record(user_vhost, {username, virtual_host}). -record(user_permission, {user_vhost, permission}). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c0d9aeda..eb0a2a51 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -127,6 +127,12 @@ done rm -rf %{buildroot} %changelog +* Tue Sep 14 2010 marek@rabbitmq.com 2.1.0-1 +- New Upstream Release + +* Mon Aug 23 2010 mikeb@rabbitmq.com 2.0.0-1 +- New Upstream Release + * Wed Jul 14 2010 Emile Joubert <emile@rabbitmq.com> 1.8.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 0dccf938..9927cfbc 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,15 @@ +rabbitmq-server (2.1.0-1) lucid; urgency=low + + * New Upstream Release + + -- Marek Majkowski <marek@rabbitmq.com> Tue, 14 Sep 2010 14:20:17 +0100 + +rabbitmq-server (2.0.0-1) karmic; urgency=low + + * New Upstream Release + + -- Michael Bridgen <mikeb@rabbitmq.com> Mon, 23 Aug 2010 14:55:39 +0100 + rabbitmq-server (1.8.1-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index a44f49a0..4afc66ac 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -1,7 +1,7 @@ Source: rabbitmq-server Section: net Priority: extra -Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com> +Maintainer: RabbitMQ Team <packaging@rabbitmq.com> Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc Standards-Version: 3.8.0 diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index 4eade6c7..c4e01f4a 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) \ diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index f47b5340..abe174e0 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,7 +4,6 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz $(MAKE) -C $(SOURCE_DIR) diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index d52dc774..9310752f 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -39,7 +39,6 @@ CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia -PLUGINS_EXPAND_DIR=/var/lib/rabbitmq/plugins-scratch SERVER_START_ARGS= . `dirname $0`/rabbitmq-env @@ -70,7 +69,6 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${PLUGINS_EXPAND_DIR}" ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} @@ -91,14 +89,14 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ -pa "$RABBITMQ_EBIN_ROOT" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit plugins_expand_dir "\"${RABBITMQ_MNESIA_DIR}/plugins-scratch\"" \ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN_ROOT\"" \ -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" then - RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" + RABBITMQ_BOOT_FILE="${RABBITMQ_MNESIA_DIR}/plugins-scratch/rabbit" RABBITMQ_EBIN_PATH="" else exit 1 diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b1a91f47..5bcbc6ba 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -110,24 +110,21 @@ if "!RABBITMQ_MNESIA_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 95e5eebf..4b3961d4 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -180,24 +180,21 @@ if errorlevel 1 ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
- set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_BASE!\plugins-scratch
-)
-
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_plugin_activator ^
-rabbit plugins_dir \""!RABBITMQ_PLUGINS_DIR:\=/!"\" ^
--rabbit plugins_expand_dir \""!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"\" ^
+-rabbit plugins_expand_dir \""!RABBITMQ_MNESIA_DIR:\=/!/plugins-scratch"\" ^
-rabbit rabbit_ebin \""!RABBITMQ_EBIN_ROOT:\=/!"\" ^
-extra !STAR!
-if not exist "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" (
- echo Custom Boot File "!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit.boot" is missing.
+set RABBITMQ_BOOT_FILE=!RABBITMQ_MNESIA_DIR!\plugins-scratch\rabbit
+if not exist "!RABBITMQ_BOOT_FILE!.boot" (
+ echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
exit /B 1
)
-set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
+
set RABBITMQ_EBIN_PATH=
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index fe4bdc03..d2830a25 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -34,13 +34,15 @@ %% A File Handle Cache %% %% This extends a subset of the functionality of the Erlang file -%% module. +%% module. In the below, we use "file handle" to specifically refer to +%% file handles, and "file descriptor" to refer to descriptors which +%% are not file handles, e.g. sockets. %% %% Some constraints %% 1) This supports one writer, multiple readers per file. Nothing %% else. %% 2) Do not open the same file from different processes. Bad things -%% may happen. +%% may happen, especially for writes. %% 3) Writes are all appends. You cannot write to the middle of a %% file, although you can truncate and then append if you want. %% 4) Although there is a write buffer, there is no read buffer. Feel @@ -49,10 +51,10 @@ %% %% Some benefits %% 1) You do not have to remember to call sync before close -%% 2) Buffering is much more flexible than with plain file module, and -%% you can control when the buffer gets flushed out. This means that -%% you can rely on reads-after-writes working, without having to call -%% the expensive sync. +%% 2) Buffering is much more flexible than with the plain file module, +%% and you can control when the buffer gets flushed out. This means +%% that you can rely on reads-after-writes working, without having to +%% call the expensive sync. %% 3) Unnecessary calls to position and sync get optimised out. %% 4) You can find out what your 'real' offset is, and what your %% 'virtual' offset is (i.e. where the hdl really is, and where it @@ -60,14 +62,19 @@ %% 5) You can find out what the offset was when you last sync'd. %% %% There is also a server component which serves to limit the number -%% of open file handles in a "soft" way - the server will never -%% prevent a client from opening a handle, but may immediately tell it -%% to close the handle. Thus you can set the limit to zero and it will -%% still all work correctly, it is just that effectively no caching -%% will take place. The operation of limiting is as follows: +%% of open file descriptors. This is a hard limit: the server +%% component will ensure that clients do not have more file +%% descriptors open than it's configured to allow. %% -%% On open and close, the client sends messages to the server -%% informing it of opens and closes. This allows the server to keep +%% On open, the client requests permission from the server to open the +%% required number of file handles. The server may ask the client to +%% close other file handles that it has open, or it may queue the +%% request and ask other clients to close file handles they have open +%% in order to satisfy the request. Requests are always satisfied in +%% the order they arrive, even if a latter request (for a small number +%% of file handles) can be satisfied before an earlier request (for a +%% larger number of file handles). On close, the client sends a +%% message to the server. These messages allow the server to keep %% track of the number of open handles. The client also keeps a %% gb_tree which is updated on every use of a file handle, mapping the %% time at which the file handle was last used (timestamp) to the @@ -81,21 +88,38 @@ %% Note that this data can go very out of date, by the client using %% the least recently used handle. %% -%% When the limit is reached, the server calculates the average age of -%% the last reported least recently used file handle of all the -%% clients. It then tells all the clients to close any handles not -%% used for longer than this average, by invoking the callback the -%% client registered. The client should receive this message and pass -%% it into set_maximum_since_use/1. However, it is highly possible -%% this age will be greater than the ages of all the handles the -%% client knows of because the client has used its file handles in the -%% mean time. Thus at this point the client reports to the server the +%% When the limit is exceeded (i.e. the number of open file handles is +%% at the limit and there are pending 'open' requests), the server +%% calculates the average age of the last reported least recently used +%% file handle of all the clients. It then tells all the clients to +%% close any handles not used for longer than this average, by +%% invoking the callback the client registered. The client should +%% receive this message and pass it into +%% set_maximum_since_use/1. However, it is highly possible this age +%% will be greater than the ages of all the handles the client knows +%% of because the client has used its file handles in the mean +%% time. Thus at this point the client reports to the server the %% current timestamp at which its least recently used file handle was %% last used. The server will check two seconds later that either it %% is back under the limit, in which case all is well again, or if %% not, it will calculate a new average age. Its data will be much %% more recent now, and so it is very likely that when this is %% communicated to the clients, the clients will close file handles. +%% (In extreme cases, where it's very likely that all clients have +%% used their open handles since they last sent in an update, which +%% would mean that the average will never cause any file handles to +%% be closed, the server can send out an average age of 0, resulting +%% in all available clients closing all their file handles.) +%% +%% Care is taken to ensure that (a) processes which are blocked +%% waiting for file descriptors to become available are not sent +%% requests to close file handles; and (b) given it is known how many +%% file handles a process has open, when the average age is forced to +%% 0, close messages are only sent to enough processes to release the +%% correct number of file handles and the list of processes is +%% randomly shuffled. This ensures we don't cause processes to +%% needlessly close file handles, and ensures that we don't always +%% make such requests of the same processes. %% %% The advantage of this scheme is that there is only communication %% from the client to the server on open, close, and when in the @@ -103,11 +127,7 @@ %% communication from the client to the server on normal file handle %% operations. This scheme forms a feed-back loop - the server does %% not care which file handles are closed, just that some are, and it -%% checks this repeatedly when over the limit. Given the guarantees of -%% now(), even if there is just one file handle open, a limit of 1, -%% and one client, it is certain that when the client calculates the -%% age of the handle, it will be greater than when the server -%% calculated it, hence it should be closed. +%% checks this repeatedly when over the limit. %% %% Handles which are closed as a result of the server are put into a %% "soft-closed" state in which the handle is closed (data flushed out @@ -116,13 +136,24 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain and release_on_death. obtain/0 -%% blocks until a file descriptor is available. release_on_death/1 -%% takes a pid and monitors the pid, reducing the count by 1 when the -%% pid dies. Thus the assumption is that obtain/0 is called first, and -%% when that returns, release_on_death/1 is called with the pid who -%% "owns" the file descriptor. This is, for example, used to track the -%% use of file descriptors through network sockets. +%% The server also supports obtain and transfer. obtain/0 blocks until +%% a file descriptor is available, at which point the requesting +%% process is considered to 'own' one more descriptor. transfer/1 +%% transfers ownership of a file descriptor between processes. It is +%% non-blocking. Obtain is used to obtain permission to accept file +%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1 +%% macro. File handles can use the entire limit, but will be evicted +%% by obtain calls up to the point at which no more obtain calls can +%% be satisfied by the obtains limit. Thus there will always be some +%% capacity available for file handles. Processes that use obtain are +%% never asked to return them, and they are not managed in any way by +%% the server. It is simply a mechanism to ensure that processes that +%% need file descriptors such as sockets can do so in such a way that +%% the overall number of open file descriptors is managed. +%% +%% The callers of register_callback/3, obtain/0, and the argument of +%% transfer/1 are monitored, reducing the count of handles in use +%% appropriately when the processes terminate. -behaviour(gen_server). @@ -130,7 +161,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/1]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -148,7 +180,8 @@ -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). --define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)). +-define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -182,11 +215,26 @@ obtain_limit, obtain_count, obtain_pending, - callbacks, - client_mrefs, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + +-record(pending, + { kind, + pid, + requested, + from + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -222,7 +270,11 @@ -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). --spec(obtain/1 :: (pid()) -> 'ok'). +-spec(obtain/0 :: () -> 'ok'). +-spec(transfer/1 :: (pid()) -> 'ok'). +-spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_limit/0 :: () -> non_neg_integer()). +-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. @@ -249,9 +301,9 @@ open(Path, Mode, Options) -> IsWriter = is_writer(Mode1), case IsWriter andalso HasWriter of true -> {error, writer_exists}; - false -> Ref = make_ref(), - case open1(Path1, Mode1, Options, Ref, bof, new) of - {ok, _Handle} -> + false -> {ok, Ref} = new_closed_handle(Path1, Mode1, Options), + case get_or_reopen([{Ref, new}]) of + {ok, [_Handle1]} -> RCount1 = case is_reader(Mode1) of true -> RCount + 1; false -> RCount @@ -262,6 +314,7 @@ open(Path, Mode, Options) -> has_writer = HasWriter1 }), {ok, Ref}; Error -> + erase({Ref, fhc_handle}), Error end end. @@ -432,8 +485,8 @@ set_maximum_since_use(MaximumAge) -> case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> - Age = timer:now_diff(Now, Then), - case Hdl =/= closed andalso Age >= MaximumAge of + case Hdl =/= closed andalso + timer:now_diff(Now, Then) >= MaximumAge of true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; @@ -444,8 +497,17 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain(Pid) -> - gen_server:call(?SERVER, {obtain, Pid}, infinity). +obtain() -> + gen_server:call(?SERVER, {obtain, self()}, infinity). + +transfer(Pid) -> + gen_server:cast(?SERVER, {transfer, self(), Pid}). + +set_limit(Limit) -> + gen_server:call(?SERVER, {set_limit, Limit}, infinity). + +get_limit() -> + gen_server:call(?SERVER, get_limit, infinity). %%---------------------------------------------------------------------------- %% Internal functions @@ -462,18 +524,9 @@ append_to_write(Mode) -> end. with_handles(Refs, Fun) -> - ResHandles = lists:foldl( - fun (Ref, {ok, HandlesAcc}) -> - case get_or_reopen(Ref) of - {ok, Handle} -> {ok, [Handle | HandlesAcc]}; - Error -> Error - end; - (_Ref, Error) -> - Error - end, {ok, []}, Refs), - case ResHandles of + case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of {ok, Handles} -> - case Fun(lists:reverse(Handles)) of + case Fun(Handles) of {Result, Handles1} when is_list(Handles1) -> lists:zipwith(fun put_handle/2, Refs, Handles1), Result; @@ -502,17 +555,80 @@ with_flushed_handles(Refs, Fun) -> end end). -get_or_reopen(Ref) -> - case get({Ref, fhc_handle}) of - undefined -> - {error, not_open, Ref}; - #handle { hdl = closed, offset = Offset, - path = Path, mode = Mode, options = Options } -> - open1(Path, Mode, Options, Ref, Offset, reopen); - Handle -> - {ok, Handle} +get_or_reopen(RefNewOrReopens) -> + case partition_handles(RefNewOrReopens) of + {OpenHdls, []} -> + {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; + {OpenHdls, ClosedHdls} -> + Oldest = oldest(get_age_tree(), fun () -> now() end), + case gen_server:call(?SERVER, {open, self(), length(ClosedHdls), + Oldest}, infinity) of + ok -> + case reopen(ClosedHdls) of + {ok, RefHdls} -> sort_handles(RefNewOrReopens, + OpenHdls, RefHdls, []); + Error -> Error + end; + close -> + [soft_close(Ref, Handle) || + {{Ref, fhc_handle}, Handle = #handle { hdl = Hdl }} <- + get(), + Hdl =/= closed], + get_or_reopen(RefNewOrReopens) + end + end. + +reopen(ClosedHdls) -> reopen(ClosedHdls, get_age_tree(), []). + +reopen([], Tree, RefHdls) -> + put_age_tree(Tree), + {ok, lists:reverse(RefHdls)}; +reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, + path = Path, + mode = Mode, + offset = Offset, + last_used_at = undefined }} | + RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) -> + case file:open(Path, case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end) of + {ok, Hdl} -> + Now = now(), + {{ok, Offset1}, Handle1} = + maybe_seek(Offset, Handle #handle { hdl = Hdl, + offset = 0, + last_used_at = Now }), + Handle2 = Handle1 #handle { trusted_offset = Offset1 }, + put({Ref, fhc_handle}, Handle2), + reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), + [{Ref, Handle2} | RefHdls]); + Error -> + %% NB: none of the handles in ToOpen are in the age tree + Oldest = oldest(Tree, fun () -> undefined end), + [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen], + put_age_tree(Tree), + Error end. +partition_handles(RefNewOrReopens) -> + lists:foldr( + fun ({Ref, NewOrReopen}, {Open, Closed}) -> + case get({Ref, fhc_handle}) of + #handle { hdl = closed } = Handle -> + {Open, [{Ref, NewOrReopen, Handle} | Closed]}; + #handle {} = Handle -> + {[{Ref, Handle} | Open], Closed} + end + end, {[], []}, RefNewOrReopens). + +sort_handles([], [], [], Acc) -> + {ok, lists:reverse(Acc)}; +sort_handles([{Ref, _} | RefHdls], [{Ref, Handle} | RefHdlsA], RefHdlsB, Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]); +sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> + sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). + put_handle(Ref, Handle = #handle { last_used_at = Then }) -> Now = now(), age_tree_update(Then, Now, Ref), @@ -528,21 +644,6 @@ get_age_tree() -> put_age_tree(Tree) -> put(fhc_age_tree, Tree). -age_tree_insert(Now, Ref) -> - Tree = get_age_tree(), - Tree1 = gb_trees:insert(Now, Ref, Tree), - {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), Oldest, - not gb_trees:is_empty(Tree)}, infinity) of - ok -> - put_age_tree(Tree1); - close -> - [soft_close(Ref1, Handle1) || - {{Ref1, fhc_handle}, Handle1 = #handle { hdl = Hdl1 }} <- get(), - Hdl1 =/= closed], - age_tree_insert(Now, Ref) - end. - age_tree_update(Then, Now, Ref) -> with_age_tree( fun (Tree) -> @@ -553,13 +654,7 @@ age_tree_delete(Then) -> with_age_tree( fun (Tree) -> Tree1 = gb_trees:delete_any(Then, Tree), - Oldest = case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = gb_trees:smallest(Tree1), - Oldest1 - end, + Oldest = oldest(Tree1, fun () -> undefined end), gen_server:cast(?SERVER, {close, self(), Oldest}), Tree1 end). @@ -575,44 +670,37 @@ age_tree_change() -> Tree end). -open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> - Mode1 = case NewOrReopen of - new -> Mode; - reopen -> [read | Mode] - end, - Now = now(), - age_tree_insert(Now, Ref), - case file:open(Path, Mode1) of - {ok, Hdl} -> - WriteBufferSize = - case proplists:get_value(write_buffer, Options, unbuffered) of - unbuffered -> 0; - infinity -> infinity; - N when is_integer(N) -> N - end, - Handle = #handle { hdl = Hdl, - offset = 0, - trusted_offset = 0, - is_dirty = false, - write_buffer_size = 0, - write_buffer_size_limit = WriteBufferSize, - write_buffer = [], - at_eof = false, - path = Path, - mode = Mode, - options = Options, - is_write = is_writer(Mode), - is_read = is_reader(Mode), - last_used_at = Now }, - {{ok, Offset1}, Handle1} = maybe_seek(Offset, Handle), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), - {ok, Handle2}; - {error, Reason} -> - age_tree_delete(Now), - {error, Reason} +oldest(Tree, DefaultFun) -> + case gb_trees:is_empty(Tree) of + true -> DefaultFun(); + false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + Oldest end. +new_closed_handle(Path, Mode, Options) -> + WriteBufferSize = + case proplists:get_value(write_buffer, Options, unbuffered) of + unbuffered -> 0; + infinity -> infinity; + N when is_integer(N) -> N + end, + Ref = make_ref(), + put({Ref, fhc_handle}, #handle { hdl = closed, + offset = 0, + trusted_offset = 0, + is_dirty = false, + write_buffer_size = 0, + write_buffer_size_limit = WriteBufferSize, + write_buffer = [], + at_eof = false, + path = Path, + mode = Mode, + options = Options, + is_write = is_writer(Mode), + is_read = is_reader(Mode), + last_used_at = undefined }), + {ok, Ref}. + soft_close(Ref, Handle) -> {Res, Handle1} = soft_close(Handle), case Res of @@ -626,7 +714,9 @@ soft_close(Handle = #handle { hdl = closed }) -> {ok, Handle}; soft_close(Handle) -> case write_buffer(Handle) of - {ok, #handle { hdl = Hdl, offset = Offset, is_dirty = IsDirty, + {ok, #handle { hdl = Hdl, + offset = Offset, + is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of true -> file:sync(Hdl); @@ -634,8 +724,10 @@ soft_close(Handle) -> end, ok = file:close(Hdl), age_tree_delete(Then), - {ok, Handle1 #handle { hdl = closed, trusted_offset = Offset, - is_dirty = false }}; + {ok, Handle1 #handle { hdl = closed, + trusted_offset = Offset, + is_dirty = false, + last_used_at = undefined }}; {_Error, _Handle} = Result -> Result end. @@ -722,129 +814,212 @@ init([]) -> Watermark > 0) -> Watermark; _ -> - ulimit() + case ulimit() of + infinity -> infinity; + unknown -> ?FILE_HANDLES_LIMIT_OTHER; + Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS]) + end end, - ObtainLimit = case Limit of - infinity -> infinity; - _ -> ?OBTAIN_LIMIT(Limit) - end, + ObtainLimit = obtain_limit(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, - open_pending = [], + open_pending = pending_new(), obtain_limit = ObtainLimit, obtain_count = 0, - obtain_pending = [], - callbacks = dict:new(), - client_mrefs = dict:new(), + obtain_pending = pending_new(), + clients = Clients, timer_ref = undefined }}. +handle_call({open, Pid, Requested, EldestUnusedSince}, From, + State = #fhc_state { open_count = Count, + open_pending = Pending, + elders = Elders, + clients = Clients }) + when EldestUnusedSince =/= undefined -> + Elders1 = dict:store(Pid, EldestUnusedSince, Elders), + Item = #pending { kind = open, + pid = Pid, + requested = Requested, + from = From }, + ok = track_client(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, + case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State1 #fhc_state { + open_pending = pending_in(Item, Pending) })}; + [#cstate { opened = Opened }] -> + true = ets:update_element( + Clients, Pid, + {#cstate.pending_closes, Opened}), + {reply, close, State1} + end; + false -> {noreply, run_pending_item(Item, State1)} + end; + handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> - {noreply, - State #fhc_state { obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> - case maybe_reduce(State #fhc_state { obtain_count = Count + 1 }) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [{obtain, Pid, From} | Pending], - elders = dict:erase(Pid, Elders) }}; - {false, State1} -> - _MRef = erlang:monitor(process, Pid), - {reply, ok, State1} + clients = Clients }) -> + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + ok = track_client(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + {noreply, reduce(State #fhc_state { + obtain_pending = pending_in(Item, Pending) })}; + false -> + {noreply, run_pending_item(Item, State)} end; - -handle_call({open, Pid, EldestUnusedSince, CanClose}, From, - State = #fhc_state { open_count = Count, - open_pending = Pending, - elders = Elders }) -> - Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - case maybe_reduce( - ensure_mref(Pid, State #fhc_state { open_count = Count + 1, - elders = Elders1 })) of - {true, State1} -> - State2 = State1 #fhc_state { open_count = Count }, - case CanClose of - true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [{open, From} | Pending], - elders = dict:erase(Pid, Elders1) }} - end; - {false, State1} -> - {reply, ok, State1} - end. +handle_call({set_limit, Limit}, _From, State) -> + {reply, ok, maybe_reduce( + process_pending(State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) }))}; +handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> + {reply, Limit, State}. handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; - -handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + State = #fhc_state { clients = Clients }) -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; + +handle_cast({update, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages - {noreply, ensure_mref(Pid, State #fhc_state { elders = Elders1 })}; + {noreply, State #fhc_state { elders = Elders1 }}; -handle_cast({close, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders, open_count = Count }) -> +handle_cast({close, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), {noreply, process_pending( - ensure_mref(Pid, State #fhc_state { open_count = Count - 1, - elders = Elders1 }))}; + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; -handle_cast(check_counts, State) -> - {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}. +handle_cast({transfer, FromPid, ToPid}, State) -> + ok = track_client(ToPid, State#fhc_state.clients), + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, State)))}; -handle_info({'DOWN', MRef, process, Pid, _Reason}, State = - #fhc_state { obtain_count = Count, callbacks = Callbacks, - client_mrefs = ClientMRefs, elders = Elders }) -> +handle_cast(check_counts, State) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + clients = Clients }) -> + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), + FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, {noreply, process_pending( - case dict:find(Pid, ClientMRefs) of - {ok, MRef} -> State #fhc_state { - elders = dict:erase(Pid, Elders), - client_mrefs = dict:erase(Pid, ClientMRefs), - callbacks = dict:erase(Pid, Callbacks) }; - _ -> State #fhc_state { obtain_count = Count - 1 } - end)}. - -terminate(_Reason, State) -> + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) })}. + +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- +%% pending queue abstraction helpers +%%---------------------------------------------------------------------------- + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +filter_pending(Fun, {Count, Queue}) -> + {Delta, Queue1} = + queue_fold(fun (Item, {DeltaN, QueueN}) -> + case Fun(Item) of + true -> {DeltaN, queue:in(Item, QueueN)}; + false -> {DeltaN - requested(Item), QueueN} + end + end, {0, queue:new()}, Queue), + {Count + Delta, Queue1}. + +pending_new() -> + {0, queue:new()}. + +pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> + {Count + Requested, queue:in(Item, Queue)}. + +pending_out({0, _Queue} = Pending) -> + {empty, Pending}; +pending_out({N, Queue}) -> + {{value, #pending { requested = Requested }} = Result, Queue1} = + queue:out(Queue), + {Result, {N - Requested, Queue1}}. + +pending_count({Count, _Queue}) -> + Count. + +pending_is_empty({0, _Queue}) -> + true; +pending_is_empty({_N, _Queue}) -> + false. + +%%---------------------------------------------------------------------------- %% server helpers %%---------------------------------------------------------------------------- +obtain_limit(infinity) -> infinity; +obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of + OLimit when OLimit < 0 -> 0; + OLimit -> OLimit + end. + +requested({_Kind, _Pid, Requested, _From}) -> + Requested. + process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_obtain(process_open(State)). + process_open(process_obtain(State)). process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc} = - process_pending(Pending, Limit - (ObtainCount + OpenCount)), - State #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + {Pending1, State1} = + process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + State1 #fhc_state { open_pending = Pending1 }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -853,70 +1028,139 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc} = process_pending(Pending, Quota), - State #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. - -process_pending([], _Quota) -> - {[], 0}; -process_pending(Pending, Quota) when Quota =< 0 -> - {Pending, 0}; -process_pending(Pending, Quota) -> - PendingLen = length(Pending), - SatisfiableLen = lists:min([PendingLen, Quota]), - Take = PendingLen - SatisfiableLen, - {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - [run_pending_item(Item) || Item <- SatisfiableRev], - {PendingNew, SatisfiableLen}. - -run_pending_item({open, From}) -> - gen_server:reply(From, ok); -run_pending_item({obtain, Pid, From}) -> - _MRef = erlang:monitor(process, Pid), - gen_server:reply(From, ok). - -maybe_reduce(State = #fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending, - elders = Elders, - callbacks = Callbacks, - timer_ref = TRef }) - when Limit =/= infinity andalso - (((OpenCount + ObtainCount) > Limit) orelse - (OpenPending =/= []) orelse - (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. + +process_pending(Pending, Quota, State) when Quota =< 0 -> + {Pending, State}; +process_pending(Pending, Quota, State) -> + case pending_out(Pending) of + {empty, _Pending} -> + {Pending, State}; + {{value, #pending { requested = Requested }}, _Pending1} + when Requested > Quota -> + {Pending, State}; + {{value, #pending { requested = Requested } = Item}, Pending1} -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) + end. + +run_pending_item(#pending { kind = Kind, + pid = Pid, + requested = Requested, + from = From }, + State = #fhc_state { clients = Clients }) -> + gen_server:reply(From, ok), + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, Requested, State). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { open_count = OpenCount, + obtain_count = ObtainCount, + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. + +maybe_reduce(State) -> + case needs_reduce(State) of + true -> reduce(State); + false -> State + end. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (not pending_is_empty(OpenPending)) + orelse (ObtainCount < ObtainLimit + andalso not pending_is_empty(ObtainPending))). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (_Pid, undefined, Accs) -> - Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, + blocked = Blocked } = CState] = + ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of + true -> Accs; + false -> {[CState | CStatesAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), - case Pids of + case CStates of [] -> ok; - _ -> AverageAge = Sum / ClientCount, - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + pending_count(OpenPending) + + pending_count(ObtainPending)) + end end, - AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; - _ -> {AboveLimit, State} - end; -maybe_reduce(State) -> - {false, State}. + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. + +notify_age(CStates, AverageAge) -> + lists:foreach( + fun (#cstate { callback = undefined }) -> ok; + (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ pid = Pid, + callback = {M, F, A}, + opened = Opened } | Notifications]) -> + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}), + notify(Clients, Required - Opened, Notifications). + +track_client(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok + end. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n @@ -924,7 +1168,7 @@ maybe_reduce(State) -> ulimit() -> case os:type() of {win32, _OsName} -> - ?FILE_HANDLES_LIMIT_WINDOWS - ?RESERVED_FOR_OTHERS; + ?FILE_HANDLES_LIMIT_WINDOWS; {unix, _OsName} -> %% Under Linux, Solaris and FreeBSD, ulimit is a shell %% builtin, not a command. In OS X, it's a command. @@ -934,24 +1178,14 @@ ulimit() -> "unlimited" -> infinity; String = [C|_] when $0 =< C andalso C =< $9 -> - Num = list_to_integer( - lists:takewhile( - fun (D) -> $0 =< D andalso D =< $9 end, String)) - - ?RESERVED_FOR_OTHERS, - lists:max([1, Num]); + list_to_integer( + lists:takewhile( + fun (D) -> $0 =< D andalso D =< $9 end, String)); _ -> %% probably a variant of %% "/bin/sh: line 1: ulimit: command not found\n" - ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS + unknown end; _ -> - ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS - end. - -ensure_mref(Pid, State = #fhc_state { client_mrefs = ClientMRefs }) -> - case dict:find(Pid, ClientMRefs) of - {ok, _MRef} -> State; - error -> MRef = erlang:monitor(process, Pid), - State #fhc_state { - client_mrefs = dict:store(Pid, MRef, ClientMRefs) } + unknown end. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index f1c8eb4d..e2bb940f 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,10 +16,12 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3 -%% allow callers to attach priorities to requests. Requests with -%% higher priorities are processed before requests with lower -%% priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/3, +%% prioritise_cast/2 and prioritise_info/2. These functions take +%% Message, From and State or just Message and State and return a +%% single integer representing the priority attached to the message. +%% Messages with higher priorities are processed before requests with +%% lower priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -82,13 +84,13 @@ %%% %%% The idea behind THIS server is that the user module %%% provides (different) functions to handle different -%%% kind of inputs. +%%% kind of inputs. %%% If the Parent process terminates the Module:terminate/2 %%% function is called. %%% %%% The user module should export: %%% -%%% init(Args) +%%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} %%% {ok, State, Timeout, Backoff} @@ -101,21 +103,21 @@ %%% {reply, Reply, State, Timeout} %%% {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, Reply, State} +%%% {stop, Reason, Reply, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_cast(Msg, State) %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term terminate(State) is called %%% %%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ... %%% %%% ==> {noreply, State} %%% {noreply, State, Timeout} -%%% {stop, Reason, State} +%%% {stop, Reason, State} %%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% terminate(Reason, State) Let the user module clean up @@ -159,37 +161,41 @@ %% API -export([start/3, start/4, - start_link/3, start_link/4, - call/2, call/3, pcall/3, pcall/4, - cast/2, pcast/3, reply/2, - abcast/2, abcast/3, - multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). + start_link/3, start_link/4, + call/2, call/3, + cast/2, reply/2, + abcast/2, abcast/3, + multi_call/2, multi_call/3, multi_call/4, + enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). -export([behaviour_info/1]). %% System exports -export([system_continue/3, - system_terminate/4, - system_code_change/4, - format_status/2]). + system_terminate/4, + system_code_change/4, + format_status/2]). %% Internal exports -export([init_it/6, print_event/3]). -import(error_logger, [format/2]). +%% State record +-record(gs2_state, {parent, name, state, mod, time, + timeout_state, queue, debug, prioritise_call, + prioritise_cast, prioritise_info}). + %%%========================================================================= %%% Specs. These exist only to shut up dialyzer's warnings %%%========================================================================= -ifdef(use_specs). --spec(handle_common_termination/6 :: - (any(), any(), any(), atom(), any(), any()) -> no_return()). +-spec(handle_common_termination/3 :: + (any(), atom(), #gs2_state{}) -> no_return()). --spec(hibernate/7 :: - (pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()). +-spec(hibernate/1 :: (#gs2_state{}) -> no_return()). -endif. @@ -238,37 +244,21 @@ start_link(Name, Mod, Args, Options) -> %% be monitored. %% If the client is trapping exits and is linked server termination %% is handled here (? Shall we do that here (or rely on timeouts) ?). -%% ----------------------------------------------------------------- +%% ----------------------------------------------------------------- call(Name, Request) -> case catch gen:call(Name, '$gen_call', Request) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request]}}) end. call(Name, Request, Timeout) -> case catch gen:call(Name, '$gen_call', Request, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) - end. - -pcall(Name, Priority, Request) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}}) - end. - -pcall(Name, Priority, Request, Timeout) -> - case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of - {ok,Res} -> - Res; - {'EXIT',Reason} -> - exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}}) + {ok,Res} -> + Res; + {'EXIT',Reason} -> + exit({Reason, {?MODULE, call, [Name, Request, Timeout]}}) end. %% ----------------------------------------------------------------- @@ -277,34 +267,18 @@ pcall(Name, Priority, Request, Timeout) -> cast({global,Name}, Request) -> catch global:send(Name, cast_msg(Request)), ok; -cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> +cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) -> do_cast(Dest, Request); cast(Dest, Request) when is_atom(Dest) -> do_cast(Dest, Request); cast(Dest, Request) when is_pid(Dest) -> do_cast(Dest, Request). -do_cast(Dest, Request) -> +do_cast(Dest, Request) -> do_send(Dest, cast_msg(Request)), ok. - -cast_msg(Request) -> {'$gen_cast',Request}. -pcast({global,Name}, Priority, Request) -> - catch global:send(Name, cast_msg(Priority, Request)), - ok; -pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_atom(Dest) -> - do_cast(Dest, Priority, Request); -pcast(Dest, Priority, Request) when is_pid(Dest) -> - do_cast(Dest, Priority, Request). - -do_cast(Dest, Priority, Request) -> - do_send(Dest, cast_msg(Priority, Request)), - ok. - -cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. +cast_msg(Request) -> {'$gen_cast',Request}. %% ----------------------------------------------------------------- %% Send a reply to the client. @@ -312,9 +286,9 @@ cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}. reply({To, Tag}, Reply) -> catch To ! {Tag, Reply}. -%% ----------------------------------------------------------------- -%% Asyncronous broadcast, returns nothing, it's just send'n prey -%%----------------------------------------------------------------- +%% ----------------------------------------------------------------- +%% Asyncronous broadcast, returns nothing, it's just send'n pray +%% ----------------------------------------------------------------- abcast(Name, Request) when is_atom(Name) -> do_abcast([node() | nodes()], Name, cast_msg(Request)). @@ -330,36 +304,36 @@ do_abcast([], _,_) -> abcast. %%% Make a call to servers at several nodes. %%% Returns: {[Replies],[BadNodes]} %%% A Timeout can be given -%%% +%%% %%% A middleman process is used in case late answers arrives after %%% the timeout. If they would be allowed to glog the callers message -%%% queue, it would probably become confused. Late answers will +%%% queue, it would probably become confused. Late answers will %%% now arrive to the terminated middleman and so be discarded. %%% ----------------------------------------------------------------- multi_call(Name, Req) when is_atom(Name) -> do_multi_call([node() | nodes()], Name, Req, infinity). -multi_call(Nodes, Name, Req) +multi_call(Nodes, Name, Req) when is_list(Nodes), is_atom(Name) -> do_multi_call(Nodes, Name, Req, infinity). multi_call(Nodes, Name, Req, infinity) -> do_multi_call(Nodes, Name, Req, infinity); -multi_call(Nodes, Name, Req, Timeout) +multi_call(Nodes, Name, Req, Timeout) when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 -> do_multi_call(Nodes, Name, Req, Timeout). %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ -%% -%% Description: Makes an existing process into a gen_server. -%% The calling process will enter the gen_server receive +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ +%% +%% Description: Makes an existing process into a gen_server. +%% The calling process will enter the gen_server receive %% loop and become a gen_server process. -%% The process *must* have been started using one of the -%% start functions in proc_lib, see proc_lib(3). -%% The user is responsible for any initialization of the +%% The process *must* have been started using one of the +%% start functions in proc_lib, see proc_lib(3). +%% The user is responsible for any initialization of the %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> @@ -386,7 +360,10 @@ enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Debug = debug_options(Name, Options), Queue = priority_queue:new(), Backoff1 = extend_backoff(Backoff), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). + loop(find_prioritisers( + #gs2_state { parent = Parent, name = Name, state = State, + mod = Mod, time = Timeout, timeout_state = Backoff1, + queue = Queue, debug = Debug })). %%%======================================================================== %%% Gen-callback functions @@ -405,39 +382,51 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), + GS2State = find_prioritisers( + #gs2_state { parent = Parent, + name = Name, + mod = Mod, + queue = Queue, + debug = Debug }), case catch Mod:init(Args) of - {ok, State} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); - {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); - {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + {ok, State} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = infinity, + timeout_state = undefined }); + {ok, State, Timeout} -> + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = undefined }); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> Backoff1 = extend_backoff(Backoff), - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); - {stop, Reason} -> - %% For consistency, we must make sure that the - %% registered name (if any) is unregistered before - %% the parent process is notified about the failure. - %% (Otherwise, the parent process could get - %% an 'already_started' error if it immediately - %% tried starting the process again.) - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - ignore -> - unregister_name(Name0), - proc_lib:init_ack(Starter, ignore), - exit(normal); - {'EXIT', Reason} -> - unregister_name(Name0), - proc_lib:init_ack(Starter, {error, Reason}), - exit(Reason); - Else -> - Error = {bad_return_value, Else}, - proc_lib:init_ack(Starter, {error, Error}), - exit(Error) + proc_lib:init_ack(Starter, {ok, self()}), + loop(GS2State #gs2_state { state = State, + time = Timeout, + timeout_state = Backoff1 }); + {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + ignore -> + unregister_name(Name0), + proc_lib:init_ack(Starter, ignore), + exit(normal); + {'EXIT', Reason} -> + unregister_name(Name0), + proc_lib:init_ack(Starter, {error, Reason}), + exit(Reason); + Else -> + Error = {bad_return_value, Else}, + proc_lib:init_ack(Starter, {error, Error}), + exit(Error) end. name({local,Name}) -> Name; @@ -467,23 +456,24 @@ extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> - pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); -loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> - process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - drain(Queue), Debug). +loop(GS2State = #gs2_state { time = hibernate, + timeout_state = undefined }) -> + pre_hibernate(GS2State); +loop(GS2State) -> + process_next_msg(drain(GS2State)). -drain(Queue) -> +drain(GS2State) -> receive - Input -> drain(in(Input, Queue)) - after 0 -> Queue + Input -> drain(in(Input, GS2State)) + after 0 -> GS2State end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> +process_next_msg(GS2State = #gs2_state { time = Time, + timeout_state = TimeoutState, + queue = Queue }) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Msg); + process_msg(Msg, GS2State #gs2_state { queue = Queue1 }); {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of @@ -504,68 +494,64 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> Input -> %% Time could be 'hibernate' here, so *don't* call loop process_next_msg( - Parent, Name, State, Mod, Time, TimeoutState, - drain(in(Input, Queue1)), Debug) + drain(in(Input, GS2State #gs2_state { queue = Queue1 }))) after Time1 -> case HibOnTimeout of true -> pre_hibernate( - Parent, Name, State, Mod, TimeoutState, Queue1, - Debug); + GS2State #gs2_state { queue = Queue1 }); false -> - process_msg( - Parent, Name, State, Mod, Time, TimeoutState, - Queue1, Debug, timeout) + process_msg(timeout, + GS2State #gs2_state { queue = Queue1 }) end end end. -wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> +wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> TimeoutState1 = case TS of undefined -> undefined; {SleptAt, TimeoutState} -> adjust_timeout_state(SleptAt, now(), TimeoutState) end, - post_hibernate(Parent, Name, State, Mod, TimeoutState1, - drain(Queue), Debug). + post_hibernate( + drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). -hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> TS = case TimeoutState of undefined -> undefined; {backoff, _, _, _, _} -> {now(), TimeoutState} end, - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, - TS, Queue, Debug]). + proc_lib:hibernate(?MODULE, wake_hib, + [GS2State #gs2_state { timeout_state = TS }]). -pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +pre_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_pre_hibernate, 1) of true -> case catch Mod:handle_pre_hibernate(State) of {hibernate, NState} -> - hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, - Debug); + hibernate(GS2State #gs2_state { state = NState } ); Reply -> - handle_common_termination(Reply, Name, pre_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, pre_hibernate, GS2State) end; false -> - hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + hibernate(GS2State) end. -post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> +post_hibernate(GS2State = #gs2_state { state = State, + mod = Mod }) -> case erlang:function_exported(Mod, handle_post_hibernate, 1) of true -> case catch Mod:handle_post_hibernate(State) of {noreply, NState} -> - process_next_msg(Parent, Name, NState, Mod, infinity, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = infinity }); {noreply, NState, Time} -> - process_next_msg(Parent, Name, NState, Mod, Time, - TimeoutState, Queue, Debug); + process_next_msg(GS2State #gs2_state { state = NState, + time = Time }); Reply -> - handle_common_termination(Reply, Name, post_hibernate, - Mod, State, Debug) + handle_common_termination(Reply, post_hibernate, GS2State) end; false -> %% use hibernate here, not infinity. This matches @@ -574,8 +560,7 @@ post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> %% still set to hibernate, iff that msg is the very msg %% that woke us up (or the first msg we receive after %% waking up). - process_next_msg(Parent, Name, State, Mod, hibernate, - TimeoutState, Queue, Debug) + process_next_msg(GS2State #gs2_state { time = hibernate }) end. adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, @@ -596,32 +581,40 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, CurrentTO1 = Base + Extra, {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. -in({'$gen_pcast', {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_cast', Msg}, Priority, Queue); -in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> - priority_queue:in({'$gen_call', From, Msg}, Priority, Queue); -in(Input, Queue) -> - priority_queue:in(Input, Queue). - -process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Msg) -> +in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_cast', Msg}, + PC(Msg, GS2State), Queue) }; +in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC, + queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + {'$gen_call', From, Msg}, + PC(Msg, From, GS2State), Queue) }; +in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) -> + GS2State #gs2_state { queue = priority_queue:in( + Input, PI(Input, GS2State), Queue) }. + +process_msg(Msg, + GS2State = #gs2_state { parent = Parent, + name = Name, + debug = Debug }) -> case Msg of - {system, From, Req} -> - sys:handle_system_msg( + {system, From, Req} -> + sys:handle_system_msg( Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, TimeoutState, Queue]); + GS2State); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. - {'EXIT', Parent, Reason} -> - terminate(Reason, Name, Msg, Mod, State, Debug); - _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); - _Msg -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, - Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, - Debug1) + {'EXIT', Parent, Reason} -> + terminate(Reason, Msg, GS2State); + _Msg when Debug =:= [] -> + handle_msg(Msg, GS2State); + _Msg -> + Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, + Name, {in, Msg}), + handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }) end. %%% --------------------------------------------------- @@ -638,35 +631,35 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Tag = make_ref(), Caller = self(), Receiver = - spawn( - fun () -> - %% Middleman process. Should be unsensitive to regular - %% exit signals. The sychronization is needed in case - %% the receiver would exit before the caller started - %% the monitor. - process_flag(trap_exit, true), - Mref = erlang:monitor(process, Caller), - receive - {Caller,Tag} -> - Monitors = send_nodes(Nodes, Name, Tag, Req), - TimerId = erlang:start_timer(Timeout, self(), ok), - Result = rec_nodes(Tag, Monitors, Name, TimerId), - exit({self(),Tag,Result}); - {'DOWN',Mref,_,_,_} -> - %% Caller died before sending us the go-ahead. - %% Give up silently. - exit(normal) - end - end), + spawn( + fun () -> + %% Middleman process. Should be unsensitive to regular + %% exit signals. The sychronization is needed in case + %% the receiver would exit before the caller started + %% the monitor. + process_flag(trap_exit, true), + Mref = erlang:monitor(process, Caller), + receive + {Caller,Tag} -> + Monitors = send_nodes(Nodes, Name, Tag, Req), + TimerId = erlang:start_timer(Timeout, self(), ok), + Result = rec_nodes(Tag, Monitors, Name, TimerId), + exit({self(),Tag,Result}); + {'DOWN',Mref,_,_,_} -> + %% Caller died before sending us the go-ahead. + %% Give up silently. + exit(normal) + end + end), Mref = erlang:monitor(process, Receiver), Receiver ! {self(),Tag}, receive - {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> - Result; - {'DOWN',Mref,_,_,Reason} -> - %% The middleman code failed. Or someone did - %% exit(_, kill) on the middleman process => Reason==killed - exit(Reason) + {'DOWN',Mref,_,_,{Receiver,Tag,Result}} -> + Result; + {'DOWN',Mref,_,_,Reason} -> + %% The middleman code failed. Or someone did + %% exit(_, kill) on the middleman process => Reason==killed + exit(Reason) end. send_nodes(Nodes, Name, Tag, Req) -> @@ -681,7 +674,7 @@ send_nodes([Node|Tail], Name, Tag, Req, Monitors) send_nodes([_Node|Tail], Name, Tag, Req, Monitors) -> %% Skip non-atom Node send_nodes(Tail, Name, Tag, Req, Monitors); -send_nodes([], _Name, _Tag, _Req, Monitors) -> +send_nodes([], _Name, _Tag, _Req, Monitors) -> Monitors. %% Against old nodes: @@ -691,89 +684,89 @@ send_nodes([], _Name, _Tag, _Req, Monitors) -> %% Against contemporary nodes: %% Wait for reply, server 'DOWN', or timeout from TimerId. -rec_nodes(Tag, Nodes, Name, TimerId) -> +rec_nodes(Tag, Nodes, Name, TimerId) -> rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId). rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], Time, TimerId); - {timeout, TimerId, _} -> - unmonitor(R), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + {'DOWN', R, _, _, _} -> + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], Time, TimerId); + {timeout, TimerId, _} -> + unmonitor(R), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, Badnodes, - [{N,Reply}|Replies], 2000, TimerId); - {timeout, TimerId, _} -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - %% Collect all replies that already have arrived - rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, Badnodes, + [{N,Reply}|Replies], 2000, TimerId); + {timeout, TimerId, _} -> + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + %% Collect all replies that already have arrived + rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies) after Time -> - case rpc:call(N, erlang, whereis, [Name]) of - Pid when is_pid(Pid) -> % It exists try again. - rec_nodes(Tag, [N|Tail], Name, Badnodes, - Replies, infinity, TimerId); - _ -> % badnode - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes(Tag, Tail, Name, [N|Badnodes], - Replies, 2000, TimerId) - end + case rpc:call(N, erlang, whereis, [Name]) of + Pid when is_pid(Pid) -> % It exists try again. + rec_nodes(Tag, [N|Tail], Name, Badnodes, + Replies, infinity, TimerId); + _ -> % badnode + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes(Tag, Tail, Name, [N|Badnodes], + Replies, 2000, TimerId) + end end; rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) -> case catch erlang:cancel_timer(TimerId) of - false -> % It has already sent it's message - receive - {timeout, TimerId, _} -> ok - after 0 -> - ok - end; - _ -> % Timer was cancelled, or TimerId was 'undefined' - ok + false -> % It has already sent it's message + receive + {timeout, TimerId, _} -> ok + after 0 -> + ok + end; + _ -> % Timer was cancelled, or TimerId was 'undefined' + ok end, {Replies, Badnodes}. %% Collect all replies that already have arrived rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) -> receive - {'DOWN', R, _, _, _} -> - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {'DOWN', R, _, _, _} -> + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - unmonitor(R), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + unmonitor(R), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) -> %% R6 node receive - {nodedown, N} -> - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); - {{Tag, N}, Reply} -> %% Tag is bound !!! - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) + {nodedown, N} -> + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies); + {{Tag, N}, Reply} -> %% Tag is bound !!! + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies]) after 0 -> - receive {nodedown, N} -> ok after 0 -> ok end, - monitor_node(N, false), - rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) + receive {nodedown, N} -> ok after 0 -> ok end, + monitor_node(N, false), + rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies) end; rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> {Replies, Badnodes}. @@ -785,28 +778,28 @@ rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) -> start_monitor(Node, Name) when is_atom(Node), is_atom(Name) -> if node() =:= nonode@nohost, Node =/= nonode@nohost -> - Ref = make_ref(), - self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, - {Node, Ref}; + Ref = make_ref(), + self() ! {'DOWN', Ref, process, {Name, Node}, noconnection}, + {Node, Ref}; true -> - case catch erlang:monitor(process, {Name, Node}) of - {'EXIT', _} -> - %% Remote node is R6 - monitor_node(Node, true), - Node; - Ref when is_reference(Ref) -> - {Node, Ref} - end + case catch erlang:monitor(process, {Name, Node}) of + {'EXIT', _} -> + %% Remote node is R6 + monitor_node(Node, true), + Node; + Ref when is_reference(Ref) -> + {Node, Ref} + end end. %% Cancels a monitor started with Ref=erlang:monitor(_, _). unmonitor(Ref) when is_reference(Ref) -> erlang:demonitor(Ref), receive - {'DOWN', Ref, _, _, _} -> - true + {'DOWN', Ref, _, _, _} -> + true after 0 -> - true + true end. %%% --------------------------------------------------- @@ -818,130 +811,114 @@ dispatch({'$gen_cast', Msg}, Mod, State) -> dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue) -> +common_reply(_Name, From, Reply, _NState, [] = _Debug) -> + reply(From, Reply), + []; +common_reply(Name, From, Reply, NState, Debug) -> + reply(Name, From, Reply, NState, Debug). + +common_debug([] = _Debug, _Func, _Info, _Event) -> + []; +common_debug(Debug, Func, Info, Event) -> + sys:handle_debug(Debug, Func, Info, Event). + +handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod, + state = State, + name = Name, + debug = Debug }) -> case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {reply, Reply, NState, Time1} -> - reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, [])), - reply(From, Reply), - exit(R); - Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) + {reply, Reply, NState} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {reply, Reply, NState, Time1} -> + Debug1 = common_reply(Name, From, Reply, NState, Debug), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1}); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = infinity, + debug = Debug1}); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state {state = NState, + time = Time1, + debug = Debug1}); + {stop, Reason, Reply, NState} -> + {'EXIT', R} = + (catch terminate(Reason, Msg, + GS2State #gs2_state { state = NState })), + reply(Name, From, Reply, NState, Debug), + exit(R); + Other -> + handle_common_reply(Other, Msg, GS2State) end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue) -> +handle_msg(Msg, GS2State = #gs2_state { mod = Mod, state = State }) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue). + handle_common_reply(Reply, Msg, GS2State). -handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - case catch Mod:handle_call(Msg, From, State) of - {reply, Reply, NState} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {reply, Reply, NState, Time1} -> - Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); - {stop, Reason, Reply, NState} -> - {'EXIT', R} = - (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), - reply(Name, From, Reply, NState, Debug), - exit(R); - Other -> - handle_common_reply(Other, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug) - end; -handle_msg(Msg, - Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue, Debug). - -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, - TimeoutState, Queue) -> +handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, + debug = Debug}) -> case Reply of - {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); - {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + {noreply, NState} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = infinity, + debug = Debug1 }); + {noreply, NState, Time1} -> + Debug1 = common_debug(Debug, {?MODULE, print_event}, Name, + {noreply, NState}), + loop(GS2State #gs2_state { state = NState, + time = Time1, + debug = Debug1 }); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, []) + handle_common_termination(Reply, Msg, GS2State) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, - Debug) -> +handle_common_termination(Reply, Msg, GS2State) -> case Reply of - {noreply, NState} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, - Debug1); - {noreply, NState, Time1} -> - Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + {stop, Reason, NState} -> + terminate(Reason, Msg, GS2State #gs2_state { state = NState }); + {'EXIT', What} -> + terminate(What, Msg, GS2State); _ -> - handle_common_termination(Reply, Name, Msg, Mod, State, Debug) - end. - -handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> - case Reply of - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, Debug); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, Debug); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug) + terminate({bad_return_value, Reply}, Msg, GS2State) end. reply(Name, {To, Tag}, Reply, State, Debug) -> reply({To, Tag}, Reply), - sys:handle_debug(Debug, {?MODULE, print_event}, Name, - {out, Reply, To, State} ). + sys:handle_debug( + Debug, {?MODULE, print_event}, Name, {out, Reply, To, State}). %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> - loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). +system_continue(Parent, Debug, GS2State) -> + loop(GS2State #gs2_state { parent = Parent, debug = Debug }). -ifdef(use_specs). -spec system_terminate(_, _, _, [_]) -> no_return(). -endif. -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, - _TimeoutState, _Queue]) -> - terminate(Reason, Name, [], Mod, State, Debug). +system_terminate(Reason, _Parent, Debug, GS2State) -> + terminate(Reason, [], GS2State #gs2_state { debug = Debug }). -system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, - OldVsn, Extra) -> +system_code_change(GS2State = #gs2_state { mod = Mod, + state = State }, + _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> - {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; - Else -> + {ok, NewState} -> + NewGS2State = find_prioritisers( + GS2State #gs2_state { state = NewState }), + {ok, [NewGS2State]}; + Else -> Else end. @@ -951,18 +928,18 @@ system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, %%----------------------------------------------------------------- print_event(Dev, {in, Msg}, Name) -> case Msg of - {'$gen_call', {From, _Tag}, Call} -> - io:format(Dev, "*DBG* ~p got call ~p from ~w~n", - [Name, Call, From]); - {'$gen_cast', Cast} -> - io:format(Dev, "*DBG* ~p got cast ~p~n", - [Name, Cast]); - _ -> - io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) + {'$gen_call', {From, _Tag}, Call} -> + io:format(Dev, "*DBG* ~p got call ~p from ~w~n", + [Name, Call, From]); + {'$gen_cast', Cast} -> + io:format(Dev, "*DBG* ~p got cast ~p~n", + [Name, Cast]); + _ -> + io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg]) end; print_event(Dev, {out, Msg, To, State}, Name) -> - io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", - [Name, Msg, To, State]); + io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n", + [Name, Msg, To, State]); print_event(Dev, {noreply, State}, Name) -> io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]); print_event(Dev, Event, Name) -> @@ -973,56 +950,61 @@ print_event(Dev, Event, Name) -> %%% Terminate the server. %%% --------------------------------------------------- -terminate(Reason, Name, Msg, Mod, State, Debug) -> +terminate(Reason, Msg, #gs2_state { name = Name, + mod = Mod, + state = State, + debug = Debug }) -> case catch Mod:terminate(Reason, State) of - {'EXIT', R} -> - error_info(R, Name, Msg, State, Debug), - exit(R); - _ -> - case Reason of - normal -> - exit(normal); - shutdown -> - exit(shutdown); - {shutdown,_}=Shutdown -> - exit(Shutdown); - _ -> - error_info(Reason, Name, Msg, State, Debug), - exit(Reason) - end + {'EXIT', R} -> + error_info(R, Reason, Name, Msg, State, Debug), + exit(R); + _ -> + case Reason of + normal -> + exit(normal); + shutdown -> + exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); + _ -> + error_info(Reason, undefined, Name, Msg, State, Debug), + exit(Reason) + end end. -error_info(_Reason, application_controller, _Msg, _State, _Debug) -> +error_info(_Reason, _RootCause, application_controller, _Msg, _State, _Debug) -> %% OTP-5811 Don't send an error report if it's the system process %% application_controller which is terminating - let init take care %% of it instead ok; -error_info(Reason, Name, Msg, State, Debug) -> - Reason1 = - case Reason of - {undef,[{M,F,A}|MFAs]} -> - case code:is_loaded(M) of - false -> - {'module could not be loaded',[{M,F,A}|MFAs]}; - _ -> - case erlang:function_exported(M, F, length(A)) of - true -> - Reason; - false -> - {'function not exported',[{M,F,A}|MFAs]} - end - end; - _ -> - Reason - end, - format("** Generic server ~p terminating \n" - "** Last message in was ~p~n" - "** When Server state == ~p~n" - "** Reason for termination == ~n** ~p~n", - [Name, Msg, State, Reason1]), +error_info(Reason, RootCause, Name, Msg, State, Debug) -> + Reason1 = error_reason(Reason), + Fmt = + "** Generic server ~p terminating~n" + "** Last message in was ~p~n" + "** When Server state == ~p~n" + "** Reason for termination == ~n** ~p~n", + case RootCause of + undefined -> format(Fmt, [Name, Msg, State, Reason1]); + _ -> format(Fmt ++ "** In 'terminate' callback " + "with reason ==~n** ~p~n", + [Name, Msg, State, Reason1, + error_reason(RootCause)]) + end, sys:print_log(Debug), ok. +error_reason({undef,[{M,F,A}|MFAs]} = Reason) -> + case code:is_loaded(M) of + false -> {'module could not be loaded',[{M,F,A}|MFAs]}; + _ -> case erlang:function_exported(M, F, length(A)) of + true -> Reason; + false -> {'function not exported',[{M,F,A}|MFAs]} + end + end; +error_reason(Reason) -> + Reason. + %%% --------------------------------------------------- %%% Misc. functions. %%% --------------------------------------------------- @@ -1036,74 +1018,109 @@ opt(_, []) -> debug_options(Name, Opts) -> case opt(debug, Opts) of - {ok, Options} -> dbg_options(Name, Options); - _ -> dbg_options(Name, []) + {ok, Options} -> dbg_options(Name, Options); + _ -> dbg_options(Name, []) end. dbg_options(Name, []) -> - Opts = - case init:get_argument(generic_debug) of - error -> - []; - _ -> - [log, statistics] - end, + Opts = + case init:get_argument(generic_debug) of + error -> + []; + _ -> + [log, statistics] + end, dbg_opts(Name, Opts); dbg_options(Name, Opts) -> dbg_opts(Name, Opts). dbg_opts(Name, Opts) -> case catch sys:debug_options(Opts) of - {'EXIT',_} -> - format("~p: ignoring erroneous debug options - ~p~n", - [Name, Opts]), - []; - Dbg -> - Dbg + {'EXIT',_} -> + format("~p: ignoring erroneous debug options - ~p~n", + [Name, Opts]), + []; + Dbg -> + Dbg end. get_proc_name(Pid) when is_pid(Pid) -> Pid; get_proc_name({local, Name}) -> case process_info(self(), registered_name) of - {registered_name, Name} -> - Name; - {registered_name, _Name} -> - exit(process_not_registered); - [] -> - exit(process_not_registered) - end; + {registered_name, Name} -> + Name; + {registered_name, _Name} -> + exit(process_not_registered); + [] -> + exit(process_not_registered) + end; get_proc_name({global, Name}) -> case global:safe_whereis_name(Name) of - undefined -> - exit(process_not_registered_globally); - Pid when Pid =:= self() -> - Name; - _Pid -> - exit(process_not_registered_globally) + undefined -> + exit(process_not_registered_globally); + Pid when Pid =:= self() -> + Name; + _Pid -> + exit(process_not_registered_globally) end. get_parent() -> case get('$ancestors') of - [Parent | _] when is_pid(Parent)-> + [Parent | _] when is_pid(Parent)-> Parent; [Parent | _] when is_atom(Parent)-> name_to_pid(Parent); - _ -> - exit(process_was_not_started_by_proc_lib) + _ -> + exit(process_was_not_started_by_proc_lib) end. name_to_pid(Name) -> case whereis(Name) of - undefined -> - case global:safe_whereis_name(Name) of - undefined -> - exit(could_not_find_registerd_name); - Pid -> - Pid - end; - Pid -> - Pid + undefined -> + case global:safe_whereis_name(Name) of + undefined -> + exit(could_not_find_registerd_name); + Pid -> + Pid + end; + Pid -> + Pid + end. + +find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> + PrioriCall = function_exported_or_default( + Mod, 'prioritise_call', 3, + fun (_Msg, _From, _State) -> 0 end), + PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + fun (_Msg, _State) -> 0 end), + PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + fun (_Msg, _State) -> 0 end), + GS2State #gs2_state { prioritise_call = PrioriCall, + prioritise_cast = PrioriCast, + prioritise_info = PrioriInfo }. + +function_exported_or_default(Mod, Fun, Arity, Default) -> + case erlang:function_exported(Mod, Fun, Arity) of + true -> case Arity of + 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end; + 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> + case catch Mod:Fun(Msg, From, State) of + Res when is_integer(Res) -> + Res; + Err -> + handle_common_termination(Err, Msg, GS2State) + end + end + end; + false -> Default end. %%----------------------------------------------------------------- @@ -1113,25 +1130,23 @@ format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> - pid_to_list(Name); - is_atom(Name) -> - Name - end, + pid_to_list(Name); + is_atom(Name) -> + Name + end, Header = lists:concat(["Status for generic server ", NameTag]), Log = sys:get_debug(log, Debug, []), - Specfic = - case erlang:function_exported(Mod, format_status, 2) of - true -> - case catch Mod:format_status(Opt, [PDict, State]) of - {'EXIT', _} -> [{data, [{"State", State}]}]; - Else -> Else - end; - _ -> - [{data, [{"State", State}]}] - end, + Specfic = + case erlang:function_exported(Mod, format_status, 2) of + true -> case catch Mod:format_status(Opt, [PDict, State]) of + {'EXIT', _} -> [{data, [{"State", State}]}]; + Else -> Else + end; + _ -> [{data, [{"State", State}]}] + end, [{header, Header}, {data, [{"Status", SysState}, - {"Parent", Parent}, - {"Logged events", Log}, + {"Parent", Parent}, + {"Logged events", Log}, {"Queued messages", priority_queue:to_list(Queue)}]} | Specfic]. diff --git a/src/rabbit.erl b/src/rabbit.erl index 41c628a0..8c36a9f0 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -83,12 +83,6 @@ {requires, external_infrastructure}, {enables, kernel_ready}]}). --rabbit_boot_step({rabbit_hooks, - [{description, "internal event notification system"}, - {mfa, {rabbit_hooks, start, []}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_event, [{description, "statistics event manager"}, {mfa, {rabbit_sup, start_restartable_child, @@ -211,8 +205,7 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(). + ok = ensure_working_log_handlers(). start() -> try @@ -496,11 +489,16 @@ maybe_insert_default_data() -> insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), + {ok, DefaultAdmin} = application:get_env(default_user_is_admin), {ok, DefaultVHost} = application:get_env(default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + case DefaultAdmin of + true -> rabbit_access_control:set_admin(DefaultUser); + _ -> ok + end, ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, DefaultConfigurePerm, DefaultWritePerm, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 8d00f591..73fd6f0e 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -35,11 +35,12 @@ -export([check_login/2, user_pass_login/2, check_vhost_access/2, check_resource_access/3]). --export([add_user/2, delete_user/1, change_password/2, list_users/0, - lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([add_user/2, delete_user/1, change_password/2, set_admin/1, + clear_admin/1, list_users/0, lookup_user/1]). +-export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, set_permissions/6, clear_permissions/2, - list_vhost_permissions/1, list_user_permissions/1]). + list_permissions/0, list_vhost_permissions/1, list_user_permissions/1, + list_user_vhost_permissions/2]). %%---------------------------------------------------------------------------- @@ -52,6 +53,7 @@ -type(password() :: binary()). -type(regexp() :: binary()). -type(scope() :: binary()). +-type(scope_atom() :: 'client' | 'all'). -spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user() | @@ -68,26 +70,33 @@ -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(set_admin/1 :: (username()) -> 'ok'). +-spec(clear_admin/1 :: (username()) -> 'ok'). -spec(list_users/0 :: () -> [username()]). -spec(lookup_user/1 :: (username()) -> rabbit_types:ok(rabbit_types:user()) | rabbit_types:error('not_found')). --spec(add_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). --spec(delete_vhost/1 :: - (rabbit_types:vhost()) -> 'ok'). +-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(vhost_exists/1 :: (rabbit_types:vhost()) -> boolean()). -spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]). -spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(set_permissions/6 ::(scope(), username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok'). +-spec(list_permissions/0 :: + () -> [{username(), rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) - -> [{username(), regexp(), regexp(), regexp()}]). + (rabbit_types:vhost()) -> [{username(), regexp(), regexp(), regexp(), + scope_atom()}]). -spec(list_user_permissions/1 :: - (username()) - -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]). + (username()) -> [{rabbit_types:vhost(), regexp(), regexp(), regexp(), + scope_atom()}]). +-spec(list_user_vhost_permissions/2 :: + (username(), rabbit_types:vhost()) -> [{regexp(), regexp(), regexp(), + scope_atom()}]). -endif. @@ -142,7 +151,7 @@ internal_lookup_vhost_access(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} @@ -160,7 +169,6 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. -permission_index(scope) -> #permission.scope; permission_index(configure) -> #permission.configure; permission_index(write) -> #permission.write; permission_index(read) -> #permission.read. @@ -171,27 +179,29 @@ check_resource_access(Username, check_resource_access(Username, R#resource{name = <<"amq.default">>}, Permission); -check_resource_access(_Username, - #resource{name = <<"amq.gen",_/binary>>}, - #permission{scope = client}) -> - ok; check_resource_access(Username, R = #resource{virtual_host = VHostPath, name = Name}, Permission) -> Res = case mnesia:dirty_read({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) of [] -> false; [#user_permission{permission = P}] -> - PermRegexp = case element(permission_index(Permission), P) of - %% <<"^$">> breaks Emacs' erlang mode - <<"">> -> <<$^, $$>>; - RE -> RE - end, - case re:run(Name, PermRegexp, [{capture, none}]) of - match -> true; - nomatch -> false + case {Name, P} of + {<<"amq.gen",_/binary>>, #permission{scope = client}} -> + true; + _ -> + PermRegexp = + case element(permission_index(Permission), P) of + %% <<"^$">> breaks Emacs' erlang mode + <<"">> -> <<$^, $$>>; + RE -> RE + end, + case re:run(Name, PermRegexp, [{capture, none}]) of + match -> true; + nomatch -> false + end end end, if Res -> ok; @@ -207,7 +217,8 @@ add_user(Username, Password) -> [] -> ok = mnesia:write(rabbit_user, #user{username = Username, - password = Password}, + password = Password, + is_admin = false}, write); _ -> mnesia:abort({user_already_exists, Username}) @@ -237,20 +248,39 @@ delete_user(Username) -> R. change_password(Username, Password) -> - R = rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> - ok = mnesia:write(rabbit_user, - #user{username = Username, - password = Password}, - write) - end)), + R = update_user(Username, fun(User) -> + User#user{password = Password} + end), rabbit_log:info("Changed password for user ~p~n", [Username]), R. +set_admin(Username) -> + set_admin(Username, true). + +clear_admin(Username) -> + set_admin(Username, false). + +set_admin(Username, IsAdmin) -> + R = update_user(Username, fun(User) -> + User#user{is_admin = IsAdmin} + end), + rabbit_log:info("Set user admin flag for user ~p to ~p~n", + [Username, IsAdmin]), + R. + +update_user(Username, Fun) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + {ok, User} = lookup_user(Username), + ok = mnesia:write(rabbit_user, Fun(User), write) + end)). + list_users() -> - mnesia:dirty_all_keys(rabbit_user). + [{Username, IsAdmin} || + #user{username = Username, is_admin = IsAdmin} <- + mnesia:dirty_match_object(rabbit_user, #user{_ = '_'})]. lookup_user(Username) -> rabbit_misc:dirty_read({rabbit_user, Username}). @@ -300,7 +330,7 @@ delete_vhost(VHostPath) -> R. internal_delete_vhost(VHostPath) -> - lists:foreach(fun (#exchange{name=Name}) -> + lists:foreach(fun (#exchange{name = Name}) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), @@ -311,6 +341,9 @@ internal_delete_vhost(VHostPath) -> ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. +vhost_exists(VHostPath) -> + mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. + list_vhosts() -> mnesia:dirty_all_keys(rabbit_vhost). @@ -338,13 +371,13 @@ set_permissions(ScopeBin, Username, VHostPath, ConfigurePerm, WritePerm, ReadPer fun () -> ok = mnesia:write( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = #permission{ - scope = Scope, + scope = Scope, configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}}, + write = WritePerm, + read = ReadPerm}}, write) end)). @@ -355,10 +388,15 @@ clear_permissions(Username, VHostPath) -> Username, VHostPath, fun () -> ok = mnesia:delete({rabbit_user_permission, - #user_vhost{username = Username, + #user_vhost{username = Username, virtual_host = VHostPath}}) end)). +list_permissions() -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || + {Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(match_user_vhost('_', '_'))]. + list_vhost_permissions(VHostPath) -> [{Username, ConfigurePerm, WritePerm, ReadPerm, Scope} || {Username, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- @@ -371,15 +409,21 @@ list_user_permissions(Username) -> list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. +list_user_vhost_permissions(Username, VHostPath) -> + [{ConfigurePerm, WritePerm, ReadPerm, Scope} || + {_, _, ConfigurePerm, WritePerm, ReadPerm, Scope} <- + list_permissions(rabbit_misc:with_user_and_vhost( + Username, VHostPath, + match_user_vhost(Username, VHostPath)))]. + list_permissions(QueryThunk) -> [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm, Scope} || - #user_permission{user_vhost = #user_vhost{username = Username, + #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, - permission = #permission{ - scope = Scope, - configure = ConfigurePerm, - write = WritePerm, - read = ReadPerm}} <- + permission = #permission{ scope = Scope, + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. @@ -387,7 +431,7 @@ match_user_vhost(Username, VHostPath) -> fun () -> mnesia:match_object( rabbit_user_permission, #user_permission{user_vhost = #user_vhost{ - username = Username, + username = Username, virtual_host = VHostPath}, permission = '_'}, read) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2453280e..3e677c38 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -56,7 +56,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(EXPIRES_TYPE, long). +-define(EXPIRES_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -197,7 +197,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. + [Q || Q <- Qs, + gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == Q]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), @@ -248,11 +249,12 @@ start_queue_process(Q) -> Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> - Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], - fun (_X, _Q) -> ok end), - ok. + rabbit_binding:add(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = []}). lookup(Name) -> rabbit_misc:dirty_read({rabbit_queue, Name}). @@ -312,13 +314,13 @@ check_declare_arguments(QueueName, Args) -> check_expires_argument(undefined) -> ok; -check_expires_argument({?EXPIRES_TYPE, Expires}) - when is_integer(Expires) andalso Expires > 0 -> - ok; -check_expires_argument({?EXPIRES_TYPE, _Expires}) -> - {error, expires_zero_or_less}; -check_expires_argument(_) -> - {error, expires_not_of_type_long}. +check_expires_argument({Type, Expires}) when Expires > 0 -> + case lists:member(Type, ?EXPIRES_TYPES) of + true -> ok; + false -> {error, {expires_not_of_acceptable_type, Type, Expires}} + end; +check_expires_argument({_Type, _Expires}) -> + {error, expires_zero_or_less}. list(VHostPath) -> mnesia:dirty_match_object( @@ -330,10 +332,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, info, infinity). + delegate_call(QPid, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_pcall(QPid, 9, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -343,7 +345,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_pcall(QPid, 9, consumers, infinity). + delegate_call(QPid, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -355,7 +357,7 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> - delegate_pcast(QPid, 7, emit_stats). + delegate_cast(QPid, emit_stats). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -378,10 +380,10 @@ requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> - delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( @@ -417,10 +419,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - delegate_pcast(QPid, 7, {notify_sent, ChPid}). + delegate_cast(QPid, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate_pcast(QPid, 7, {unblock, ChPid}). + delegate_cast(QPid, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_no_result( @@ -432,7 +434,7 @@ internal_delete1(QueueName) -> %% we want to execute some things, as %% decided by rabbit_exchange, after the %% transaction. - rabbit_exchange:delete_queue_bindings(QueueName). + rabbit_binding:remove_for_queue(QueueName). internal_delete(QueueName) -> case @@ -450,20 +452,19 @@ internal_delete(QueueName) -> end. maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:pcall(QPid, 6, {maybe_run_queue_via_backing_queue, Fun}, - infinity). + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). update_ram_duration(QPid) -> - gen_server2:pcast(QPid, 8, update_ram_duration). + gen_server2:cast(QPid, update_ram_duration). set_ram_duration_target(QPid, Duration) -> - gen_server2:pcast(QPid, 8, {set_ram_duration_target, Duration}). + gen_server2:cast(QPid, {set_ram_duration_target, Duration}). set_maximum_since_use(QPid, Age) -> - gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(QPid, {set_maximum_since_use, Age}). maybe_expire(QPid) -> - gen_server2:pcast(QPid, 8, maybe_expire). + gen_server2:cast(QPid, maybe_expire). on_node_down(Node) -> [Hook() || @@ -477,7 +478,7 @@ on_node_down(Node) -> ok. delete_queue(QueueName) -> - Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + Post = rabbit_binding:remove_transient_for_queue(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), Post. @@ -503,11 +504,6 @@ safe_delegate_call_ok(F, Pids) -> delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). -delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, - fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_pcast(Pid, Pri, Msg) -> - delegate:invoke_no_result(Pid, - fun (P) -> gen_server2:pcast(P, Pri, Msg) end). +delegate_cast(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d52660c5..91877efb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,8 @@ -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -import(queue). -import(erlang). @@ -146,8 +147,8 @@ code_change(_OldVsn, State, _Extra) -> init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of - {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); - undefined -> State + {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); + undefined -> State end. declare(Recover, From, @@ -163,10 +164,8 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - rabbit_event:notify( - queue_created, - [{Item, i(Item, State)} || - Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(queue_created, + infos(?CREATION_EVENT_KEYS, State)), noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -203,7 +202,7 @@ next_state(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = ensure_rate_timer(State), State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS)of + case BQ:needs_idle_timeout(BQS) of true -> {ensure_sync_timer(State2), 0}; false -> {stop_sync_timer(State2), hibernate} end. @@ -587,11 +586,33 @@ i(Item, _) -> throw({bad_argument, Item}). emit_stats(State) -> - rabbit_event:notify(queue_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + rabbit_event:notify(queue_stats, infos(?STATISTICS_KEYS, State)). %--------------------------------------------------------------------------- +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {maybe_run_queue_via_backing_queue, _Fun} -> 6; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + _ -> 0 + end. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -603,6 +624,7 @@ handle_call({init, Recover}, From, declare(Recover, From, State); _ -> #q{q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined} = State, + gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( @@ -610,7 +632,7 @@ handle_call({init, Recover}, From, end, BQS = BQ:init(QName, IsDurable, Recover), %% Rely on terminate to delete the queue. - {stop, normal, not_found, State#q{backing_queue_state = BQS}} + {stop, normal, State#q{backing_queue_state = BQS}} end; handle_call(info, _From, State) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl new file mode 100644 index 00000000..19150fa9 --- /dev/null +++ b/src/rabbit_binding.erl @@ -0,0 +1,377 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_binding). +-include("rabbit.hrl"). + +-export([recover/0, exists/1, add/1, remove/1, add/2, remove/2, list/1]). +-export([list_for_exchange/1, list_for_queue/1, list_for_exchange_and_queue/2]). +-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]). +%% these must all be run inside a mnesia tx +-export([has_for_exchange/1, remove_for_exchange/1, + remove_for_queue/1, remove_transient_for_queue/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([key/0]). + +-type(key() :: binary()). + +-type(bind_errors() :: rabbit_types:error('queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found')). +-type(bind_res() :: 'ok' | bind_errors()). +-type(inner_fun() :: + fun((rabbit_types:exchange(), queue()) -> + rabbit_types:ok_or_error(rabbit_types:amqp_error()))). +-type(bindings() :: [rabbit_types:binding()]). + +-spec(recover/0 :: () -> [rabbit_types:binding()]). +-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). +-spec(add/1 :: (rabbit_types:binding()) -> bind_res()). +-spec(remove/1 :: (rabbit_types:binding()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(add/2 :: (rabbit_types:binding(), inner_fun()) -> bind_res()). +-spec(remove/2 :: (rabbit_types:binding(), inner_fun()) -> + bind_res() | rabbit_types:error('binding_not_found')). +-spec(list/1 :: (rabbit_types:vhost()) -> bindings()). +-spec(list_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(list_for_queue/1 :: (rabbit_amqqueue:name()) -> bindings()). +-spec(list_for_exchange_and_queue/2 :: + (rabbit_exchange:name(), rabbit_amqqueue:name()) -> bindings()). +-spec(info_keys/0 :: () -> [rabbit_types:info_key()]). +-spec(info/1 :: (rabbit_types:binding()) -> [rabbit_types:info()]). +-spec(info/2 :: (rabbit_types:binding(), [rabbit_types:info_key()]) -> + [rabbit_types:info()]). +-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]). +-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()]) + -> [[rabbit_types:info()]]). +-spec(has_for_exchange/1 :: (rabbit_exchange:name()) -> boolean()). +-spec(remove_for_exchange/1 :: (rabbit_exchange:name()) -> bindings()). +-spec(remove_for_queue/1 :: + (rabbit_amqqueue:name()) -> fun (() -> any())). +-spec(remove_transient_for_queue/1 :: + (rabbit_amqqueue:name()) -> fun (() -> any())). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(INFO_KEYS, [exchange_name, queue_name, routing_key, arguments]). + +recover() -> + rabbit_misc:table_fold( + fun (Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, Route, write), + ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route). + +exists(Binding) -> + binding_action( + Binding, + fun (_X, _Q, B) -> mnesia:read({rabbit_route, B}) /= [] end). + +add(Binding) -> add(Binding, fun (_X, _Q) -> ok end). + +remove(Binding) -> remove(Binding, fun (_X, _Q) -> ok end). + +add(Binding, InnerFun) -> + case binding_action( + Binding, + fun (X, Q, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(X, Q) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:write/3), + {new, X, B}; + [_] -> {existing, X, B} + end; + {error, _} = E -> + E + end + end) of + {new, X = #exchange{ type = Type }, B} -> + ok = (type_to_module(Type)):add_binding(X, B), + rabbit_event:notify(binding_created, info(B)); + {existing, _, _} -> + ok; + {error, _} = Err -> + Err + end. + +remove(Binding, InnerFun) -> + case binding_action( + Binding, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + [_] -> case InnerFun(X, Q) of + ok -> + Durable = (X#exchange.durable andalso + Q#amqqueue.durable), + ok = sync_binding( + B, Durable, + fun mnesia:delete_object/3), + Deleted = + rabbit_exchange:maybe_auto_delete(X), + {{Deleted, X}, B}; + {error, _} = E -> + E + end + end + end) of + {error, _} = Err -> + Err; + {{IsDeleted, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> ok = Module:delete(X, [B]); + not_deleted -> ok = Module:remove_bindings(X, [B]) + end, + rabbit_event:notify(binding_deleted, info(B)), + ok + end. + +list(VHostPath) -> + Route = #route{binding = #binding{ + exchange_name = rabbit_misc:r(VHostPath, exchange), + queue_name = rabbit_misc:r(VHostPath, queue), + _ = '_'}, + _ = '_'}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_exchange(XName) -> + Route = #route{binding = #binding{exchange_name = XName, _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +list_for_queue(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, _ = '_'}}, + [reverse_binding(B) || #reverse_route{reverse_binding = B} <- + mnesia:dirty_match_object(rabbit_reverse_route, + reverse_route(Route))]. + +list_for_exchange_and_queue(XName, QueueName) -> + Route = #route{binding = #binding{exchange_name = XName, + queue_name = QueueName, + _ = '_'}}, + [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)]. + +info_keys() -> ?INFO_KEYS. + +map(VHostPath, F) -> + %% TODO: there is scope for optimisation here, e.g. using a + %% cursor, parallelising the function invocation + lists:map(F, list(VHostPath)). + +infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. + +i(exchange_name, #binding{exchange_name = XName}) -> XName; +i(queue_name, #binding{queue_name = QName}) -> QName; +i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; +i(arguments, #binding{args = Arguments}) -> Arguments; +i(Item, _) -> throw({bad_argument, Item}). + +info(B = #binding{}) -> infos(?INFO_KEYS, B). + +info(B = #binding{}, Items) -> infos(Items, B). + +info_all(VHostPath) -> map(VHostPath, fun (B) -> info(B) end). + +info_all(VHostPath, Items) -> map(VHostPath, fun (B) -> info(B, Items) end). + +has_for_exchange(XName) -> + Match = #route{binding = #binding{exchange_name = XName, _ = '_'}}, + %% we need to check for durable routes here too in case a bunch of + %% routes to durable queues have been removed temporarily as a + %% result of a node failure + contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match). + +remove_for_exchange(XName) -> + [begin + ok = mnesia:delete_object(rabbit_reverse_route, + reverse_route(Route), write), + ok = delete_forward_routes(Route), + Route#route.binding + end || Route <- mnesia:match_object( + rabbit_route, + #route{binding = #binding{exchange_name = XName, + _ = '_'}}, + write)]. + +remove_for_queue(QueueName) -> + remove_for_queue(QueueName, fun delete_forward_routes/1). + +remove_transient_for_queue(QueueName) -> + remove_for_queue(QueueName, fun delete_transient_forward_routes/1). + +%%---------------------------------------------------------------------------- + +binding_action(Binding = #binding{exchange_name = XName, + queue_name = QueueName, + args = Arguments}, Fun) -> + call_with_exchange_and_queue( + XName, QueueName, + fun (X, Q) -> + SortedArgs = rabbit_misc:sort_field_table(Arguments), + Fun(X, Q, Binding#binding{args = SortedArgs}) + end). + +sync_binding(Binding, Durable, Fun) -> + ok = case Durable of + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); + false -> ok + end, + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), + ok. + +call_with_exchange_and_queue(XName, QueueName, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> case {mnesia:read({rabbit_exchange, XName}), + mnesia:read({rabbit_queue, QueueName})} of + {[X], [Q]} -> Fun(X, Q); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, queue_not_found}; + {[ ], [ ]} -> {error, exchange_and_queue_not_found} + end + end). + +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + Module. + +contains(Table, MatchHead) -> + continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +remove_for_queue(QueueName, FwdDeleteFun) -> + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Grouped = group_bindings_and_auto_delete( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + not_deleted -> Module:remove_bindings(X, Bs) + end + end, Grouped) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% group_bindings_and_auto_delete1) works properly. +group_bindings_and_auto_delete([], Acc) -> + Acc; +group_bindings_and_auto_delete( + [B = #binding{exchange_name = XName} | Bs], Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B], Acc). + +group_bindings_and_auto_delete( + XName, [B = #binding{exchange_name = XName} | Bs], Bindings, Acc) -> + group_bindings_and_auto_delete(XName, Bs, [B | Bindings], Acc); +group_bindings_and_auto_delete(XName, Removed, Bindings, Acc) -> + %% either Removed is [], or its head has a non-matching XName + [X] = mnesia:read({rabbit_exchange, XName}), + NewAcc = [{{rabbit_exchange:maybe_auto_delete(X), X}, Bindings} | Acc], + group_bindings_and_auto_delete(Removed, NewAcc). + +delete_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). + +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(rabbit_route, Route, write). + +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = XName, + queue_name = QueueName, + key = Key, + args = Args}) -> + #binding{exchange_name = XName, + queue_name = QueueName, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = XName, + queue_name = QueueName, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = XName, + queue_name = QueueName, + key = Key, + args = Args}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 138df716..f19f98d2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,16 +35,17 @@ -behaviour(gen_server2). --export([start_link/6, do/2, do/3, shutdown/1]). +-export([start_link/7, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, flush/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, - transaction_id, tx_participants, next_tag, + start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer}). @@ -76,9 +77,11 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/6 :: +-spec(start_link/7 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> rabbit_types:ok_pid_or_error()). + rabbit_types:vhost(), pid(), + fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> + rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -100,9 +103,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []). +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun) -> + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, Username, + VHost, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -128,10 +132,10 @@ list() -> info_keys() -> ?INFO_KEYS. info(Pid) -> - gen_server2:pcall(Pid, 9, info, infinity). + gen_server2:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server2:pcall(Pid, 9, {info, Items}, infinity) of + case gen_server2:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -143,22 +147,23 @@ info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). emit_stats(Pid) -> - gen_server2:pcast(Pid, 7, emit_stats). + gen_server2:cast(Pid, emit_stats). flush(Pid) -> gen_server2:call(Pid, flush). %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, + StartLimiterFun]) -> process_flag(trap_exit, true), - link(WriterPid), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, + start_limiter_fun = StartLimiterFun, transaction_id = none, tx_participants = sets:new(), next_tag = 1, @@ -171,12 +176,23 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer()}, - rabbit_event:notify( - channel_created, - [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + info -> 9; + {info, _Items} -> 9; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + emit_stats -> 7; + _ -> 0 + end. + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -239,12 +255,6 @@ handle_cast(emit_stats, State) -> internal_emit_stats(State), {noreply, State}. -handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, - State = #ch{writer_pid = WriterPid}) -> - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, - {stop, normal, State}; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State)}. @@ -259,8 +269,10 @@ terminate(_Reason, State = #ch{state = terminating}) -> terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of - normal -> ok = Res; - _ -> ok + normal -> ok = Res; + shutdown -> ok = Res; + {shutdown, _Term} -> ok = Res; + _ -> ok end, terminate(State). @@ -403,7 +415,7 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = rollback_and_notify(State), - ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), stop; handle_method(#'access.request'{},_, State) -> @@ -807,17 +819,17 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, - NoWait, State); + binding_action(fun rabbit_binding:add/2, + ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + #'queue.bind_ok'{}, NoWait, State); handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, - QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, - false, State); + binding_action(fun rabbit_binding:remove/2, + ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, + #'queue.unbind_ok'{}, false, State); handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, @@ -895,7 +907,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + case Fun(#binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = ActualRoutingKey, + args = Arguments}, fun (_X, Q) -> try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) catch exit:Reason -> {error, Reason} @@ -1016,8 +1031,8 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)), +start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> + {ok, LPid} = SLF(queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. @@ -1089,11 +1104,9 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, false -> rabbit_writer:send_command(WriterPid, M, Content) end. -terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> +terminate(_State) -> pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]), - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + rabbit_event:notify(channel_closed, [{pid, self()}]). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1150,7 +1163,7 @@ update_measures(Type, QX, Inc, Measure) -> orddict:store(Measure, Cur + Inc, Measures)). internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> - CoarseStats = [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS], + CoarseStats = infos(?STATISTICS_KEYS, State), case rabbit_event:stats_level(StatsTimer) of coarse -> rabbit_event:notify(channel_stats, CoarseStats); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl new file mode 100644 index 00000000..02199a65 --- /dev/null +++ b/src/rabbit_channel_sup.erl @@ -0,0 +1,96 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel_sup). + +-behaviour(supervisor2). + +-export([start_link/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-export_type([start_link_args/0]). + +-type(start_link_args() :: + {rabbit_types:protocol(), rabbit_net:socket(), + rabbit_channel:channel_number(), non_neg_integer(), pid(), + rabbit_access_control:username(), rabbit_types:vhost(), pid()}). + +-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector}) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, WriterPid} = + supervisor2:start_child( + SupPid, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, ChannelPid} = + supervisor2:start_child( + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ReaderPid, WriterPid, Username, VHost, + Collector, start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + {ok, FramingChannelPid} = + supervisor2:start_child( + SupPid, + {framing_channel, {rabbit_framing_channel, start_link, + [ReaderPid, ChannelPid, Protocol]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}), + {ok, SupPid, FramingChannelPid}. + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + +start_limiter_fun(SupPid) -> + fun (UnackedCount) -> + Me = self(), + {ok, _Pid} = + supervisor2:start_child( + SupPid, + {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}) + end. diff --git a/src/rabbit_tracer.erl b/src/rabbit_channel_sup_sup.erl index 484249b1..21c39780 100644 --- a/src/rabbit_tracer.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -29,22 +29,35 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_tracer). --export([start/0]). +-module(rabbit_channel_sup_sup). --import(erlang). +-behaviour(supervisor2). -start() -> - spawn(fun mainloop/0), - ok. +-export([start_link/0, start_channel/2]). -mainloop() -> - erlang:trace(new, true, [all]), - mainloop1(). +-export([init/1]). -mainloop1() -> - receive - Msg -> - rabbit_log:info("TRACE: ~p~n", [Msg]) - end, - mainloop1(). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) -> + {'ok', pid(), pid()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link(?MODULE, []). + +start_channel(Pid, Args) -> + supervisor2:start_child(Pid, [Args]). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{channel_sup, {rabbit_channel_sup, start_link, []}, + temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl new file mode 100644 index 00000000..b3821d3b --- /dev/null +++ b/src/rabbit_connection_sup.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_connection_sup). + +-behaviour(supervisor2). + +-export([start_link/0, reader/1]). + +-export([init/1]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid(), pid()}). +-spec(reader/1 :: (pid()) -> pid()). + +-endif. + +%%-------------------------------------------------------------------------- + +start_link() -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, ChannelSupSupPid} = + supervisor2:start_child( + SupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), + {ok, Collector} = + supervisor2:start_child( + SupPid, + {collector, {rabbit_queue_collector, start_link, []}, + intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + {ok, ReaderPid} = + supervisor2:start_child( + SupPid, + {reader, {rabbit_reader, start_link, + [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), + {ok, SupPid, ReaderPid}. + +reader(Pid) -> + hd(supervisor2:find_child(Pid, reader)). + +%%-------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + +start_heartbeat_fun(SupPid) -> + fun (_Sock, 0) -> + none; + (Sock, TimeoutSec) -> + Parent = self(), + {ok, Sender} = + supervisor2:start_child( + SupPid, {heartbeat_sender, + {rabbit_heartbeat, start_heartbeat_sender, + [Parent, Sock, TimeoutSec]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {ok, Receiver} = + supervisor2:start_child( + SupPid, {heartbeat_receiver, + {rabbit_heartbeat, start_heartbeat_receiver, + [Parent, Sock, TimeoutSec]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), + {Sender, Receiver} + end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index f0b623c2..a3b6f369 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -209,6 +209,14 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); +action(set_admin, Node, [Username], _Opts, Inform) -> + Inform("Setting administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, set_admin, [Username]}); + +action(clear_admin, Node, [Username], _Opts, Inform) -> + Inform("Clearing administrative status for user ~p", [Username]), + call(Node, {rabbit_access_control, clear_admin, [Username]}); + action(list_users, Node, [], _Opts, Inform) -> Inform("Listing users", []), display_list(call(Node, {rabbit_access_control, list_users, []})); @@ -246,14 +254,14 @@ action(list_exchanges, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); -action(list_bindings, Node, _Args, Opts, Inform) -> +action(list_bindings, Node, Args, Opts, Inform) -> Inform("Listing bindings", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - InfoKeys = [exchange_name, queue_name, routing_key, args], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys); + ArgAtoms = default_if_empty(Args, [exchange_name, queue_name, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms]), + ArgAtoms); action(list_connections, Node, Args, _Opts, Inform) -> Inform("Listing connections", []), @@ -304,9 +312,11 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || - X <- InfoItemKeys]) - end, Results), + lists:foreach( + fun (Result) -> display_row( + [format_info_item(proplists:get_value(X, Result)) || + X <- InfoItemKeys]) + end, Results), ok; display_info_list(Other, _) -> Other. @@ -315,25 +325,30 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Key, Items) -> - case proplists:get_value(Key, Items) of - #resource{name = Name} -> - escape(Name); - Value when Key =:= address; Key =:= peer_address andalso - is_tuple(Value) -> - inet_parse:ntoa(Value); - Value when is_pid(Value) -> - rabbit_misc:pid_to_string(Value); - Value when is_binary(Value) -> - escape(Value); - Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] - when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> - io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); - Value -> - io_lib:format("~w", [Value]) - end. +-define(IS_U8(X), (X >= 0 andalso X =< 255)). +-define(IS_U16(X), (X >= 0 andalso X =< 65535)). + +format_info_item(#resource{name = Name}) -> + escape(Name); +format_info_item({N1, N2, N3, N4} = Value) when + ?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) -> + inet_parse:ntoa(Value); +format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when + ?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4), + ?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) -> + inet_parse:ntoa(Value); +format_info_item(Value) when is_pid(Value) -> + rabbit_misc:pid_to_string(Value); +format_info_item(Value) when is_binary(Value) -> + escape(Value); +format_info_item(Value) when is_atom(Value) -> + escape(atom_to_list(Value)); +format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = + Value) when is_binary(TableEntryKey) andalso + is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item(Value) -> + io_lib:format("~w", [Value]). display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index af4eb1bd..2a19d5b1 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -34,38 +34,19 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, - info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/5, delete_binding/5, list_bindings/1]). --export([delete/2]). --export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([assert_equivalence/5]). --export([assert_args_equivalence/2]). --export([check_type/1]). - -%% EXTENDED API --export([list_exchange_bindings/1]). --export([list_queue_bindings/1]). - --import(mnesia). --import(sets). --import(lists). + info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +%% this must be run inside a mnesia tx +-export([maybe_auto_delete/1]). +-export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([name/0, type/0, binding_key/0]). +-export_type([name/0, type/0]). -type(name() :: rabbit_types:r('exchange')). -type(type() :: atom()). --type(binding_key() :: binary()). - --type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' | - 'exchange_not_found' | - 'exchange_and_queue_not_found')). --type(inner_fun() :: - fun((rabbit_types:exchange(), queue()) -> - rabbit_types:ok_or_error(rabbit_types:amqp_error()))). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: @@ -97,32 +78,12 @@ -> [[rabbit_types:info()]]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> {rabbit_router:routing_result(), [pid()]}). --spec(add_binding/5 :: - (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) -> bind_res()). --spec(delete_binding/5 :: - (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table(), inner_fun()) - -> bind_res() | rabbit_types:error('binding_not_found')). --spec(list_bindings/1 :: - (rabbit_types:vhost()) - -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(delete_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). --spec(delete_transient_queue_bindings/1 :: - (rabbit_amqqueue:name()) -> fun (() -> any())). -spec(delete/2 :: (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). --spec(list_queue_bindings/1 :: - (rabbit_amqqueue:name()) - -> [{name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). --spec(list_exchange_bindings/1 :: - (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(), - rabbit_framing:amqp_table()}]). +-spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> + 'not_deleted' | 'auto_deleted'). -endif. @@ -131,27 +92,15 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). recover() -> - Exs = rabbit_misc:table_fold( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - [Exchange | Acc] - end, [], rabbit_durable_exchange), - Bs = rabbit_misc:table_fold( - fun (Route = #route{binding = B}, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write), - [B | Acc] - end, [], rabbit_durable_route), - recover_with_bindings(Bs, Exs), - ok. - -recover_with_bindings(Bs, Exs) -> + Xs = rabbit_misc:table_fold( + fun (X, Acc) -> + ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc] + end, [], rabbit_durable_exchange), + Bs = rabbit_binding:recover(), recover_with_bindings( lists:keysort(#binding.exchange_name, Bs), - lists:keysort(#exchange.name, Exs), []). + lists:keysort(#exchange.name, Xs), []). recover_with_bindings([B = #binding{exchange_name = Name} | Rest], Xs = [#exchange{name = Name} | _], @@ -163,38 +112,36 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args}, +declare(XName, Type, Durable, AutoDelete, Args) -> + X = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. TypeModule = type_to_module(Type), - ok = TypeModule:validate(Exchange), + ok = TypeModule:validate(X), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of + case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(rabbit_exchange, X, write), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, - Exchange, write); + X, write); false -> ok end, - {new, Exchange}; + {new, X}; [ExistingX] -> {existing, ExistingX} end end) of {new, X} -> TypeModule:create(X), - rabbit_event:notify( - exchange_created, - [{Item, i(Item, Exchange)} || Item <- ?INFO_KEYS]), + rabbit_event:notify(exchange_created, info(X)), X; {existing, X} -> X; Err -> Err @@ -220,9 +167,9 @@ check_type(TypeBin) -> end end. -assert_equivalence(X = #exchange{ durable = Durable, +assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, - type = Type}, + type = Type}, Type, Durable, AutoDelete, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, @@ -232,8 +179,7 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). -assert_args_equivalence(#exchange{ name = Name, - arguments = Args }, +assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic %% equivalence". The only arg we care about is @@ -311,256 +257,20 @@ publish(X = #exchange{type = Type}, Seen, Delivery) -> R end. -%% TODO: Should all of the route and binding management not be -%% refactored to its own module, especially seeing as unbind will have -%% to be implemented for 0.91 ? - -delete_exchange_bindings(ExchangeName) -> - [begin - ok = mnesia:delete_object(rabbit_reverse_route, - reverse_route(Route), write), - ok = delete_forward_routes(Route), - Route#route.binding - end || Route <- mnesia:match_object( - rabbit_route, - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - write)]. - -delete_queue_bindings(QueueName) -> - delete_queue_bindings(QueueName, fun delete_forward_routes/1). - -delete_transient_queue_bindings(QueueName) -> - delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). - -delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - Route = reverse_route(ReverseRoute), - ok = FwdDeleteFun(Route), - ok = mnesia:delete_object(rabbit_reverse_route, - ReverseRoute, write), - Route#route.binding - end || ReverseRoute - <- mnesia:match_object( - rabbit_reverse_route, - reverse_route(#route{binding = #binding{ - queue_name = QueueName, - _ = '_'}}), - write)], - Cleanup = cleanup_deleted_queue_bindings( - lists:keysort(#binding.exchange_name, DeletedBindings), []), - fun () -> - lists:foreach( - fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, Bs); - not_deleted -> Module:remove_bindings(X, Bs) - end - end, Cleanup) - end. - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], Acc) -> - Acc; -cleanup_deleted_queue_bindings( - [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> - cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). - -cleanup_deleted_queue_bindings( - ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], - Bindings, Acc) -> - cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); -cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> - %% either Deleted is [], or its head has a non-matching ExchangeName - NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], - cleanup_deleted_queue_bindings(Deleted, NewAcc). - -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - {maybe_auto_delete(X), Bindings}. - -delete_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write), - ok = mnesia:delete_object(rabbit_durable_route, Route, write). - -delete_transient_forward_routes(Route) -> - ok = mnesia:delete_object(rabbit_route, Route, write). - -contains(Table, MatchHead) -> - continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)). - -continue('$end_of_table') -> false; -continue({[_|_], _}) -> true; -continue({[], Continuation}) -> continue(mnesia:select(Continuation)). - -call_with_exchange(Exchange, Fun) -> +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end end). -call_with_exchange_and_queue(Exchange, Queue, Fun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> case {mnesia:read({rabbit_exchange, Exchange}), - mnesia:read({rabbit_queue, Queue})} of - {[X], [Q]} -> Fun(X, Q); - {[ ], [_]} -> {error, exchange_not_found}; - {[_], [ ]} -> {error, queue_not_found}; - {[ ], [ ]} -> {error, exchange_and_queue_not_found} - end - end). - -add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> - case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - case InnerFun(X, Q) of - ok -> - case mnesia:read({rabbit_route, B}) of - [] -> - ok = sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:write/3), - rabbit_event:notify( - binding_created, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}, - {routing_key, RoutingKey}, - {arguments, Arguments}]), - {new, X, B}; - [_R] -> - {existing, X, B} - end; - {error, _} = E -> - E - end - end) of - {new, Exchange = #exchange{ type = Type }, Binding} -> - (type_to_module(Type)):add_binding(Exchange, Binding); - {existing, _, _} -> - ok; - {error, _} = Err -> - Err - end. - -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> - case binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> - {error, binding_not_found}; - _ -> - case InnerFun(X, Q) of - ok -> - ok = - sync_binding(B, - X#exchange.durable andalso - Q#amqqueue.durable, - fun mnesia:delete_object/3), - rabbit_event:notify( - binding_deleted, - [{exchange_name, ExchangeName}, - {queue_name, QueueName}]), - {maybe_auto_delete(X), B}; - {error, _} = E -> - E - end - end - end) of - {error, _} = Err -> - Err; - {{IsDeleted, X = #exchange{ type = Type }}, B} -> - Module = type_to_module(Type), - case IsDeleted of - auto_deleted -> Module:delete(X, [B]); - not_deleted -> Module:remove_bindings(X, [B]) - end - end. - -binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> - Fun(X, Q, #binding{ - exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = rabbit_misc:sort_field_table(Arguments)}) - end). - -sync_binding(Binding, Durable, Fun) -> - ok = case Durable of - true -> Fun(rabbit_durable_route, - #route{binding = Binding}, write); - false -> ok - end, - {Route, ReverseRoute} = route_with_reverse(Binding), - ok = Fun(rabbit_route, Route, write), - ok = Fun(rabbit_reverse_route, ReverseRoute, write), - ok. - -list_bindings(VHostPath) -> - [{ExchangeName, QueueName, RoutingKey, Arguments} || - #route{binding = #binding{ - exchange_name = ExchangeName, - key = RoutingKey, - queue_name = QueueName, - args = Arguments}} - <- mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{ - exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. - -route_with_reverse(#route{binding = Binding}) -> - route_with_reverse(Binding); -route_with_reverse(Binding = #binding{}) -> - Route = #route{binding = Binding}, - {Route, reverse_route(Route)}. - -reverse_route(#route{binding = Binding}) -> - #reverse_route{reverse_binding = reverse_binding(Binding)}; - -reverse_route(#reverse_route{reverse_binding = Binding}) -> - #route{binding = reverse_binding(Binding)}. - -reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> - #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; - -reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> - #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - -delete(ExchangeName, IfUnused) -> +delete(XName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - case call_with_exchange(ExchangeName, Fun) of + case call_with_exchange(XName, Fun) of {deleted, X = #exchange{type = Type}, Bs} -> (type_to_module(Type)):delete(X, Bs), ok; @@ -568,54 +278,23 @@ delete(ExchangeName, IfUnused) -> Error end. -maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {not_deleted, Exchange}; -maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - case conditional_delete(Exchange) of - {error, in_use} -> {not_deleted, Exchange}; - {deleted, Exchange, []} -> {auto_deleted, Exchange} +maybe_auto_delete(#exchange{auto_delete = false}) -> + not_deleted; +maybe_auto_delete(#exchange{auto_delete = true} = X) -> + case conditional_delete(X) of + {error, in_use} -> not_deleted; + {deleted, X, []} -> auto_deleted end. -conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - %% we need to check for durable routes here too in case a bunch of - %% routes to durable queues have been removed temporarily as a - %% result of a node failure - case contains(rabbit_route, Match) orelse - contains(rabbit_durable_route, Match) of - false -> unconditional_delete(Exchange); +conditional_delete(X = #exchange{name = XName}) -> + case rabbit_binding:has_for_exchange(XName) of + false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = delete_exchange_bindings(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), - {deleted, Exchange, Bindings}. - -%%---------------------------------------------------------------------------- -%% EXTENDED API -%% These are API calls that are not used by the server internally, -%% they are exported for embedded clients to use - -%% This is currently used in mod_rabbit.erl (XMPP) and expects this to -%% return {QueueName, RoutingKey, Arguments} tuples -list_exchange_bindings(ExchangeName) -> - Route = #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - [{QueueName, RoutingKey, Arguments} || - #route{binding = #binding{queue_name = QueueName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. - -% Refactoring is left as an exercise for the reader -list_queue_bindings(QueueName) -> - Route = #route{binding = #binding{queue_name = QueueName, - _ = '_'}}, - [{ExchangeName, RoutingKey, Arguments} || - #route{binding = #binding{exchange_name = ExchangeName, - key = RoutingKey, - args = Arguments}} - <- mnesia:dirty_match_object(rabbit_route, Route)]. +unconditional_delete(X = #exchange{name = XName}) -> + Bindings = rabbit_binding:remove_for_exchange(XName), + ok = mnesia:delete({rabbit_durable_exchange, XName}), + ok = mnesia:delete({rabbit_exchange, XName}), + rabbit_event:notify(exchange_deleted, [{name, XName}]), + {deleted, X, Bindings}. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 44607398..0a59a175 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -79,8 +79,8 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort -%% (rabbit_misc:sort_field_table) that route/3 and -%% rabbit_exchange:{add,delete}_binding/4 do. +%% (rabbit_misc:sort_field_table) that publish/1 and +%% rabbit_binding:{add,remove}/2 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 89b2441e..e796acf3 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery = Delivery). split_topic_key(Key) -> - re:split(Key, "\\.", [{return, list}]). + string:tokens(binary_to_list(Key), "."). topic_matches(PatternKey, RoutingKey) -> P = split_topic_key(PatternKey), diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 553faaa8..cb53185f 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -39,16 +39,9 @@ %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs, Protocol) -> - Parent = self(), - {ok, spawn_link( - fun () -> - %% we trap exits so that a normal termination of - %% the channel or reader process terminates us too. - process_flag(trap_exit, true), - {ok, ChannelPid} = apply(StartFun, StartArgs), - mainloop(Parent, ChannelPid, Protocol) - end)}. +start_link(Parent, ChannelPid, Protocol) -> + {ok, proc_lib:spawn_link( + fun () -> mainloop(Parent, ChannelPid, Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -62,12 +55,6 @@ shutdown(Pid) -> read_frame(ChannelPid) -> receive - %% converting the exit signal into one of our own ensures that - %% the reader sees the right pid (i.e. ours) when a channel - %% exits. Similarly in the other direction, though it is not - %% really relevant there since the channel is not specifically - %% watching out for reader exit signals. - {'EXIT', _Pid, Reason} -> exit(Reason); {frame, Frame} -> Frame; terminate -> rabbit_channel:shutdown(ChannelPid), read_frame(ChannelPid); diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index ab50c28c..a9945af1 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,16 +31,26 @@ -module(rabbit_heartbeat). --export([start_heartbeat/2, pause_monitor/1, resume_monitor/1]). +-export([start_heartbeat_sender/3, start_heartbeat_receiver/3, + pause_monitor/1, resume_monitor/1]). + +-include("rabbit.hrl"). %%---------------------------------------------------------------------------- -ifdef(use_specs). +-export_type([heartbeaters/0]). + -type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). --spec(start_heartbeat/2 :: (rabbit_net:socket(), non_neg_integer()) -> - heartbeaters()). +-spec(start_heartbeat_sender/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/3 :: + (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -48,27 +58,26 @@ %%---------------------------------------------------------------------------- -start_heartbeat(_Sock, 0) -> - none; -start_heartbeat(Sock, TimeoutSec) -> - Parent = self(), +start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. - Sender = heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, - fun () -> - catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), - continue - end}, Parent), + heartbeater( + {Sock, TimeoutSec * 1000 div 2, send_oct, 0, + fun () -> + catch rabbit_net:send( + Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end}). + +start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. - Receiver = heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> - Parent ! timeout, - stop - end}, Parent), - {Sender, Receiver}. + heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> + Parent ! timeout, + stop + end}). pause_monitor(none) -> ok; @@ -84,21 +93,15 @@ resume_monitor({_Sender, Receiver}) -> %%---------------------------------------------------------------------------- -heartbeater(Params, Parent) -> - spawn_link(fun () -> heartbeater(Params, erlang:monitor(process, Parent), - {0, 0}) - end). +heartbeater(Params) -> + {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, - MonitorRef, {StatVal, SameCount}) -> - Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + {StatVal, SameCount}) -> + Recurse = fun (V) -> heartbeater(Params, V) end, receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; pause -> receive - {'DOWN', MonitorRef, process, _Object, _Info} -> - ok; resume -> Recurse({0, 0}); Other -> diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl deleted file mode 100644 index 3fc84c1e..00000000 --- a/src/rabbit_hooks.erl +++ /dev/null @@ -1,73 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_hooks). - --export([start/0]). --export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]). - --define(TableName, rabbit_hooks). - --ifdef(use_specs). - --spec(start/0 :: () -> 'ok'). --spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok'). --spec(unsubscribe/2 :: (atom(), atom()) -> 'ok'). --spec(trigger/2 :: (atom(), list()) -> 'ok'). --spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok'). - --endif. - -start() -> - ets:new(?TableName, [bag, public, named_table]), - ok. - -subscribe(Hook, HandlerName, Handler) -> - ets:insert(?TableName, {Hook, HandlerName, Handler}), - ok. - -unsubscribe(Hook, HandlerName) -> - ets:match_delete(?TableName, {Hook, HandlerName, '_'}), - ok. - -trigger(Hook, Args) -> - Hooks = ets:lookup(?TableName, Hook), - [case catch apply(M, F, [Hook, Name, Args | A]) of - {'EXIT', Reason} -> - rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", - [Name, Hook, Reason]); - _ -> ok - end || {_, Name, {M, F, A}} <- Hooks], - ok. - -notify_remote(Hook, HandlerName, Args, Pid, PidArgs) -> - Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]}, - ok. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9894a850..c323d7ce 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -34,8 +34,8 @@ -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). --export([start_link/2, shutdown/1]). + handle_info/2, prioritise_call/3]). +-export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -47,7 +47,6 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok_pid_or_error()). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). @@ -77,17 +76,10 @@ start_link(ChPid, UnackedMsgCount) -> gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - true = unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). - limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, {limit, PrefetchCount})). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -115,7 +107,7 @@ get_limit(undefined) -> get_limit(Pid) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + fun () -> gen_server2:call(Pid, get_limit, infinity) end). block(undefined) -> ok; @@ -125,8 +117,7 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, unblock, infinity)). + gen_server2:call(LimiterPid, unblock, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -135,6 +126,9 @@ unblock(LimiterPid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +prioritise_call(get_limit, _From, _State) -> 9; +prioritise_call(_Msg, _From, _State) -> 0. + handle_call({can_send, _QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, State}; @@ -165,9 +159,6 @@ handle_call(unblock, _From, State) -> {stop, State1} -> {stop, normal, stopped, State1} end. -handle_cast(shutdown, State) -> - {stop, normal, State}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count @@ -247,9 +238,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - -unlink_on_stopped(LimiterPid, stopped) -> - ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), - stopped; -unlink_on_stopped(_LimiterPid, Result) -> - Result. diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl deleted file mode 100644 index e0457b1e..00000000 --- a/src/rabbit_load.erl +++ /dev/null @@ -1,78 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_load). - --export([local_load/0, remote_loads/0, pick/0]). - --define(FUDGE_FACTOR, 0.98). --define(TIMEOUT, 100). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}). --spec(local_load/0 :: () -> load()). --spec(remote_loads/0 :: () -> [load()]). --spec(pick/0 :: () -> node()). - --endif. - -%%---------------------------------------------------------------------------- - -local_load() -> - LoadAvg = case whereis(cpu_sup) of - undefined -> unknown; - _ -> case cpu_sup:avg1() of - L when is_integer(L) -> L; - {error, timeout} -> unknown - end - end, - {{statistics(run_queue), LoadAvg}, node()}. - -remote_loads() -> - {ResL, _BadNodes} = - rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT), - ResL. - -pick() -> - RemoteLoads = remote_loads(), - {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node; we rely on Erlang's term order - %% of SomeFloat < local_unknown < unknown. - AdjustedLoadAvg = case LoadAvg of - unknown -> local_unknown; - _ -> LoadAvg * ?FUDGE_FACTOR - end, - Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], - {_, SelectedNode} = lists:min(Loads), - SelectedNode. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 5fa3f8ed..086d260e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -39,7 +39,6 @@ -export([die/1, frame_error/2, amqp_error/4, protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). --export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). -export([table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). @@ -108,10 +107,6 @@ rabbit_framing:amqp_table(), rabbit_types:r(any()), [binary()]) -> 'ok' | rabbit_types:connection_exit()). --spec(get_config/1 :: - (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). --spec(get_config/2 :: (atom(), A) -> A). --spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(table_lookup/2 :: @@ -240,21 +235,6 @@ assert_args_equivalence1(Orig, New, Name, Key) -> [Key, rabbit_misc:rs(Name), New1, Orig1]) end. -get_config(Key) -> - case dirty_read({rabbit_config, Key}) of - {ok, {rabbit_config, Key, V}} -> {ok, V}; - Other -> Other - end. - -get_config(Key, DefaultValue) -> - case get_config(Key) of - {ok, V} -> V; - {error, not_found} -> DefaultValue - end. - -set_config(Key, Value) -> - ok = mnesia:dirty_write({rabbit_config, Key, Value}). - dirty_read(ReadSpec) -> case mnesia:dirty_read(ReadSpec) of [Result] -> {ok, Result}; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4a5adfae..a3214888 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -169,10 +169,6 @@ table_definitions() -> {attributes, record_info(fields, vhost)}, {disc_copies, [node()]}, {match, #vhost{_='_'}}]}, - {rabbit_config, - [{attributes, [key, val]}, % same mnesia's default - {disc_copies, [node()]}, - {match, {rabbit_config, '_', '_'}}]}, {rabbit_listener, [{record_name, listener}, {attributes, record_info(fields, listener)}, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5bc1f9d5..bbecbfe2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,13 +34,13 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/1, + sync/3, client_init/2, client_terminate/2, client_delete_and_terminate/3, successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -98,8 +98,7 @@ }). -record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size, - locked, readers}). + {file, valid_total_size, left, right, file_size, locked, readers}). %%---------------------------------------------------------------------------- @@ -136,7 +135,7 @@ 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), binary()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). @@ -159,8 +158,7 @@ %% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: -%% {File, ValidTotalSize, ContiguousTop, Left, Right, -%% FileSize, Locked, Readers} +%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} %% %% The basic idea is that messages are appended to the current file up %% until that file becomes too big (> file_size_limit). At that point, @@ -176,9 +174,7 @@ %% %% As messages are removed from files, holes appear in these %% files. The field ValidTotalSize contains the total amount of useful -%% data left in the file, whilst ContiguousTop contains the amount of -%% valid data right at the start of each file. These are needed for -%% garbage collection. +%% data left in the file. This is needed for garbage collection. %% %% When we discover that a file is now empty, we delete it. When we %% discover that it can be combined with the useful data in either its @@ -224,9 +220,7 @@ %% above B (i.e. truncate to the limit of the good contiguous region %% at the start of the file), then write C and D on top and then write %% E, F and G from the right file on top. Thus contiguous blocks of -%% good data at the bottom of files are not rewritten (yes, this is -%% the data the size of which is tracked by the ContiguousTop -%% variable. Judicious use of a mirror is required). +%% good data at the bottom of files are not rewritten. %% %% +-------+ +-------+ +-------+ %% | X | | G | | G | @@ -317,7 +311,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -328,8 +322,8 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:pcall( - Server, 2, {read, Guid}, infinity), + Defer = fun() -> {gen_server2:call( + Server, {read, Guid}, infinity), CState} end, case index_lookup(Guid, CState) of not_found -> Defer(); @@ -351,13 +345,13 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal +sync(Server) -> gen_server2:cast(Server, sync). %% internal gc_done(Server, Reclaimed, Source, Destination) -> - gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}). + gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). set_maximum_since_use(Server, Age) -> - gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Server, {set_maximum_since_use, Age}). client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, @@ -373,13 +367,13 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState, Server) -> close_all_handles(CState), - ok. + ok = gen_server2:call(Server, client_terminate, infinity). client_delete_and_terminate(CState, Server, Ref) -> - ok = client_terminate(CState), - ok = gen_server2:call(Server, {delete_client, Ref}, infinity). + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). @@ -581,6 +575,22 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_call(Msg, _From, _State) -> + case Msg of + {new_client_state, _Ref} -> 7; + successfully_recovered_state -> 7; + {read, _Guid} -> 2; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + sync -> 8; + {gc_done, _Reclaimed, _Source, _Destination} -> 8; + {set_maximum_since_use, _Age} -> 8; + _ -> 0 + end. + handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), noreply(State1); @@ -606,12 +616,10 @@ handle_call({new_client_state, CRef}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({delete_client, CRef}, _From, - State = #msstate { client_refs = ClientRefs }) -> - reply(ok, - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). +handle_call(client_terminate, _From, State) -> + reply(ok, State). -handle_cast({write, Guid, Msg}, +handle_cast({write, Guid}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, @@ -619,6 +627,7 @@ handle_cast({write, Guid, Msg}, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> %% New message, lots to do @@ -629,20 +638,14 @@ handle_cast({write, Guid, Msg}, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, right = undefined, locked = false, file_size = FileSize }] = ets:lookup(FileSummaryEts, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = case CurOffset =:= ContiguousTop of - true -> ValidTotalSize1; - false -> ContiguousTop - end, true = ets:update_element( FileSummaryEts, CurFile, [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}, {#file_summary.file_size, FileSize + TotalSize}]), NextOffset = CurOffset + TotalSize, noreply( @@ -723,7 +726,12 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State). + noreply(State); + +handle_cast({client_delete, CRef}, + State = #msstate { client_refs = ClientRefs }) -> + noreply( + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). handle_info(timeout, State) -> noreply(internal_sync(State)); @@ -898,8 +906,7 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = - index_lookup(Guid, State), + total_size = TotalSize } = index_lookup(Guid, State), case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because @@ -907,7 +914,6 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, %% msg. ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of @@ -915,12 +921,11 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, add_to_pending_gc_completion({remove, Guid}, State); false -> ok = index_delete(Guid, State), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.contiguous_top, ContiguousTop1}]), + true = + ets:update_element( + FileSummaryEts, File, + [{#file_summary.valid_total_size, ValidTotalSize1}]), State1 = delete_file_if_empty(File, State), State1 #msstate { sum_valid_data = SumValid - TotalSize } end; @@ -1267,16 +1272,17 @@ scan_file_for_valid_messages(Dir, FileName) -> %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. -find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []). +drop_contiguous_block_prefix(L) -> drop_contiguous_block_prefix(L, 0). -find_contiguous_block_prefix([], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}; -find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], - ExpectedOffset, Guids) -> +drop_contiguous_block_prefix([], ExpectedOffset) -> + {ExpectedOffset, []}; +drop_contiguous_block_prefix([#msg_location { offset = ExpectedOffset, + total_size = TotalSize } | Tail], + ExpectedOffset) -> ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> - {ExpectedOffset, Guids}. + drop_contiguous_block_prefix(Tail, ExpectedOffset1); +drop_contiguous_block_prefix(MsgsAfterGap, ExpectedOffset) -> + {ExpectedOffset, MsgsAfterGap}. build_index(true, _StartupFunState, State = #msstate { file_summary_ets = FileSummaryEts }) -> @@ -1352,9 +1358,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, {VMAcc, VTSAcc} end end, {[], 0}, Messages), - %% foldl reverses lists, find_contiguous_block_prefix needs - %% msgs eldest first, so, ValidMessages is the right way round - {ContiguousTop, _} = find_contiguous_block_prefix(ValidMessages), {Right, FileSize1} = case Files of %% if it's the last file, we'll truncate to remove any @@ -1371,7 +1374,6 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, ok = gatherer:in(Gatherer, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, left = Left, right = Right, file_size = FileSize1, @@ -1399,7 +1401,6 @@ maybe_roll_to_new_file( true = ets:insert_new(FileSummaryEts, #file_summary { file = NextFile, valid_total_size = 0, - contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, @@ -1526,7 +1527,6 @@ gc(SrcFile, DstFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) -> true = ets:update_element( FileSummaryEts, DstFile, [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, {#file_summary.file_size, TotalValidData}]), SrcFileSize + DstFileSize - TotalValidData; false -> concurrent_readers @@ -1537,7 +1537,6 @@ combine_files(#file_summary { file = Source, left = Destination }, #file_summary { file = Destination, valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, right = Source }, State = {_FileSummaryEts, Dir, _Index, _IndexState}) -> SourceName = filenum_to_name(Source), @@ -1553,41 +1552,32 @@ combine_files(#file_summary { file = Source, %% the DestinationContiguousTop to a tmp file then truncate, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source - case DestinationContiguousTop =:= DestinationValid of - true -> - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize); - false -> - {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset =/= DestinationContiguousTop -> - %% it cannot be that Offset =:= - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - end, DestinationWorkList), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and index_state has been updated to - %% reflect the compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:delete(TmpHdl) + {DestinationWorkList, DestinationValid} = + find_unremoved_messages_in_file(Destination, State), + {DestinationContiguousTop, DestinationWorkListTail} = + drop_contiguous_block_prefix(DestinationWorkList), + case DestinationWorkListTail of + [] -> ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize); + _ -> Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE++?WRITE_MODE), + ok = copy_messages( + DestinationWorkListTail, DestinationContiguousTop, + DestinationValid, DestinationHdl, TmpHdl, Destination, + State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage + %% from Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:delete(TmpHdl) end, {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index c7948b7e..a7855bbf 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). -record(gcstate, {dir, @@ -81,7 +81,7 @@ stop(Server) -> gen_server2:call(Server, stop, infinity). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- @@ -97,6 +97,9 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 3facef17..5cfd6a5c 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -93,7 +93,14 @@ usage() -> action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), application:load(rabbit), - NodeName = rabbit_misc:nodeparts(getenv("RABBITMQ_NODENAME")), + {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts( + getenv("RABBITMQ_NODENAME")), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + throw({cannot_connect_to_epmd, NodeHost, EpmdReason}); + {ok, _} -> + ok + end, {NodePids, Running} = case list_to_integer(NodeCount) of 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), @@ -303,8 +310,8 @@ kill_wait(Pid, TimeLeft, Forceful) -> is_dead(Pid) -> PidS = integer_to_list(Pid), with_os([{unix, fun () -> - Res = os:cmd("ps --no-headers --pid " ++ PidS), - Res == "" + system("kill -0 " ++ PidS + ++ " >/dev/null 2>&1") /= 0 end}, {win32, fun () -> Res = os:cmd("tasklist /nh /fi \"pid eq " ++ @@ -315,6 +322,16 @@ is_dead(Pid) -> end end}]). +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. + +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + call_all_nodes(Func) -> case read_pids_file() of [] -> throw(no_nodes_running); diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 6baa4b88..2286896b 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -46,7 +46,7 @@ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). -type(error() :: rabbit_types:error(any())). --type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()). +-type(socket() :: port() | #ssl_socket{}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). @@ -72,72 +72,58 @@ %%--------------------------------------------------------------------------- +-define(IS_SSL(Sock), is_record(Sock, ssl_socket)). -async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> +async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), spawn(fun () -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} - end), + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), {ok, Ref}; - async_recv(Sock, Length, infinity) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, -1); - async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). -close(Sock) when is_record(Sock, ssl_socket) -> +close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); - close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). - -controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> +controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> ssl:controlling_process(Sock#ssl_socket.ssl, Pid); - controlling_process(Sock, Pid) when is_port(Sock) -> gen_tcp:controlling_process(Sock, Pid). - -getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> +getstat(Sock, Stats) when ?IS_SSL(Sock) -> inet:getstat(Sock#ssl_socket.tcp, Stats); - getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). - -peername(Sock) when is_record(Sock, ssl_socket) -> +peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl); - peername(Sock) when is_port(Sock) -> inet:peername(Sock). - -port_command(Sock, Data) when is_record(Sock, ssl_socket) -> +port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of - ok -> - self() ! {inet_reply, Sock, ok}, - true; - {error, Reason} -> - erlang:error(Reason) + ok -> self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> erlang:error(Reason) end; - port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). -send(Sock, Data) when is_record(Sock, ssl_socket) -> +send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); - send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). -sockname(Sock) when is_record(Sock, ssl_socket) -> +sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); - sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index f968b0d8..6dbd54d2 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -46,6 +46,8 @@ -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-include_lib("ssl/src/ssl_record.hrl"). + -define(RABBIT_TCP_OPTS, [ binary, @@ -107,8 +109,37 @@ boot_ssl() -> ok; {ok, SslListeners} -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), - {ok, SslOpts} = application:get_env(ssl_options), - [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], + {ok, SslOptsConfig} = application:get_env(ssl_options), + % unknown_ca errors are silently ignored prior to R14B unless we + % supply this verify_fun - remove when at least R14B is required + SslOpts = + case proplists:get_value(verify, SslOptsConfig, verify_none) of + verify_none -> SslOptsConfig; + verify_peer -> [{verify_fun, fun([]) -> true; + ([_|_]) -> false + end} + | SslOptsConfig] + end, + % In R13B04 and R14A (at least), rc4 is incorrectly implemented. + CipherSuites = proplists:get_value(ciphers, + SslOpts, + ssl:cipher_suites()), + FilteredCipherSuites = + [C || C <- CipherSuites, + begin + SuiteCode = + if is_tuple(C) -> ssl_cipher:suite(C); + is_list(C) -> ssl_cipher:openssl_suite(C) + end, + SP = ssl_cipher:security_parameters( + SuiteCode, + #security_parameters{}), + SP#security_parameters.bulk_cipher_algorithm =/= ?RC4 + end], + SslOpts1 = [{ciphers, FilteredCipherSuites} + | [{K, V} || {K, V} <- SslOpts, K =/= ciphers]], + [start_ssl_listener(Host, Port, SslOpts1) + || {Host, Port} <- SslListeners], ok end. @@ -118,7 +149,7 @@ start() -> {rabbit_tcp_client_sup, {tcp_client_sup, start_link, [{local, rabbit_tcp_client_sup}, - {rabbit_reader,start_link,[]}]}, + {rabbit_connection_sup,start_link,[]}]}, transient, infinity, supervisor, [tcp_client_sup]}), ok. @@ -204,10 +235,10 @@ on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). start_client(Sock, SockTransform) -> - {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock, SockTransform}, - Child. + {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), + ok = rabbit_net:controlling_process(Sock, Reader), + Reader ! {go, Sock, SockTransform}, + Reader. start_client(Sock) -> start_client(Sock, fun (S) -> {ok, S} end). @@ -230,8 +261,9 @@ start_ssl_client(SslOpts, Sock) -> end). connections() -> - [Pid || {_, Pid, _, _} <- supervisor:which_children( - rabbit_tcp_client_sup)]. + [rabbit_connection_sup:reader(ConnSup) || + {_, ConnSup, supervisor, _} + <- supervisor:which_children(rabbit_tcp_client_sup)]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index c9f75be0..b23776cd 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -51,7 +51,7 @@ %%---------------------------------------------------------------------------- start() -> - io:format("Activating RabbitMQ plugins ..."), + io:format("Activating RabbitMQ plugins ...~n"), %% Ensure Rabbit is loaded so we can access it's environment application:load(rabbit), @@ -77,7 +77,7 @@ start() -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), + RabbitVersion = proplists:get_value(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, @@ -130,8 +130,9 @@ start() -> ok -> ok; error -> error("failed to compile boot script file ~s", [ScriptFile]) end, - io:format("~n~w plugins activated:~n", [length(PluginApps)]), - [io:format("* ~w~n", [App]) || App <- PluginApps], + io:format("~w plugins activated:~n", [length(PluginApps)]), + [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) + || App <- PluginApps], io:nl(), halt(), ok. @@ -150,29 +151,33 @@ determine_version(App) -> {ok, Vsn} = application:get_key(App, vsn), {App, Vsn}. -assert_dir(Dir) -> - case filelib:is_dir(Dir) of - true -> ok; - false -> ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) - end. - -delete_dir(Dir) -> - case filelib:is_dir(Dir) of +delete_recursively(Fn) -> + case filelib:is_dir(Fn) and not(is_symlink(Fn)) of true -> - case file:list_dir(Dir) of + case file:list_dir(Fn) of {ok, Files} -> - [case Dir ++ "/" ++ F of - Fn -> - case filelib:is_dir(Fn) and not(is_symlink(Fn)) of - true -> delete_dir(Fn); - false -> file:delete(Fn) - end - end || F <- Files] - end, - ok = file:del_dir(Dir); + case lists:foldl(fun ( Fn1, ok) -> delete_recursively( + Fn ++ "/" ++ Fn1); + (_Fn1, Err) -> Err + end, ok, Files) of + ok -> case file:del_dir(Fn) of + ok -> ok; + {error, E} -> {error, + {cannot_delete, Fn, E}} + end; + Err -> Err + end; + {error, E} -> + {error, {cannot_list_files, Fn, E}} + end; false -> - ok + case filelib:is_file(Fn) of + true -> case file:delete(Fn) of + ok -> ok; + {error, E} -> {error, {cannot_delete, Fn, E}} + end; + false -> ok + end end. is_symlink(Name) -> @@ -181,13 +186,18 @@ is_symlink(Name) -> _ -> false end. -unpack_ez_plugins(PluginSrcDir, PluginDestDir) -> +unpack_ez_plugins(SrcDir, DestDir) -> %% Eliminate the contents of the destination directory - delete_dir(PluginDestDir), - - assert_dir(PluginDestDir), - [unpack_ez_plugin(PluginName, PluginDestDir) || - PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")]. + case delete_recursively(DestDir) of + ok -> ok; + {error, E} -> error("Could not delete dir ~s (~p)", [DestDir, E]) + end, + case filelib:ensure_dir(DestDir ++ "/") of + ok -> ok; + {error, E2} -> error("Could not create dir ~s (~p)", [DestDir, E2]) + end, + [unpack_ez_plugin(PluginName, DestDir) || + PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")]. unpack_ez_plugin(PluginFn, PluginDestDir) -> zip:unzip(PluginFn, [{cwd, PluginDestDir}]), @@ -246,8 +256,8 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[stdlib,permanent]}}) -> - [Entry, {apply,{rabbit,prepare,[]}}]; +process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> + [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index b056d60b..0a49b94d 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -33,7 +33,7 @@ -behaviour(gen_server). --export([start_link/0, register/2, delete_all/1, shutdown/1]). +-export([start_link/0, register/2, delete_all/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -49,7 +49,6 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). --spec(shutdown/1 :: (pid()) -> 'ok'). -endif. @@ -64,9 +63,6 @@ register(CollectorPid, Q) -> delete_all(CollectorPid) -> gen_server:call(CollectorPid, delete_all, infinity). -shutdown(CollectorPid) -> - gen_server:cast(CollectorPid, shutdown). - %%---------------------------------------------------------------------------- init([]) -> @@ -90,8 +86,8 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> || {MonitorRef, Q} <- dict:to_list(Queues)], {reply, ok, State}. -handle_cast(shutdown, State) -> - {stop, normal, State}. +handle_cast(Msg, State) -> + {stop, {unhandled_cast, Msg}, State}. handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, State = #state{queues = Queues}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a133bf45..252f81a3 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -33,11 +33,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/0, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/2]). +-export([init/4, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -46,7 +46,6 @@ -export([emit_stats/1]). -import(gen_tcp). --import(fprof). -import(inet). -import(prim_inet). @@ -60,7 +59,8 @@ %--------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, - connection_state, queue_collector, heartbeater, stats_timer}). + connection_state, queue_collector, heartbeater, stats_timer, + channel_sup_sup_pid, start_heartbeat_fun}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -160,6 +160,12 @@ -ifdef(use_specs). +-type(start_heartbeat_fun() :: + fun ((rabbit_networking:socket(), non_neg_integer()) -> + rabbit_heartbeat:heartbeaters())). + +-spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> + rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). -spec(info/1 :: (pid()) -> [rabbit_types:info()]). -spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]). @@ -168,21 +174,33 @@ -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +%% These specs only exists to add no_return() to keep dialyzer happy +-spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(start_connection/7 :: + (pid(), pid(), pid(), start_heartbeat_fun(), any(), + rabbit_networking:socket(), + fun ((rabbit_networking:socket()) -> + rabbit_types:ok_or_error2( + rabbit_networking:socket(), any()))) -> no_return()). + -endif. %%-------------------------------------------------------------------------- -start_link() -> - {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. +start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> + {ok, proc_lib:spawn_link(?MODULE, init, [self(), ChannelSupSupPid, + Collector, StartHeartbeatFun])}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent) -> +init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> - start_connection(Parent, Deb, Sock, SockTransform) + start_connection( + Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + SockTransform) end. system_continue(Parent, Deb, State) -> @@ -208,33 +226,6 @@ info(Pid, Items) -> emit_stats(Pid) -> gen_server:cast(Pid, emit_stats). -setup_profiling() -> - Value = rabbit_misc:get_config(profiling_enabled, false), - case Value of - once -> - rabbit_log:info("Enabling profiling for this connection, " - "and disabling for subsequent.~n"), - rabbit_misc:set_config(profiling_enabled, false), - fprof:trace(start); - true -> - rabbit_log:info("Enabling profiling for this connection.~n"), - fprof:trace(start); - false -> - ok - end, - Value. - -teardown_profiling(Value) -> - case Value of - false -> - ok; - _ -> - rabbit_log:info("Completing profiling for this connection.~n"), - fprof:trace(stop), - fprof:profile(), - fprof:analyse([{dest, []}, {cols, 100}]) - end. - conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -261,7 +252,8 @@ socket_op(Sock, Fun) -> exit(normal) end. -start_connection(Parent, Deb, Sock, SockTransform) -> +start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, + Sock, SockTransform) -> process_flag(trap_exit, true), {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), PeerAddressS = inet_parse:ntoa(PeerAddress), @@ -270,28 +262,29 @@ start_connection(Parent, Deb, Sock, SockTransform) -> ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), - ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none, - protocol = none}, - callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, - connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = - rabbit_event:init_stats_timer()}, - handshake, 8)) + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none}, + callback = uninitialized_callback, + recv_length = 0, + recv_ref = none, + connection_state = pre_init, + queue_collector = Collector, + heartbeater = none, + stats_timer = + rabbit_event:init_stats_timer(), + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun + }, + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -308,9 +301,6 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% output to be sent, which results in unnecessary delays. %% %% gen_tcp:close(ClientSock), - teardown_profiling(ProfilingValue), - rabbit_misc:unlink_and_capture_exit(Collector), - rabbit_queue_collector:shutdown(Collector), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. @@ -347,10 +337,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> exit(Reason); {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_channel_exit(Channel, Reason, State)); - {'EXIT', Pid, Reason} -> - mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); + {channel_exit, ChannelOrFrPid, Reason} -> + mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State)); + {'DOWN', _MRef, process, ChSupPid, Reason} -> + mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State)); terminate_connection -> State; handshake_timeout -> @@ -443,33 +433,45 @@ close_channel(Channel, State) -> put({channel, Channel}, closing), State. -handle_channel_exit(ChPid, Reason, State) when is_pid(ChPid) -> - {channel, Channel} = get({chpid, ChPid}), +handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) -> + {channel, Channel} = get({ch_fr_pid, ChFrPid}), handle_exception(State, Channel, Reason); handle_channel_exit(Channel, Reason, State) -> handle_exception(State, Channel, Reason). -handle_dependent_exit(Pid, normal, State) -> - erase({chpid, Pid}), - maybe_close(State); -handle_dependent_exit(Pid, Reason, State) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> maybe_close(handle_exception(State, Channel, Reason)) +handle_dependent_exit(ChSupPid, Reason, State) -> + case termination_kind(Reason) of + controlled -> + case erase({ch_sup_pid, ChSupPid}) of + undefined -> ok; + {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr) + end, + maybe_close(State); + uncontrolled -> + case channel_cleanup(ChSupPid) of + undefined -> + exit({abnormal_dependent_exit, ChSupPid, Reason}); + Channel -> + maybe_close(handle_exception(State, Channel, Reason)) + end end. -channel_cleanup(Pid) -> - case get({chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> erase({channel, Channel}), - erase({chpid, Pid}), - Channel +channel_cleanup(ChSupPid) -> + case get({ch_sup_pid, ChSupPid}) of + undefined -> undefined; + {{channel, Channel}, ChFr} -> erase({channel, Channel}), + erase(ChFr), + erase({ch_sup_pid, ChSupPid}), + Channel end. -all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. +all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid}, + {_Channel, {ch_fr_pid, ChFrPid}}} <- get()]. terminate_channels() -> - NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + NChannels = + length([rabbit_framing_channel:shutdown(ChFrPid) + || ChFrPid <- all_channels()]), if NChannels > 0 -> Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, TimerRef = erlang:send_after(Timeout, self(), cancel_wait), @@ -487,14 +489,15 @@ wait_for_channel_termination(0, TimerRef) -> wait_for_channel_termination(N, TimerRef) -> receive - {'EXIT', Pid, Reason} -> - case channel_cleanup(Pid) of + {'DOWN', _MRef, process, ChSupPid, Reason} -> + case channel_cleanup(ChSupPid) of undefined -> - exit({abnormal_dependent_exit, Pid, Reason}); + exit({abnormal_dependent_exit, ChSupPid, Reason}); Channel -> - case Reason of - normal -> ok; - _ -> + case termination_kind(Reason) of + controlled -> + ok; + uncontrolled -> rabbit_log:error( "connection ~p, channel ~p - " "error while terminating:~n~p~n", @@ -519,6 +522,11 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. +termination_kind(normal) -> controlled; +termination_kind(shutdown) -> controlled; +termination_kind({shutdown, _Term}) -> controlled; +termination_kind(_) -> uncontrolled. + handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, connection = #connection{protocol = Protocol}}) @@ -548,8 +556,8 @@ handle_frame(Type, Channel, Payload, AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of - {chpid, ChPid} -> - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), + {ch_fr_pid, ChFrPid} -> + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame), case AnalyzedFrame of {method, 'channel.close', _} -> erase({channel, Channel}), @@ -732,7 +740,8 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, - sock = Sock}) -> + sock = Sock, + start_heartbeat_fun = SHF}) -> if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> rabbit_misc:protocol_error( not_allowed, "frame_max=~w < ~w min size", @@ -742,8 +751,7 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = rabbit_heartbeat:start_heartbeat( - Sock, ClientHeartbeat), + Heartbeater = SHF(Sock, ClientHeartbeat), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -765,9 +773,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State#v1{connection_state = running, connection = NewConnection}), - rabbit_event:notify( - connection_created, - [{Item, i(Item, State1)} || Item <- ?CREATION_EVENT_KEYS]), + rabbit_event:notify(connection_created, + infos(?CREATION_EVENT_KEYS, State1)), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), @@ -849,21 +856,22 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> - #v1{sock = Sock, connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost, - protocol = Protocol}} = State, - {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), - {ok, ChPid} = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector], - Protocol), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). +send_to_new_channel(Channel, AnalyzedFrame, State) -> + #v1{sock = Sock, queue_collector = Collector, + channel_sup_sup_pid = ChanSupSup, + connection = #connection{protocol = Protocol, + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + {ok, ChSupPid, ChFrPid} = + rabbit_channel_sup_sup:start_channel( + ChanSupSup, {Protocol, Sock, Channel, FrameMax, + self(), Username, VHost, Collector}), + erlang:monitor(process, ChSupPid), + put({channel, Channel}, {ch_fr_pid, ChFrPid}), + put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}), + put({ch_fr_pid, ChFrPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", @@ -931,5 +939,4 @@ amqp_exception_explanation(Text, Expl) -> end. internal_emit_stats(State) -> - rabbit_event:notify(connection_stats, - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). + rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ec049a1a..bd57f737 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -33,9 +33,7 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([deliver/2, - match_bindings/2, - match_routing_key/2]). +-export([deliver/2, match_bindings/2, match_routing_key/2]). %%---------------------------------------------------------------------------- @@ -45,9 +43,15 @@ -type(routing_key() :: binary()). -type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(qpids() :: [pid()]). -spec(deliver/2 :: - ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}). + (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(match_bindings/2 :: (rabbit_exchange:name(), + fun ((rabbit_types:binding()) -> boolean())) -> + qpids()). +-spec(match_routing_key/2 :: (rabbit_exchange:name(), routing_key() | '_') -> + qpids()). -endif. @@ -81,10 +85,10 @@ deliver(QPids, Delivery) -> %% TODO: This causes a full scan for each entry with the same exchange match_bindings(Name, Match) -> Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, + exchange_name = XName, queue_name = QName}} <- mnesia:table(rabbit_route), - ExchangeName == Name, + XName == Name, Match(Binding)]), lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c07055af..a72656b7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,9 +35,6 @@ -export([all_tests/0, test_parsing/0]). -%% Exported so the hook mechanism can call back --export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). - -import(lists). -include("rabbit.hrl"). @@ -55,6 +52,8 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + ok = file_handle_cache:set_limit(10), + passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), @@ -973,6 +972,8 @@ test_user_management() -> {error, {user_already_exists, _}} = control_action(add_user, ["foo", "bar"]), ok = control_action(change_password, ["foo", "baz"]), + ok = control_action(set_admin, ["foo"]), + ok = control_action(clear_admin, ["foo"]), ok = control_action(list_users, []), %% vhost creation @@ -1020,7 +1021,8 @@ test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - <<"user">>, <<"/">>, self()), + <<"user">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1037,7 +1039,15 @@ test_server_status() -> ok = info_action(list_exchanges, rabbit_exchange:info_keys(), true), %% list bindings - ok = control_action(list_bindings, []), + ok = info_action(list_bindings, rabbit_binding:info_keys(), true), + %% misc binding listing APIs + [_|_] = rabbit_binding:list_for_exchange( + rabbit_misc:r(<<"/">>, exchange, <<"">>)), + [_] = rabbit_binding:list_for_queue( + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), + [_] = rabbit_binding:list_for_exchange_and_queue( + rabbit_misc:r(<<"/">>, exchange, <<"">>), + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), %% list connections [#listener{host = H, port = P} | _] = @@ -1061,67 +1071,23 @@ test_server_status() -> %% cleanup [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], + + unlink(Ch), ok = rabbit_channel:shutdown(Ch), passed. -test_hooks() -> - %% Firing of hooks calls all hooks in an isolated manner - rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}), - rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}), - rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}), - rabbit_hooks:trigger(test_hook, [arg1, arg2]), - [arg1, arg2] = get(test_hook_test_fired), - [arg1, arg2] = get(test_hook_test2_fired), - undefined = get(test_hook2_test2_fired), - - %% Hook Deletion works - put(test_hook_test_fired, undefined), - put(test_hook_test2_fired, undefined), - rabbit_hooks:unsubscribe(test_hook, test), - rabbit_hooks:trigger(test_hook, [arg3, arg4]), - undefined = get(test_hook_test_fired), - [arg3, arg4] = get(test_hook_test2_fired), - undefined = get(test_hook2_test2_fired), - - %% Catches exceptions from bad hooks - rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}), - ok = rabbit_hooks:trigger(test_hook3, []), - - %% Passing extra arguments to hooks - rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}), - rabbit_hooks:trigger(arg_hook, [arg1, arg2]), - {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), - - %% Invoking Pids - Remote = fun () -> - receive - {rabbitmq_hook,[remote_test,test,[],Target]} -> - Target ! invoked - end - end, - P = spawn(Remote), - rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), - rabbit_hooks:trigger(remote_test, []), - receive - invoked -> ok - after 100 -> - io:format("Remote hook not invoked"), - throw(timeout) - end, - passed. - test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"guest">>, - <<"/">>, self()), + {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, + <<"guest">>, <<"/">>, self(), + fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), - MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, - {Writer, Ch, MRef}. + {Writer, Ch}. test_statistics_receiver(Pid) -> receive @@ -1160,7 +1126,7 @@ test_statistics() -> %% by far the most complex code though. %% Set up a channel and queue - {_Writer, Ch, _MRef} = test_spawn(fun test_statistics_receiver/1), + {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1), rabbit_channel:do(Ch, #'queue.declare'{}), QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 @@ -1404,17 +1370,35 @@ delete_log_handlers(Handlers) -> Handler <- Handlers], ok. -handle_hook(HookName, Handler, Args) -> - A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired", - put(list_to_atom(A), Args). -bad_handle_hook(_, _, _) -> - exit(bad_handle_hook_called). -extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> - handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). - test_supervisor_delayed_restart() -> test_sup:test_supervisor_delayed_restart(). +test_file_handle_cache() -> + %% test copying when there is just one spare handle + Limit = file_handle_cache:get_limit(), + ok = file_handle_cache:set_limit(5), %% 1 or 2 sockets, 2 msg_stores + TmpDir = filename:join(rabbit_mnesia:dir(), "tmp"), + ok = filelib:ensure_dir(filename:join(TmpDir, "nothing")), + Pid = spawn(fun () -> {ok, Hdl} = file_handle_cache:open( + filename:join(TmpDir, "file3"), + [write], []), + receive close -> ok end, + file_handle_cache:delete(Hdl) + end), + Src = filename:join(TmpDir, "file1"), + Dst = filename:join(TmpDir, "file2"), + Content = <<"foo">>, + ok = file:write_file(Src, Content), + {ok, SrcHdl} = file_handle_cache:open(Src, [read], []), + {ok, DstHdl} = file_handle_cache:open(Dst, [write], []), + Size = size(Content), + {ok, Size} = file_handle_cache:copy(SrcHdl, DstHdl, Size), + ok = file_handle_cache:delete(SrcHdl), + file_handle_cache:delete(DstHdl), + Pid ! close, + ok = file_handle_cache:set_limit(Limit), + passed. + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> @@ -1486,7 +1470,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1549,7 +1533,7 @@ test_msg_store() -> ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), %% read the second half again, just for fun (aka code coverage) MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1574,7 +1558,7 @@ test_msg_store() -> {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9)), + msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 47e8bb01..0b6a15ec 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -38,7 +38,7 @@ -export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0, delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, - amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, + amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0]). @@ -107,8 +107,6 @@ kind :: Kind, name :: Name}). --type(ssl_socket() :: #ssl_socket{}). - -type(listener() :: #listener{node :: node(), protocol :: atom(), @@ -118,7 +116,8 @@ -type(binding() :: #binding{exchange_name :: rabbit_exchange:name(), queue_name :: rabbit_amqqueue:name(), - key :: rabbit_exchange:binding_key()}). + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: #amqqueue{name :: rabbit_amqqueue:name(), @@ -141,7 +140,8 @@ -type(user() :: #user{username :: rabbit_access_control:username(), - password :: rabbit_access_control:password()}). + password :: rabbit_access_control:password(), + is_admin :: boolean()}). -type(ok(A) :: {'ok', A}). -type(error(A) :: {'error', A}). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee8..30d3a8ae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -439,9 +439,10 @@ terminate(State) -> remove_pending_ack(true, tx_commit_index(State)), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_terminate(MSCStateP) + _ -> rabbit_msg_store:client_terminate( + MSCStateP, ?PERSISTENT_MSG_STORE) end, - rabbit_msg_store:client_terminate(MSCStateT), + rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -464,8 +465,7 @@ delete_and_terminate(State) -> case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef), - rabbit_msg_store:client_terminate(MSCStateP) + MSCStateP, ?PERSISTENT_MSG_STORE, PRef) end, rabbit_msg_store:client_delete_and_terminate( MSCStateT, ?TRANSIENT_MSG_STORE, TRef), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f90ee734..aa986e54 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/4, start_link/4, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, send_command_and_signal_back/3, - send_command_and_signal_back/4, send_command_and_notify/5]). +-export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([send_command/2, send_command/3, send_command_sync/2, + send_command_sync/3, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). @@ -48,24 +48,23 @@ -ifdef(use_specs). --spec(start/4 :: +-spec(start/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(start_link/4 :: +-spec(start_link/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(send_command_and_signal_back/3 :: - (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok'). --spec(send_command_and_signal_back/4 :: - (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid()) - -> 'ok'). +-spec(send_command_sync/2 :: + (pid(), rabbit_framing:amqp_method()) -> 'ok'). +-spec(send_command_sync/3 :: + (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) @@ -84,68 +83,61 @@ %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}])}. - -start_link(Sock, Channel, FrameMax, Protocol) -> - {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, +start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}])}. -mainloop(State) -> +start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + {ok, + proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. + +mainloop(ReaderPid, State) -> + try + mainloop1(ReaderPid, State) + catch + exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + end, + done. + +mainloop1(ReaderPid, State) -> receive - Message -> ?MODULE:mainloop(handle_message(Message, State)) + Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [State]) + erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) end. -handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), +handle_message({send_command, MethodRecord}, State) -> + ok = internal_send_command_async(MethodRecord, State), State; -handle_message({send_command, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), +handle_message({send_command, MethodRecord, Content}, State) -> + ok = internal_send_command_async(MethodRecord, Content, State), State; -handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), - Parent ! rabbit_writer_send_command_signal, +handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> + ok = internal_send_command_async(MethodRecord, State), + gen_server:reply(From, ok), State; -handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), - Parent ! rabbit_writer_send_command_signal, +handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), + gen_server:reply(From, ok), State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, - State = #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax, Protocol), + State) -> + ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); -handle_message(shutdown, _State) -> - exit(normal); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). @@ -159,29 +151,28 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. -send_command_and_signal_back(W, MethodRecord, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Parent}, - ok. +send_command_sync(W, MethodRecord) -> + call(W, {send_command_sync, MethodRecord}). -send_command_and_signal_back(W, MethodRecord, Content, Parent) -> - W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, - ok. +send_command_sync(W, MethodRecord, Content) -> + call(W, {send_command_sync, MethodRecord, Content}). send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -shutdown(W) -> - W ! shutdown, - rabbit_misc:unlink_and_capture_exit(W), - ok. +%--------------------------------------------------------------------------- + +call(Pid, Msg) -> + {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity), + Res. %--------------------------------------------------------------------------- assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, - Protocol). + rabbit_binary_generator:build_simple_method_frame( + Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), @@ -223,12 +214,18 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> +internal_send_command_async(MethodRecord, + #wstate{sock = Sock, + channel = Channel, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, - Protocol) -> +internal_send_command_async(MethodRecord, Content, + #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol)), ok. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 87883037..93adfcb1 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -34,7 +34,9 @@ %% 4) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits -%% normally, the supervisor itself also exits normally. +%% normally, the supervisor itself also exits normally. If the +%% child is a supervisor and it exits normally (i.e. with reason of +%% 'shutdown') then the child's parent also exits normally. %% %% All modifications are (C) 2010 Rabbit Technologies Ltd. %% @@ -63,7 +65,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, + which_children/1, find_child/2, check_childspecs/1]). -export([behaviour_info/1]). @@ -138,6 +140,10 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). +find_child(Supervisor, Name) -> + [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), + Name1 =:= Name]. + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). @@ -541,6 +547,9 @@ do_restart(permanent, Reason, Child, State) -> restart(Child, State); do_restart(intrinsic, normal, Child, State) -> {shutdown, state_del_child(Child, State)}; +do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor}, + State) -> + {shutdown, state_del_child(Child, State)}; do_restart(_, normal, Child, State) -> NState = state_del_child(Child, State), {ok, NState}; diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 11ce6fc5..c9809ace 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,7 +55,7 @@ handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(accept, State) -> - ok = file_handle_cache:obtain(self()), + ok = file_handle_cache:obtain(), accept(State); handle_cast(_Msg, State) -> @@ -84,7 +84,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% is drained. gen_event:which_handlers(error_logger), %% handle - file_handle_cache:obtain(apply(M, F, A ++ [Sock])) + file_handle_cache:transfer(apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain() catch {inet_error, Reason} -> gen_tcp:close(Sock), error_logger:error_msg("unable to accept TCP connection: ~p~n", @@ -93,11 +94,13 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, %% accept more accept(State); + handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we %% know this will fail. {stop, normal, State}; + handle_info(_Info, State) -> {noreply, State}. diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl index 1b785843..02d7e0e4 100644 --- a/src/tcp_client_sup.erl +++ b/src/tcp_client_sup.erl @@ -31,19 +31,19 @@ -module(tcp_client_sup). --behaviour(supervisor). +-behaviour(supervisor2). -export([start_link/1, start_link/2]). -export([init/1]). start_link(Callback) -> - supervisor:start_link(?MODULE, Callback). + supervisor2:start_link(?MODULE, Callback). start_link(SupName, Callback) -> - supervisor:start_link(SupName, ?MODULE, Callback). + supervisor2:start_link(SupName, ?MODULE, Callback). init({M,F,A}) -> - {ok, {{simple_one_for_one, 10, 10}, + {ok, {{simple_one_for_one_terminate, 10, 10}, [{tcp_client, {M,F,A}, - temporary, brutal_kill, worker, [M]}]}}. + temporary, infinity, supervisor, [M]}]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 42049d50..f461a539 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -38,7 +38,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, prioritise_cast/2]). %%---------------------------------------------------------------------------- @@ -71,7 +71,7 @@ submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). set_maximum_since_use(Pid, Age) -> - gen_server2:pcast(Pid, 8, {set_maximum_since_use, Age}). + gen_server2:cast(Pid, {set_maximum_since_use, Age}). run({M, F, A}) -> apply(M, F, A); @@ -88,6 +88,9 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; +prioritise_cast(_Msg, _State) -> 0. + handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), |