summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-12 22:08:12 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-12 22:08:12 +0100
commitb9963d98ea0778457e9f132d2ab1c17ae4d30fff (patch)
tree1a38845c7a9513663ac50db5efd4e4cfb9793447
parente537261021b2cf35b3e9f9f5f7f698688ded69bc (diff)
parente680875be3709a7b90f6f278e8308097e59ed268 (diff)
downloadrabbitmq-server-b9963d98ea0778457e9f132d2ab1c17ae4d30fff.tar.gz
merge default into bug24308
-rw-r--r--Makefile45
-rw-r--r--docs/rabbitmq-plugins.1.xml181
-rw-r--r--include/rabbit.hrl7
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/common/LICENSE.head5
-rw-r--r--[-rwxr-xr-x]packaging/common/LICENSE.tail (renamed from packaging/debs/Debian/debian/copyright)52
-rw-r--r--packaging/common/rabbitmq-script-wrapper6
-rw-r--r--packaging/debs/Debian/Makefile3
-rw-r--r--packaging/debs/Debian/debian/rules2
-rw-r--r--packaging/macports/Portfile.in20
-rw-r--r--packaging/windows/Makefile9
-rwxr-xr-xscripts/rabbitmq-plugins34
-rwxr-xr-xscripts/rabbitmq-plugins.bat51
-rwxr-xr-xscripts/rabbitmq-server4
-rwxr-xr-x[-rw-r--r--]scripts/rabbitmq-server.bat7
-rwxr-xr-x[-rw-r--r--]scripts/rabbitmq-service.bat7
-rw-r--r--src/rabbit_amqqueue_process.erl86
-rw-r--r--src/rabbit_channel.erl44
-rw-r--r--src/rabbit_control.erl19
-rw-r--r--src/rabbit_event.erl111
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_plugins.erl357
-rw-r--r--src/rabbit_prelaunch.erl37
-rw-r--r--src/rabbit_reader.erl67
-rw-r--r--src/rabbit_tests.erl93
-rw-r--r--src/rabbit_variable_queue.erl56
26 files changed, 1038 insertions, 277 deletions
diff --git a/Makefile b/Makefile
index 9c069093..146b6335 100644
--- a/Makefile
+++ b/Makefile
@@ -14,11 +14,11 @@ DOCS_DIR=docs
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
-TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
+TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) plugins
WEB_URL=http://www.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
-USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml
+USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
QC_MODULES := rabbit_backing_queue_qc
QC_TRIALS ?= 100
@@ -57,6 +57,8 @@ endif
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
VERSION=0.0.0
+PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
+PLUGINS_DIR=plugins
TARBALL_NAME=rabbitmq-server-$(VERSION)
TARGET_SRC_DIR=dist/$(TARBALL_NAME)
@@ -101,6 +103,19 @@ endif
all: $(TARGETS)
+.PHONY: plugins
+ifneq "$(PLUGINS_SRC_DIR)" ""
+plugins:
+ [ -d "$(PLUGINS_SRC_DIR)/rabbitmq-server" ] || ln -s "$(CURDIR)" "$(PLUGINS_SRC_DIR)/rabbitmq-server"
+ mkdir -p $(PLUGINS_DIR)
+ PLUGINS_SRC_DIR="" $(MAKE) -C "$(PLUGINS_SRC_DIR)" plugins-dist PLUGINS_DIST_DIR="$(CURDIR)/$(PLUGINS_DIR)" VERSION=$(VERSION)
+ echo "Put your EZs here and use rabbitmq-plugins to enable them." > $(PLUGINS_DIR)/README
+ rm -f $(PLUGINS_DIR)/rabbit_common*.ez
+else
+plugins:
+# Not building plugins
+endif
+
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
rm -f $@
echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR)
@@ -143,6 +158,8 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
+ rm -f $(PLUGINS_DIR)/*.ez
+ [ -d "$(PLUGINS_SRC_DIR)" ] && PLUGINS_SRC_DIR="" PRESERVE_CLONE_DIR=1 make -C $(PLUGINS_SRC_DIR) clean || true
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc
rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
@@ -244,7 +261,20 @@ srcdist: distclean
cp -r $(DOCS_DIR) $(TARGET_SRC_DIR)
chmod 0755 $(TARGET_SRC_DIR)/scripts/*
- (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME))
+ifneq "$(PLUGINS_SRC_DIR)" ""
+ cp -r $(PLUGINS_SRC_DIR) $(TARGET_SRC_DIR)/plugins-src
+ rm $(TARGET_SRC_DIR)/LICENSE
+ cat packaging/common/LICENSE.head >> $(TARGET_SRC_DIR)/LICENSE
+ cat $(AMQP_CODEGEN_DIR)/license_info >> $(TARGET_SRC_DIR)/LICENSE
+ find $(PLUGINS_SRC_DIR)/licensing -name "license_info_*" -exec cat '{}' >> $(TARGET_SRC_DIR)/LICENSE \;
+ cat packaging/common/LICENSE.tail >> $(TARGET_SRC_DIR)/LICENSE
+ find $(PLUGINS_SRC_DIR)/licensing -name "LICENSE-*" -exec cp '{}' $(TARGET_SRC_DIR) \;
+ rm -rf $(TARGET_SRC_DIR)/licensing
+else
+ @echo No plugins source distribution found
+endif
+
+ (cd dist; tar -zchf $(TARBALL_NAME).tar.gz $(TARBALL_NAME))
(cd dist; zip -q -r $(TARBALL_NAME).zip $(TARBALL_NAME))
rm -rf $(TARGET_SRC_DIR)
@@ -288,15 +318,16 @@ docs_all: $(MANPAGES) $(WEB_MANPAGES)
install: install_bin install_docs
install_bin: all install_dirs
- cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
+ cp -r ebin include LICENSE* INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
- for script in rabbitmq-env rabbitmq-server rabbitmqctl; do \
+ for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-plugins; do \
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
done
- mkdir -p $(TARGET_DIR)/plugins
- echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README
+
+ mkdir -p $(TARGET_DIR)/$(PLUGINS_DIR)
+ [ -d "$(PLUGINS_DIR)" ] && cp $(PLUGINS_DIR)/*.ez $(PLUGINS_DIR)/README $(TARGET_DIR)/$(PLUGINS_DIR) || true
install_docs: docs_all install_dirs
for section in 1 5; do \
diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml
new file mode 100644
index 00000000..5d74c6e1
--- /dev/null
+++ b/docs/rabbitmq-plugins.1.xml
@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd">
+<!--
+ There is some extra magic in this document besides the usual DocBook semantics
+ to allow us to derive manpages, HTML and usage messages from the same source
+ document.
+
+ Examples need to be moved to the end for man pages. To this end, <para>s and
+ <screen>s with role="example" will be moved, and with role="example-prefix"
+ will be removed.
+
+ The usage messages are more involved. We have some magic in usage.xsl to pull
+ out the command synopsis, global option and subcommand synopses. We also pull
+ out <para>s with role="usage".
+
+ Finally we construct lists of possible values for subcommand options, if the
+ subcommand's <varlistentry> has role="usage-has-option-list". The option which
+ takes the values should be marked with role="usage-option-list".
+-->
+
+<refentry lang="en">
+ <refentryinfo>
+ <productname>RabbitMQ Server</productname>
+ <authorgroup>
+ <corpauthor>The RabbitMQ Team &lt;<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>&gt;</corpauthor>
+ </authorgroup>
+ </refentryinfo>
+
+ <refmeta>
+ <refentrytitle>rabbitmq-plugins</refentrytitle>
+ <manvolnum>1</manvolnum>
+ <refmiscinfo class="manual">RabbitMQ Service</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>rabbitmq-plugins</refname>
+ <refpurpose>command line tool for managing RabbitMQ broker plugins</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+ <cmdsynopsis>
+ <command>rabbitmq-plugins</command>
+ <arg choice="req"><replaceable>command</replaceable></arg>
+ <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
+ </cmdsynopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+ <para>
+ <command>rabbitmq-plugins</command> is a command line tool for managing
+ RabbitMQ broker plugins. It allows one to enable, disable and browse
+ plugins. It must be run by a user with write permissions to the RabbitMQ
+ configuration directory.
+ </para>
+ <para>
+ Some plugins depend on others to work
+ correctly. <command>rabbitmq-plugins</command> traverses these
+ dependencies and enables all required plugins. Plugins listed on
+ the <command>rabbitmq-plugins</command> command line are marked as
+ explicitly enabled; dependent plugins are marked as implicitly
+ enabled. Implicitly enabled plugins are automatically disabled again
+ when they are no longer required.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Commands</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><cmdsynopsis><command>list</command> <arg choice="opt">-v</arg> <arg choice="opt">-m</arg> <arg choice="opt">-E</arg> <arg choice="opt">-e</arg> <arg choice="opt"><replaceable>pattern</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>-v</term>
+ <listitem><para>Show all plugin details (verbose).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>-m</term>
+ <listitem><para>Show only plugin names (minimal).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>-E</term>
+ <listitem><para>Show only explicitly enabled
+ plugins.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>-e</term>
+ <listitem><para>Show only explicitly or implicitly
+ enabled plugins.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>pattern</term>
+ <listitem><para>Pattern to filter the plugin names by.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Lists available plugins, their versions, dependencies and
+ descriptions. Each plugin is prefixed with a status
+ indicator - [ ] to indicate that the plugin is not
+ enabled, [E] to indicate that it is explicitly enabled,
+ and [e] to indicate that it is implicitly enabled.
+ </para>
+ <para>
+ If the optional pattern is given, only plugins whose
+ name matches <command>pattern</command> are shown.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins list</screen>
+ <para role="example">
+ This command lists all the plugins available, on one line each.
+ </para>
+ <screen role="example">rabbitmq-plugins list -v </screen>
+ <para role="example">
+ This command lists all the plugins available.
+ </para>
+ <screen role="example">rabbitmq-plugins list -v management</screen>
+ <para role="example">
+ This command lists all the plugins available, but does not
+ display plugins whose name does not contain "management".
+ </para>
+ <screen role="example">rabbitmq-plugins list -e rabbit</screen>
+ <para role="example">
+ This command lists all implicitly or explicitly enabled
+ RabbitMQ plugins.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>plugin</term>
+ <listitem><para>One or more plugins to enable.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Enables the specified plugins and all their
+ dependencies.
+ </para>
+
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins enable rabbitmq_shovel rabbitmq_management</screen>
+ <para role="example">
+ This command enables the <command>shovel</command> and
+ <command>management</command> plugins and all their
+ dependencies.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>plugin</term>
+ <listitem><para>One or more plugins to disable.</para></listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Disables the specified plugins and all plugins that
+ depend on them.
+ </para>
+
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmq-plugins disable amqp_client</screen>
+ <para role="example">
+ This command disables <command>amqp_client</command> and
+ all plugins that depend on it.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+
+ </refsect1>
+
+</refentry>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index ac6399c6..a603886c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -75,6 +75,13 @@
-record(message_properties, {expiry, needs_confirming = false}).
+-record(plugin, {name, %% atom()
+ version, %% string()
+ description, %% string()
+ type, %% 'ez' or 'dir'
+ dependencies, %% [{atom(), string()}]
+ location}). %% string()
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc.").
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 0c5aa96a..fb27e9bd 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -55,6 +55,7 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-plugins
install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -114,7 +115,7 @@ done
%dir %{_sysconfdir}/rabbitmq
%{_initrddir}/rabbitmq-server
%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server
-%doc LICENSE LICENSE-MPL-RabbitMQ
+%doc LICENSE*
%clean
rm -rf %{buildroot}
diff --git a/packaging/common/LICENSE.head b/packaging/common/LICENSE.head
new file mode 100644
index 00000000..2b5a17ee
--- /dev/null
+++ b/packaging/common/LICENSE.head
@@ -0,0 +1,5 @@
+This package, the RabbitMQ server is licensed under the MPL.
+
+If you have any questions regarding licensing, please contact us at
+info@rabbitmq.com.
+
diff --git a/packaging/debs/Debian/debian/copyright b/packaging/common/LICENSE.tail
index 7206bb9b..5d842cc1 100755..100644
--- a/packaging/debs/Debian/debian/copyright
+++ b/packaging/common/LICENSE.tail
@@ -1,14 +1,7 @@
-This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on
-Wed, 3 Jan 2007 15:43:44 +0000.
-It was downloaded from http://www.rabbitmq.com/
+The MIT license is as follows:
-The files codegen/amqp-rabbitmq-0.8.json and
-codegen/amqp-rabbitmq-0.9.1.json are covered by the following terms:
-
- "Copyright (C) 2008-2011 VMware, Inc.
-
- Permission is hereby granted, free of charge, to any person
+ "Permission is hereby granted, free of charge, to any person
obtaining a copy of this file (the Software), to deal in the
Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute,
@@ -28,6 +21,37 @@ codegen/amqp-rabbitmq-0.9.1.json are covered by the following terms:
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE."
+
+The BSD 2-Clause license is as follows:
+
+ "Redistribution and use in source and binary forms, with or
+ without modification, are permitted provided that the
+ following conditions are met:
+
+ 1. Redistributions of source code must retain the above
+ copyright notice, this list of conditions and the following
+ disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials
+ provided with the distribution.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
+ CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+ INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE."
+
+
The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
@@ -490,13 +514,3 @@ EXHIBIT A -Mozilla Public License.
the notices in the Source Code files of the Original Code. You should
use the text of this Exhibit A rather than the text found in the
Original Code Source Code for Your Modifications.]
-
-
-
-
-
-If you have any questions regarding licensing, please contact us at
-info@rabbitmq.com.
-
-The Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed
-under the MPL 1.1, see above.
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 23d2a06c..0436f546 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -29,10 +29,10 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
-if [ `id -u` = 0 ] ; then
- @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
-elif [ `id -u` = `id -u rabbitmq` ] ; then
+if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
+elif [ `id -u` = 0 ] ; then
+ @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
else
/usr/lib/rabbitmq/bin/${SCRIPT}
echo
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 38c81134..8696427e 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -32,6 +32,9 @@ package: clean
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
+ echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
+ cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright
+ echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
rm -rf $(UNPACKED_DIR)
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index a785b292..108b1ed5 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -14,7 +14,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/
install/rabbitmq-server::
mkdir -p $(DOCDIR)
rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL*
- for script in rabbitmqctl rabbitmq-server; do \
+ for script in rabbitmqctl rabbitmq-server rabbitmq-plugins; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index 4a866305..03f087d9 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -83,24 +83,32 @@ post-destroot {
reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \
${realsbin}/rabbitmq-env
- foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE} {
+ foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE ENABLED_PLUGINS_FILE} {
reinplace -E "s:^($var)=/:\\1=${prefix}/:" \
${realsbin}/rabbitmq-server \
- ${realsbin}/rabbitmqctl
+ ${realsbin}/rabbitmqctl \
+ ${realsbin}/rabbitmq-plugins
}
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-server
reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \
- ${wrappersbin}/rabbitmq-server
+ ${filespath}/rabbitmq-script-wrapper
reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
- ${wrappersbin}/rabbitmq-server
+ ${filespath}/rabbitmq-script-wrapper
reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
+ ${filespath}/rabbitmq-script-wrapper
+
+ xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-server
- file copy ${wrappersbin}/rabbitmq-server ${wrappersbin}/rabbitmqctl
+ xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
+ ${wrappersbin}/rabbitmqctl
+ xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
+ ${wrappersbin}/rabbitmq-plugins
+
- xinstall -m 644 -W ${mansrc}/man1 rabbitmq-server.1.gz rabbitmqctl.1.gz \
+ xinstall -m 644 -W ${mansrc}/man1 rabbitmq-server.1.gz rabbitmqctl.1.gz rabbitmq-plugins.1.gz \
${mandest}/man1/
xinstall -m 644 -W ${mansrc}/man5 rabbitmq-env.conf.5.gz ${mandest}/man5/
}
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index a0be8d89..828cf000 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -10,20 +10,23 @@ dist:
mkdir $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin
+ mv $(SOURCE_DIR)/scripts/rabbitmq-plugins.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
- rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile
+ rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile $(SOURCE_DIR)/*mk
rm -f $(SOURCE_DIR)/README
rm -rf $(SOURCE_DIR)/docs
+ rm -rf $(SOURCE_DIR)/src
+ rm -rf $(SOURCE_DIR)/dist
mv $(SOURCE_DIR) $(TARGET_DIR)
mkdir -p $(TARGET_DIR)
- mkdir -p $(TARGET_DIR)/plugins
- echo Put your .ez plugin files in this directory > $(TARGET_DIR)/plugins/README
+ mv $(TARGET_DIR)/plugins/README $(TARGET_DIR)/plugins/README.txt
xmlto -o . xhtml-nochunks ../../docs/rabbitmq-service.xml
elinks -dump -no-references -no-numbering rabbitmq-service.html \
> $(TARGET_DIR)/readme-service.txt
todos $(TARGET_DIR)/readme-service.txt
+ rm -rf $(TARGET_DIR)/plugins-src
zip -q -r $(TARGET_ZIP).zip $(TARGET_DIR)
rm -rf $(TARGET_DIR) rabbitmq-service.html
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
new file mode 100755
index 00000000..4c6cb1fa
--- /dev/null
+++ b/scripts/rabbitmq-plugins
@@ -0,0 +1,34 @@
+#!/bin/sh
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License
+## at http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+## the License for the specific language governing rights and
+## limitations under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developer of the Original Code is VMware, Inc.
+## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+##
+
+. `dirname $0`/rabbitmq-env
+
+ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
+
+[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
+
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+
+exec erl \
+ -pa "${RABBITMQ_HOME}/ebin" \
+ -noinput \
+ -hidden \
+ -sname rabbitmq-plugins$$ \
+ -s rabbit_plugins \
+ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
+ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
+ -extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
new file mode 100755
index 00000000..ca874a7f
--- /dev/null
+++ b/scripts/rabbitmq-plugins.bat
@@ -0,0 +1,51 @@
+@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License
+REM at http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+REM the License for the specific language governing rights and
+REM limitations under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developer of the Original Code is VMware, Inc.
+REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM
+
+setlocal
+
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+)
+
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+)
+
+if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
+ set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
+)
+
+set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+
+endlocal
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index deca5b30..11cc7215 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -21,6 +21,7 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
SERVER_START_ARGS=
+ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
. `dirname $0`/rabbitmq-env
@@ -52,6 +53,7 @@ fi
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
+[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
## Log rotation
@@ -74,7 +76,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
-hidden \
-s rabbit_prelaunch \
-sname rabbitmqprelaunch$$ \
- -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
+ -extra "$RABBITMQ_ENABLED_PLUGINS_FILE" "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
then
RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
RABBITMQ_EBIN_PATH=""
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 56bed435..0a78794f 100644..100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -84,6 +84,10 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" (
set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
)
+if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
+ set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
+)
+
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
@@ -92,7 +96,8 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-noinput -hidden ^
-s rabbit_prelaunch ^
-sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+ "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 26c6ea65..e671ba7a 100644..100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -153,6 +153,10 @@ if errorlevel 1 (
echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
+if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
+ set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
+)
+
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
@@ -160,7 +164,8 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+ "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
""
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe1ddba0..46f6674b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -118,19 +118,19 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ State = #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = backing_queue_module(Q),
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ msg_id_to_channel = dict:new()},
+ {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -140,25 +140,24 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
- State = requeue_and_run(
- AckTags,
- process_args(
- #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = MTC})),
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ msg_id_to_channel = MTC},
+ State1 = requeue_and_run(AckTags, process_args(
+ rabbit_event:init_stats_timer(
+ State, #q.stats_timer))),
lists:foldl(
fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
- State, Deliveries).
+ State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -183,9 +182,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From,
- State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
- stats_timer = StatsTimer}) ->
+declare(Recover, From, State = #q{q = Q,
+ backing_queue = BQ,
+ backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -199,7 +198,7 @@ declare(Recover, From,
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(StatsTimer,
+ rabbit_event:if_enabled(State1, #q.stats_timer,
fun() -> emit_stats(State1) end),
noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
@@ -315,10 +314,8 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
-ensure_stats_timer(State = #q{stats_timer = StatsTimer,
- q = #amqqueue{pid = QPid}}) ->
- State#q{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, QPid, emit_stats)}.
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1120,10 +1117,10 @@ handle_info(maybe_expire, State) ->
handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer),
assert_invariant(State1),
{noreply, State1, hibernate};
@@ -1167,18 +1164,17 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- stats_timer = StatsTimer}) ->
+ backing_queue_state = BQS}) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
- StatsTimer,
+ State, #q.stats_timer,
fun () -> emit_stats(State, [{idle_since, now()}]) end),
- State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS3},
+ State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
+ #q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bcffe2af..883e570a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -173,7 +173,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, Limiter]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
- StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -194,7 +193,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
blocking = sets:new(),
queue_consumers = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
@@ -202,10 +200,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
- rabbit_event:if_enabled(StatsTimer,
- fun() -> emit_stats(State) end),
- {ok, State, hibernate,
+ State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_call(Msg, _From, _State) ->
@@ -319,10 +318,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State) ->
emit_stats(State),
noreply([ensure_stats_timer],
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+ rabbit_event:reset_stats_timer(State, #ch.stats_timer));
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -335,12 +334,12 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
rabbit_event:if_enabled(
- StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end),
- StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State#ch{stats_timer = StatsTimer1}}.
+ State, #ch.stats_timer,
+ fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
terminate(Reason, State) ->
{Res, _State1} = notify_queues(State),
@@ -385,9 +384,8 @@ next_state(Mask, State) ->
State2 = ?MASKED_CALL(send_confirms, Mask, State1),
State2.
-ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, self(), emit_stats)}.
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -1163,11 +1161,11 @@ demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
false -> State
end.
-queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
- queue_consumers = QCons,
+queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
blocking = Blocking,
- unconfirmed_qm = UQM}) ->
- StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ unconfirmed_qm = UQM} = State) ->
+ StatsEnabled = rabbit_event:stats_level(
+ State, #ch.stats_timer) =:= fine,
ConsumerMonitored = dict:is_key(QPid, QCons),
QueueBlocked = sets:is_element(QPid, Blocking),
ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
@@ -1511,8 +1509,8 @@ maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_redeliver_stats(_, _, State) ->
State.
-maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) ->
- case rabbit_event:stats_level(StatsTimer) of
+maybe_incr_stats(QXIncs, Measure, State) ->
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
fine -> lists:foldl(fun ({QX, Inc}, State0) ->
incr_stats(QX, Inc, Measure, State0)
end, State, QXIncs);
@@ -1544,9 +1542,9 @@ update_measures(Type, QX, Inc, Measure) ->
emit_stats(State) ->
emit_stats(State, []).
-emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
+emit_stats(State, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
- case rabbit_event:stats_level(StatsTimer) of
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
coarse ->
rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
fine ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index e9f0cf6c..905e4fd0 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -86,7 +86,7 @@ start() ->
true -> ok;
false -> io:format("...done.~n")
end,
- quit(0);
+ rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
[string:join([atom_to_list(Command) | Args], " ")]),
@@ -96,17 +96,17 @@ start() ->
usage();
{error, Reason} ->
print_error("~p", [Reason]),
- quit(2);
+ rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
- quit(2);
+ rabbit_misc:quit(2);
{badrpc, Reason} ->
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
- quit(2);
+ rabbit_misc:quit(2);
Other ->
print_error("~p", [Other]),
- quit(2)
+ rabbit_misc:quit(2)
end.
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
@@ -157,7 +157,7 @@ stop() ->
usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
- quit(1).
+ rabbit_misc:quit(1).
%%----------------------------------------------------------------------------
@@ -514,10 +514,3 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
{T, V} <- Value];
prettify_typed_amqp_value(_Type, Value) -> Value.
-
-%% the slower shutdown on windows required to flush stdout
-quit(Status) ->
- case os:type() of
- {unix, _} -> halt(Status);
- {win32, _} -> init:stop(Status)
- end.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index bb765566..5ae40c78 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -19,9 +19,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]).
--export([reset_stats_timer/1]).
--export([stats_level/1, if_enabled/2]).
+-export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]).
+-export([reset_stats_timer/2]).
+-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify_if/3]).
%%----------------------------------------------------------------------------
@@ -39,29 +39,23 @@
-type(event_timestamp() ::
{non_neg_integer(), non_neg_integer(), non_neg_integer()}).
--type(event() :: #event {
- type :: event_type(),
- props :: event_props(),
- timestamp :: event_timestamp()
- }).
+-type(event() :: #event { type :: event_type(),
+ props :: event_props(),
+ timestamp :: event_timestamp() }).
-type(level() :: 'none' | 'coarse' | 'fine').
--opaque(state() :: #state {
- level :: level(),
- interval :: integer(),
- timer :: atom()
- }).
-
-type(timer_fun() :: fun (() -> 'ok')).
+-type(container() :: tuple()).
+-type(pos() :: non_neg_integer()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()).
--spec(stop_stats_timer/1 :: (state()) -> state()).
--spec(reset_stats_timer/1 :: (state()) -> state()).
--spec(stats_level/1 :: (state()) -> level()).
--spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
+-spec(init_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()).
+-spec(stop_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(reset_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(stats_level/2 :: (container(), pos()) -> level()).
+-spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
@@ -75,58 +69,69 @@ start_link() ->
%% The idea is, for each stat-emitting object:
%%
%% On startup:
-%% Timer = init_stats_timer()
+%% init_stats_timer(State)
%% notify(created event)
%% if_enabled(internal_emit_stats) - so we immediately send something
%%
%% On wakeup:
-%% ensure_stats_timer(Timer, Pid, emit_stats)
+%% ensure_stats_timer(State, emit_stats)
%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
%%
%% emit_stats:
%% if_enabled(internal_emit_stats)
-%% reset_stats_timer(Timer) - just bookkeeping
+%% reset_stats_timer(State) - just bookkeeping
%%
%% Pre-hibernation:
%% if_enabled(internal_emit_stats)
-%% stop_stats_timer(Timer)
+%% stop_stats_timer(State)
%%
%% internal_emit_stats:
%% notify(stats)
-init_stats_timer() ->
+init_stats_timer(C, P) ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
- #state{level = StatsLevel, interval = Interval, timer = undefined}.
-
-ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) ->
- State;
-ensure_stats_timer(State = #state{interval = Interval,
- timer = undefined}, Pid, Msg) ->
- TRef = erlang:send_after(Interval, Pid, Msg),
- State#state{timer = TRef};
-ensure_stats_timer(State, _Pid, _Msg) ->
- State.
-
-stop_stats_timer(State = #state{level = none}) ->
- State;
-stop_stats_timer(State = #state{timer = undefined}) ->
- State;
-stop_stats_timer(State = #state{timer = TRef}) ->
- erlang:cancel_timer(TRef),
- State#state{timer = undefined}.
-
-reset_stats_timer(State) ->
- State#state{timer = undefined}.
-
-stats_level(#state{level = Level}) ->
+ setelement(P, C, #state{level = StatsLevel, interval = Interval,
+ timer = undefined}).
+
+ensure_stats_timer(C, P, Msg) ->
+ case element(P, C) of
+ #state{level = Level, interval = Interval, timer = undefined} = State
+ when Level =/= none ->
+ TRef = erlang:send_after(Interval, self(), Msg),
+ setelement(P, C, State#state{timer = TRef});
+ #state{} ->
+ C
+ end.
+
+stop_stats_timer(C, P) ->
+ case element(P, C) of
+ #state{level = Level, timer = TRef} = State
+ when Level =/= none andalso TRef =/= undefined ->
+ erlang:cancel_timer(TRef),
+ setelement(P, C, State#state{timer = undefined});
+ #state{} ->
+ C
+ end.
+
+reset_stats_timer(C, P) ->
+ case element(P, C) of
+ #state{timer = TRef} = State
+ when TRef =/= undefined ->
+ setelement(P, C, State#state{timer = undefined});
+ #state{} ->
+ C
+ end.
+
+stats_level(C, P) ->
+ #state{level = Level} = element(P, C),
Level.
-if_enabled(#state{level = none}, _Fun) ->
- ok;
-if_enabled(_State, Fun) ->
- Fun(),
- ok.
+if_enabled(C, P, Fun) ->
+ case element(P, C) of
+ #state{level = none} -> ok;
+ #state{} -> Fun(), ok
+ end.
notify_if(true, Type, Props) -> notify(Type, Props);
notify_if(false, _Type, _Props) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 13a553f1..a9b15af8 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,6 +55,7 @@
-export([pget/2, pget/3, pget_or_die/2]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
+-export([quit/1]).
%%----------------------------------------------------------------------------
@@ -197,6 +198,7 @@
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
+-spec(quit/1 :: (integer() | string()) -> no_return()).
-endif.
@@ -842,3 +844,10 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
{badrpc, _} -> [];
_ -> Res
end || Res <- ResL]).
+
+%% the slower shutdown on windows required to flush stdout
+quit(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status)
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
new file mode 100644
index 00000000..27dadfc5
--- /dev/null
+++ b/src/rabbit_plugins.erl
@@ -0,0 +1,357 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_plugins).
+-include("rabbit.hrl").
+
+-export([start/0, stop/0, find_plugins/1, read_enabled_plugins/1,
+ lookup_plugins/2, calculate_required_plugins/2, plugin_names/1]).
+
+-define(VERBOSE_OPT, "-v").
+-define(MINIMAL_OPT, "-m").
+-define(ENABLED_OPT, "-E").
+-define(ENABLED_ALL_OPT, "-e").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(find_plugins/1 :: (file:filename()) -> [#plugin{}]).
+-spec(read_enabled_plugins/1 :: (file:filename()) -> [atom()]).
+-spec(lookup_plugins/2 :: ([atom()], [#plugin{}]) -> [#plugin{}]).
+-spec(calculate_required_plugins/2 :: ([atom()], [#plugin{}]) -> [atom()]).
+-spec(plugin_names/1 :: ([#plugin{}]) -> [atom()]).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ {ok, [[PluginsFile|_]|_]} =
+ init:get_argument(enabled_plugins_file),
+ {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
+ {[Command0 | Args], Opts} =
+ case rabbit_misc:get_options([{flag, ?VERBOSE_OPT},
+ {flag, ?MINIMAL_OPT},
+ {flag, ?ENABLED_OPT},
+ {flag, ?ENABLED_ALL_OPT}],
+ init:get_plain_arguments()) of
+ {[], _Opts} -> usage();
+ CmdArgsAndOpts -> CmdArgsAndOpts
+ end,
+ Command = list_to_atom(Command0),
+
+ case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ ok ->
+ rabbit_misc:quit(0);
+ {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")]),
+ usage();
+ {error, Reason} ->
+ print_error("~p", [Reason]),
+ rabbit_misc:quit(2);
+ Other ->
+ print_error("~s", [Other]),
+ rabbit_misc:quit(2)
+ end.
+
+stop() ->
+ ok.
+
+print_error(Format, Args) ->
+ rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+
+usage() ->
+ io:format("~s", [rabbit_plugins_usage:usage()]),
+ rabbit_misc:quit(1).
+
+%%----------------------------------------------------------------------------
+
+action(list, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
+ format_plugins(Pat, Opts, PluginsFile, PluginsDir);
+
+action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToEnable0 of
+ [] -> throw("Not enough arguments for 'enable'");
+ _ -> ok
+ end,
+ AllPlugins = find_plugins(PluginsDir),
+ Enabled = read_enabled_plugins(PluginsFile),
+ ImplicitlyEnabled = calculate_required_plugins(Enabled, AllPlugins),
+ ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
+ Missing = ToEnable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> throw(fmt_list("The following plugins could not be found:",
+ Missing))
+ end,
+ NewEnabled = lists:usort(Enabled ++ ToEnable),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ case NewEnabled -- ImplicitlyEnabled of
+ [] -> io:format("Plugin configuration unchanged.~n");
+ _ -> NewImplicitlyEnabled =
+ calculate_required_plugins(NewEnabled, AllPlugins),
+ print_list("The following plugins have been enabled:",
+ NewImplicitlyEnabled -- ImplicitlyEnabled),
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n")
+ end;
+
+action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+ case ToDisable0 of
+ [] -> throw("Not enough arguments for 'disable'");
+ _ -> ok
+ end,
+ ToDisable = [list_to_atom(Name) || Name <- ToDisable0],
+ Enabled = read_enabled_plugins(PluginsFile),
+ AllPlugins = find_plugins(PluginsDir),
+ Missing = ToDisable -- plugin_names(AllPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> print_list("Warning: the following plugins could not be found:",
+ Missing)
+ end,
+ ToDisableDeps = calculate_dependencies(true, ToDisable, AllPlugins),
+ NewEnabled = Enabled -- ToDisableDeps,
+ case length(Enabled) =:= length(NewEnabled) of
+ true -> io:format("Plugin configuration unchanged.~n");
+ false -> ImplicitlyEnabled =
+ calculate_required_plugins(Enabled, AllPlugins),
+ NewImplicitlyEnabled =
+ calculate_required_plugins(NewEnabled, AllPlugins),
+ print_list("The following plugins have been disabled:",
+ ImplicitlyEnabled -- NewImplicitlyEnabled),
+ write_enabled_plugins(PluginsFile, NewEnabled),
+ io:format("Plugin configuration has changed. "
+ "Restart RabbitMQ for changes to take effect.~n")
+ end.
+
+%%----------------------------------------------------------------------------
+
+%% Get the #plugin{}s ready to be enabled.
+find_plugins(PluginsDir) ->
+ EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)],
+ FreeApps = [{app, App} ||
+ App <- filelib:wildcard("*/ebin/*.app", PluginsDir)],
+ {Plugins, Problems} =
+ lists:foldl(fun ({error, EZ, Reason}, {Plugins1, Problems1}) ->
+ {Plugins1, [{EZ, Reason} | Problems1]};
+ (Plugin = #plugin{}, {Plugins1, Problems1}) ->
+ {[Plugin|Plugins1], Problems1}
+ end, {[], []},
+ [get_plugin_info(PluginsDir, Plug) ||
+ Plug <- EZs ++ FreeApps]),
+ case Problems of
+ [] -> ok;
+ _ -> io:format("Warning: Problem reading some plugins: ~p~n",
+ [Problems])
+ end,
+ Plugins.
+
+%% Get the #plugin{} from an .ez.
+get_plugin_info(Base, {ez, EZ0}) ->
+ EZ = filename:join([Base, EZ0]),
+ case read_app_file(EZ) of
+ {application, Name, Props} -> mkplugin(Name, Props, ez, EZ);
+ {error, Reason} -> {error, EZ, Reason}
+ end;
+%% Get the #plugin{} from an .app.
+get_plugin_info(Base, {app, App0}) ->
+ App = filename:join([Base, App0]),
+ case rabbit_file:read_term_file(App) of
+ {ok, [{application, Name, Props}]} ->
+ mkplugin(Name, Props, dir,
+ filename:absname(
+ filename:dirname(filename:dirname(App))));
+ {error, Reason} ->
+ {error, App, {invalid_app, Reason}}
+ end.
+
+mkplugin(Name, Props, Type, Location) ->
+ Version = proplists:get_value(vsn, Props, "0"),
+ Description = proplists:get_value(description, Props, ""),
+ Dependencies =
+ filter_applications(proplists:get_value(applications, Props, [])),
+ #plugin{name = Name, version = Version, description = Description,
+ dependencies = Dependencies, location = Location, type = Type}.
+
+%% Read the .app file from an ez.
+read_app_file(EZ) ->
+ case zip:list_dir(EZ) of
+ {ok, [_|ZippedFiles]} ->
+ case find_app_files(ZippedFiles) of
+ [AppPath|_] ->
+ {ok, [{AppPath, AppFile}]} =
+ zip:extract(EZ, [{file_list, [AppPath]}, memory]),
+ parse_binary(AppFile);
+ [] ->
+ {error, no_app_file}
+ end;
+ {error, Reason} ->
+ {error, {invalid_ez, Reason}}
+ end.
+
+%% Return the path of the .app files in ebin/.
+find_app_files(ZippedFiles) ->
+ {ok, RE} = re:compile("^.*/ebin/.*.app$"),
+ [Path || {zip_file, Path, _, _, _, _} <- ZippedFiles,
+ re:run(Path, RE, [{capture, none}]) =:= match].
+
+%% Parse a binary into a term.
+parse_binary(Bin) ->
+ try
+ {ok, Ts, _} = erl_scan:string(binary_to_list(Bin)),
+ {ok, Term} = erl_parse:parse_term(Ts),
+ Term
+ catch
+ Err -> {error, {invalid_app, Err}}
+ end.
+
+%% Pretty print a list of plugins.
+format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
+ Verbose = proplists:get_bool(?VERBOSE_OPT, Opts),
+ Minimal = proplists:get_bool(?MINIMAL_OPT, Opts),
+ Format = case {Verbose, Minimal} of
+ {false, false} -> normal;
+ {true, false} -> verbose;
+ {false, true} -> minimal;
+ {true, true} -> throw("Cannot specify -m and -v together")
+ end,
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
+
+ AvailablePlugins = find_plugins(PluginsDir),
+ EnabledExplicitly = read_enabled_plugins(PluginsFile),
+ EnabledImplicitly =
+ calculate_required_plugins(EnabledExplicitly, AvailablePlugins) --
+ EnabledExplicitly,
+ {ok, RE} = re:compile(Pattern),
+ Plugins = [ Plugin ||
+ Plugin = #plugin{name = Name} <- AvailablePlugins,
+ re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ true -> true
+ end,
+ if OnlyEnabledAll ->
+ lists:member(Name, EnabledImplicitly) or
+ lists:member(Name, EnabledExplicitly);
+ true ->
+ true
+ end],
+ Plugins1 = usort_plugins(Plugins),
+ MaxWidth = lists:max([length(atom_to_list(Name)) ||
+ #plugin{name = Name} <- Plugins1] ++ [0]),
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
+ MaxWidth) || P <- Plugins1],
+ ok.
+
+format_plugin(#plugin{name = Name, version = Version,
+ description = Description, dependencies = Deps},
+ EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ Glyph = case {lists:member(Name, EnabledExplicitly),
+ lists:member(Name, EnabledImplicitly)} of
+ {true, false} -> "[E]";
+ {false, true} -> "[e]";
+ _ -> "[ ]"
+ end,
+ case Format of
+ minimal -> io:format("~s~n", [Name]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
+ "w ~s~n", [Glyph, Name, Version]);
+ verbose -> io:format("~s ~w~n", [Glyph, Name]),
+ io:format(" Version: \t~s~n", [Version]),
+ case Deps of
+ [] -> ok;
+ _ -> io:format(" Dependencies:\t~p~n", [Deps])
+ end,
+ io:format(" Description:\t~s~n", [Description]),
+ io:format("~n")
+ end.
+
+print_list(Header, Plugins) ->
+ io:format(fmt_list(Header, Plugins)).
+
+fmt_list(Header, Plugins) ->
+ lists:flatten(
+ [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins], $\n]).
+
+usort_plugins(Plugins) ->
+ lists:usort(fun plugins_cmp/2, Plugins).
+
+plugins_cmp(#plugin{name = N1, version = V1},
+ #plugin{name = N2, version = V2}) ->
+ {N1, V1} =< {N2, V2}.
+
+%% Filter out applications that can be loaded *right now*.
+filter_applications(Applications) ->
+ [Application || Application <- Applications,
+ not is_available_app(Application)].
+
+%% Return whether is application is already available (and hence
+%% doesn't need enabling).
+is_available_app(Application) ->
+ case application:load(Application) of
+ {error, {already_loaded, _}} -> true;
+ ok -> application:unload(Application),
+ true;
+ _ -> false
+ end.
+
+%% Return the names of the given plugins.
+plugin_names(Plugins) ->
+ [Name || #plugin{name = Name} <- Plugins].
+
+%% Find plugins by name in a list of plugins.
+lookup_plugins(Names, AllPlugins) ->
+ [P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)].
+
+%% Read the enabled plugin names from disk.
+read_enabled_plugins(PluginsFile) ->
+ case rabbit_file:read_term_file(PluginsFile) of
+ {ok, [Plugins]} -> Plugins;
+ {error, enoent} -> [];
+ {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+%% Write the enabled plugin names on disk.
+write_enabled_plugins(PluginsFile, Plugins) ->
+ case rabbit_file:write_term_file(PluginsFile, [Plugins]) of
+ ok -> ok;
+ {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file,
+ PluginsFile, Reason}})
+ end.
+
+calculate_required_plugins(Sources, AllPlugins) ->
+ calculate_dependencies(false, Sources, AllPlugins).
+
+calculate_dependencies(Reverse, Sources, AllPlugins) ->
+ {ok, G} = rabbit_misc:build_acyclic_graph(
+ fun (App, _Deps) -> [{App, App}] end,
+ fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
+ [{Name, Deps}
+ || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ Dests = case Reverse of
+ false -> digraph_utils:reachable(Sources, G);
+ true -> digraph_utils:reaching(Sources, G)
+ end,
+ true = digraph:delete(G),
+ Dests.
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index cd0c322b..d34ed44a 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -18,6 +18,8 @@
-export([start/0, stop/0]).
+-include("rabbit.hrl").
+
-define(BaseApps, [rabbit]).
-define(ERROR_CODE, 1).
@@ -41,14 +43,14 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(),
+ [EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir, NodeStr] =
+ init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
- %% Unpack any .ez plugins
- unpack_ez_plugins(PluginDir, UnpackedPluginDir),
+ prepare_plugins(EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir),
%% Build a list of required apps based on the fixed set, and any plugins
- PluginApps = find_plugins(PluginDir) ++ find_plugins(UnpackedPluginDir),
+ PluginApps = find_plugins(UnpackedPluginDir),
RequiredApps = ?BaseApps ++ PluginApps,
%% Build the entire set of dependencies - this will load the
@@ -145,7 +147,19 @@ delete_recursively(Fn) ->
Error -> Error
end.
-unpack_ez_plugins(SrcDir, DestDir) ->
+prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) ->
+ AllPlugins = rabbit_plugins:find_plugins(PluginsDistDir),
+ Enabled = rabbit_plugins:read_enabled_plugins(EnabledPluginsFile),
+ ToUnpack = rabbit_plugins:calculate_required_plugins(Enabled, AllPlugins),
+ ToUnpackPlugins = rabbit_plugins:lookup_plugins(ToUnpack, AllPlugins),
+
+ Missing = Enabled -- rabbit_plugins:plugin_names(ToUnpackPlugins),
+ case Missing of
+ [] -> ok;
+ _ -> io:format("Warning: the following enabled plugins were "
+ "not found: ~p~n", [Missing])
+ end,
+
%% Eliminate the contents of the destination directory
case delete_recursively(DestDir) of
ok -> ok;
@@ -155,12 +169,15 @@ unpack_ez_plugins(SrcDir, DestDir) ->
ok -> ok;
{error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
end,
- [unpack_ez_plugin(PluginName, DestDir) ||
- PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")].
-unpack_ez_plugin(PluginFn, PluginDestDir) ->
- zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
- ok.
+ [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins].
+
+prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
+ zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ PluginsDestDir) ->
+ rabbit_file:recursive_copy(Location,
+ filename:join([PluginsDestDir, Name])).
find_plugins(PluginDir) ->
[prepare_dir_plugin(PluginName) ||
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b4871cef..b359f7d4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -199,34 +199,32 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
+ State = #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none,
+ capabilities = []},
+ callback = uninitialized_callback,
+ recv_len = 0,
+ pending_recv = false,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun,
+ buf = [],
+ buf_len = 0,
+ auth_mechanism = none,
+ auth_state = none},
try
- recvloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- protocol = none,
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- capabilities = []},
- callback = uninitialized_callback,
- recv_len = 0,
- pending_recv = false,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer(),
- channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun,
- buf = [],
- buf_len = 0,
- auth_mechanism = none,
- auth_state = none
- },
- handshake, 8))
+ recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -605,10 +603,8 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
-ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
- connection_state = running}) ->
- State#v1{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, self(), emit_stats)};
+ensure_stats_timer(State = #v1{connection_state = running}) ->
+ rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.
@@ -695,8 +691,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- sock = Sock,
- stats_timer = StatsTimer}) ->
+ sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -707,7 +702,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(StatsTimer,
+ rabbit_event:if_enabled(State1, #v1.stats_timer,
fun() -> emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
@@ -937,6 +932,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}},
State1#v1.sock, 0, CloseMethod, Protocol),
State1.
-emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
- State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
+ rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 39f67ced..5e034ae7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1837,27 +1837,34 @@ msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
on_disk_capture() ->
- on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}).
-on_disk_capture({OnDisk, Awaiting, Pid}) ->
- Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of
- true -> Pid ! {self(), arrived}, undefined;
- false -> Pid
- end,
receive
- {await, MsgIds, Pid2} ->
- true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting),
- on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2});
- {on_disk, MsgIds} ->
- on_disk_capture({gb_sets:union(OnDisk, MsgIds),
- gb_sets:subtract(Awaiting, MsgIds),
- Pid1});
+ {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid);
+ stop -> done
+ end.
+
+on_disk_capture(OnDisk, Awaiting, Pid) ->
+ receive
+ {on_disk, MsgIdsS} ->
+ MsgIds = gb_sets:to_list(MsgIdsS),
+ on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
+ Pid);
stop ->
done
+ after 100 ->
+ case {OnDisk, Awaiting} of
+ {[], []} -> Pid ! {self(), arrived}, on_disk_capture();
+ {_, []} -> Pid ! {self(), surplus};
+ {[], _} -> Pid ! {self(), timeout};
+ {_, _} -> Pid ! {self(), surplus_timeout}
+ end
end.
on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
- Pid ! {await, gb_sets:from_list(MsgIds), self()},
- receive {Pid, arrived} -> ok end.
+ Pid ! {await, MsgIds, self()},
+ receive
+ {Pid, arrived} -> ok;
+ {Pid, Error} -> Error
+ end.
on_disk_stop(Pid) ->
MRef = erlang:monitor(process, Pid),
@@ -1915,9 +1922,15 @@ test_msg_store() ->
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
Ref = rabbit_guid:guid(),
- {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref),
+ {Cap, MSCState} = msg_store_client_init_capture(
+ ?PERSISTENT_MSG_STORE, Ref),
+ Ref2 = rabbit_guid:guid(),
+ {Cap2, MSC2State} = msg_store_client_init_capture(
+ ?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds, MSCState),
+ %% test confirm logic
+ passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
@@ -1926,20 +1939,25 @@ test_msg_store() ->
ok = msg_store_write(MsgIds2ndHalf, MSCState),
%% check they're all in there
true = msg_store_contains(true, MsgIds, MSCState),
- %% publish the latter half twice so we hit the caching and ref count code
- ok = msg_store_write(MsgIds2ndHalf, MSCState),
+ %% publish the latter half twice so we hit the caching and ref
+ %% count code. We need to do this through a 2nd client since a
+ %% single client is not supposed to write the same message more
+ %% than once without first removing it.
+ ok = msg_store_write(MsgIds2ndHalf, MSC2State),
%% check they're still all in there
true = msg_store_contains(true, MsgIds, MSCState),
- %% sync on the 2nd half, but do lots of individual syncs to try
- %% and cause coalescing to happen
- ok = on_disk_await(Cap, MsgIds2ndHalf),
+ %% sync on the 2nd half
+ ok = on_disk_await(Cap2, MsgIds2ndHalf),
+ %% cleanup
+ ok = on_disk_stop(Cap2),
+ ok = rabbit_msg_store:client_delete_and_terminate(MSC2State),
ok = on_disk_stop(Cap),
%% read them all
MSCState1 = msg_store_read(MsgIds, MSCState),
%% read them all again - this will hit the cache, not disk
MSCState2 = msg_store_read(MsgIds, MSCState1),
%% remove them all
- ok = rabbit_msg_store:remove(MsgIds, MSCState2),
+ ok = msg_store_remove(MsgIds, MSCState2),
%% check first half doesn't exist
false = msg_store_contains(false, MsgIds1stHalf, MSCState2),
%% check second half does exist
@@ -1977,7 +1995,7 @@ test_msg_store() ->
ok = rabbit_msg_store:client_terminate(
msg_store_read(MsgIds1stHalf, MSCState6)),
MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
- ok = rabbit_msg_store:remove(MsgIds1stHalf, MSCState7),
+ ok = msg_store_remove(MsgIds1stHalf, MSCState7),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse msg_ids
@@ -2024,6 +2042,35 @@ test_msg_store() ->
restart_msg_store_empty(),
passed.
+%% We want to test that writes that get eliminated due to removes still
+%% get confirmed. Removes themselves do not.
+test_msg_store_confirms(MsgIds, Cap, MSCState) ->
+ %% write -> confirmed
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ %% remove -> _
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = on_disk_await(Cap, []),
+ %% write, remove -> confirmed
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ %% write, remove, write -> confirmed, confirmed
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds ++ MsgIds),
+ %% remove, write -> confirmed
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ %% remove, write, remove -> confirmed
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ passed.
+
queue_name(Name) ->
rabbit_misc:r(<<"/">>, queue, Name).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 94c0913d..b853d983 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -494,9 +494,31 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, _ChPid, State) ->
- {_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
- a(reduce_memory_use(State1)).
+publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ State2 = case bpqueue:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
+ end,
+ PCount1 = PCount + one_if(IsPersistent1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1,
+ unconfirmed = UC1 })).
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
@@ -1117,34 +1139,6 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, MsgOnDisk,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case bpqueue:is_empty(Q3) of
- false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
- end,
- PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 }}.
-
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
MsgStatus;