diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-12 22:08:12 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-12 22:08:12 +0100 |
commit | b9963d98ea0778457e9f132d2ab1c17ae4d30fff (patch) | |
tree | 1a38845c7a9513663ac50db5efd4e4cfb9793447 | |
parent | e537261021b2cf35b3e9f9f5f7f698688ded69bc (diff) | |
parent | e680875be3709a7b90f6f278e8308097e59ed268 (diff) | |
download | rabbitmq-server-b9963d98ea0778457e9f132d2ab1c17ae4d30fff.tar.gz |
merge default into bug24308
26 files changed, 1038 insertions, 277 deletions
@@ -14,11 +14,11 @@ DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) +TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) plugins WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) -USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml +USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) QC_MODULES := rabbit_backing_queue_qc QC_TRIALS ?= 100 @@ -57,6 +57,8 @@ endif ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) VERSION=0.0.0 +PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo ) +PLUGINS_DIR=plugins TARBALL_NAME=rabbitmq-server-$(VERSION) TARGET_SRC_DIR=dist/$(TARBALL_NAME) @@ -101,6 +103,19 @@ endif all: $(TARGETS) +.PHONY: plugins +ifneq "$(PLUGINS_SRC_DIR)" "" +plugins: + [ -d "$(PLUGINS_SRC_DIR)/rabbitmq-server" ] || ln -s "$(CURDIR)" "$(PLUGINS_SRC_DIR)/rabbitmq-server" + mkdir -p $(PLUGINS_DIR) + PLUGINS_SRC_DIR="" $(MAKE) -C "$(PLUGINS_SRC_DIR)" plugins-dist PLUGINS_DIST_DIR="$(CURDIR)/$(PLUGINS_DIR)" VERSION=$(VERSION) + echo "Put your EZs here and use rabbitmq-plugins to enable them." > $(PLUGINS_DIR)/README + rm -f $(PLUGINS_DIR)/rabbit_common*.ez +else +plugins: +# Not building plugins +endif + $(DEPS_FILE): $(SOURCES) $(INCLUDES) rm -f $@ echo $(subst : ,:,$(foreach FILE,$^,$(FILE):)) | escript generate_deps $@ $(EBIN_DIR) @@ -143,6 +158,8 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel + rm -f $(PLUGINS_DIR)/*.ez + [ -d "$(PLUGINS_SRC_DIR)" ] && PLUGINS_SRC_DIR="" PRESERVE_CLONE_DIR=1 make -C $(PLUGINS_SRC_DIR) clean || true rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) @@ -244,7 +261,20 @@ srcdist: distclean cp -r $(DOCS_DIR) $(TARGET_SRC_DIR) chmod 0755 $(TARGET_SRC_DIR)/scripts/* - (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) +ifneq "$(PLUGINS_SRC_DIR)" "" + cp -r $(PLUGINS_SRC_DIR) $(TARGET_SRC_DIR)/plugins-src + rm $(TARGET_SRC_DIR)/LICENSE + cat packaging/common/LICENSE.head >> $(TARGET_SRC_DIR)/LICENSE + cat $(AMQP_CODEGEN_DIR)/license_info >> $(TARGET_SRC_DIR)/LICENSE + find $(PLUGINS_SRC_DIR)/licensing -name "license_info_*" -exec cat '{}' >> $(TARGET_SRC_DIR)/LICENSE \; + cat packaging/common/LICENSE.tail >> $(TARGET_SRC_DIR)/LICENSE + find $(PLUGINS_SRC_DIR)/licensing -name "LICENSE-*" -exec cp '{}' $(TARGET_SRC_DIR) \; + rm -rf $(TARGET_SRC_DIR)/licensing +else + @echo No plugins source distribution found +endif + + (cd dist; tar -zchf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) (cd dist; zip -q -r $(TARBALL_NAME).zip $(TARBALL_NAME)) rm -rf $(TARGET_SRC_DIR) @@ -288,15 +318,16 @@ docs_all: $(MANPAGES) $(WEB_MANPAGES) install: install_bin install_docs install_bin: all install_dirs - cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) + cp -r ebin include LICENSE* INSTALL $(TARGET_DIR) chmod 0755 scripts/* - for script in rabbitmq-env rabbitmq-server rabbitmqctl; do \ + for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-plugins; do \ cp scripts/$$script $(TARGET_DIR)/sbin; \ [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ done - mkdir -p $(TARGET_DIR)/plugins - echo Put your .ez plugin files in this directory. > $(TARGET_DIR)/plugins/README + + mkdir -p $(TARGET_DIR)/$(PLUGINS_DIR) + [ -d "$(PLUGINS_DIR)" ] && cp $(PLUGINS_DIR)/*.ez $(PLUGINS_DIR)/README $(TARGET_DIR)/$(PLUGINS_DIR) || true install_docs: docs_all install_dirs for section in 1 5; do \ diff --git a/docs/rabbitmq-plugins.1.xml b/docs/rabbitmq-plugins.1.xml new file mode 100644 index 00000000..5d74c6e1 --- /dev/null +++ b/docs/rabbitmq-plugins.1.xml @@ -0,0 +1,181 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> +<!-- + There is some extra magic in this document besides the usual DocBook semantics + to allow us to derive manpages, HTML and usage messages from the same source + document. + + Examples need to be moved to the end for man pages. To this end, <para>s and + <screen>s with role="example" will be moved, and with role="example-prefix" + will be removed. + + The usage messages are more involved. We have some magic in usage.xsl to pull + out the command synopsis, global option and subcommand synopses. We also pull + out <para>s with role="usage". + + Finally we construct lists of possible values for subcommand options, if the + subcommand's <varlistentry> has role="usage-has-option-list". The option which + takes the values should be marked with role="usage-option-list". +--> + +<refentry lang="en"> + <refentryinfo> + <productname>RabbitMQ Server</productname> + <authorgroup> + <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>rabbitmq-plugins</refentrytitle> + <manvolnum>1</manvolnum> + <refmiscinfo class="manual">RabbitMQ Service</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>rabbitmq-plugins</refname> + <refpurpose>command line tool for managing RabbitMQ broker plugins</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>rabbitmq-plugins</command> + <arg choice="req"><replaceable>command</replaceable></arg> + <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + <para> + <command>rabbitmq-plugins</command> is a command line tool for managing + RabbitMQ broker plugins. It allows one to enable, disable and browse + plugins. It must be run by a user with write permissions to the RabbitMQ + configuration directory. + </para> + <para> + Some plugins depend on others to work + correctly. <command>rabbitmq-plugins</command> traverses these + dependencies and enables all required plugins. Plugins listed on + the <command>rabbitmq-plugins</command> command line are marked as + explicitly enabled; dependent plugins are marked as implicitly + enabled. Implicitly enabled plugins are automatically disabled again + when they are no longer required. + </para> + </refsect1> + + <refsect1> + <title>Commands</title> + + <variablelist> + <varlistentry> + <term><cmdsynopsis><command>list</command> <arg choice="opt">-v</arg> <arg choice="opt">-m</arg> <arg choice="opt">-E</arg> <arg choice="opt">-e</arg> <arg choice="opt"><replaceable>pattern</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>-v</term> + <listitem><para>Show all plugin details (verbose).</para></listitem> + </varlistentry> + <varlistentry> + <term>-m</term> + <listitem><para>Show only plugin names (minimal).</para></listitem> + </varlistentry> + <varlistentry> + <term>-E</term> + <listitem><para>Show only explicitly enabled + plugins.</para></listitem> + </varlistentry> + <varlistentry> + <term>-e</term> + <listitem><para>Show only explicitly or implicitly + enabled plugins.</para></listitem> + </varlistentry> + <varlistentry> + <term>pattern</term> + <listitem><para>Pattern to filter the plugin names by.</para></listitem> + </varlistentry> + </variablelist> + <para> + Lists available plugins, their versions, dependencies and + descriptions. Each plugin is prefixed with a status + indicator - [ ] to indicate that the plugin is not + enabled, [E] to indicate that it is explicitly enabled, + and [e] to indicate that it is implicitly enabled. + </para> + <para> + If the optional pattern is given, only plugins whose + name matches <command>pattern</command> are shown. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-plugins list</screen> + <para role="example"> + This command lists all the plugins available, on one line each. + </para> + <screen role="example">rabbitmq-plugins list -v </screen> + <para role="example"> + This command lists all the plugins available. + </para> + <screen role="example">rabbitmq-plugins list -v management</screen> + <para role="example"> + This command lists all the plugins available, but does not + display plugins whose name does not contain "management". + </para> + <screen role="example">rabbitmq-plugins list -e rabbit</screen> + <para role="example"> + This command lists all implicitly or explicitly enabled + RabbitMQ plugins. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>enable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>plugin</term> + <listitem><para>One or more plugins to enable.</para></listitem> + </varlistentry> + </variablelist> + <para> + Enables the specified plugins and all their + dependencies. + </para> + + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-plugins enable rabbitmq_shovel rabbitmq_management</screen> + <para role="example"> + This command enables the <command>shovel</command> and + <command>management</command> plugins and all their + dependencies. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><cmdsynopsis><command>disable</command> <arg choice="req"><replaceable>plugin</replaceable> ...</arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>plugin</term> + <listitem><para>One or more plugins to disable.</para></listitem> + </varlistentry> + </variablelist> + <para> + Disables the specified plugins and all plugins that + depend on them. + </para> + + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmq-plugins disable amqp_client</screen> + <para role="example"> + This command disables <command>amqp_client</command> and + all plugins that depend on it. + </para> + </listitem> + </varlistentry> + </variablelist> + + </refsect1> + +</refentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index ac6399c6..a603886c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -75,6 +75,13 @@ -record(message_properties, {expiry, needs_confirming = false}). +-record(plugin, {name, %% atom() + version, %% string() + description, %% string() + type, %% 'ez' or 'dir' + dependencies, %% [{atom(), string()}] + location}). %% string() + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc."). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 0c5aa96a..fb27e9bd 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -55,6 +55,7 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-plugins install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server @@ -114,7 +115,7 @@ done %dir %{_sysconfdir}/rabbitmq %{_initrddir}/rabbitmq-server %config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server -%doc LICENSE LICENSE-MPL-RabbitMQ +%doc LICENSE* %clean rm -rf %{buildroot} diff --git a/packaging/common/LICENSE.head b/packaging/common/LICENSE.head new file mode 100644 index 00000000..2b5a17ee --- /dev/null +++ b/packaging/common/LICENSE.head @@ -0,0 +1,5 @@ +This package, the RabbitMQ server is licensed under the MPL. + +If you have any questions regarding licensing, please contact us at +info@rabbitmq.com. + diff --git a/packaging/debs/Debian/debian/copyright b/packaging/common/LICENSE.tail index 7206bb9b..5d842cc1 100755..100644 --- a/packaging/debs/Debian/debian/copyright +++ b/packaging/common/LICENSE.tail @@ -1,14 +1,7 @@ -This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on -Wed, 3 Jan 2007 15:43:44 +0000. -It was downloaded from http://www.rabbitmq.com/ +The MIT license is as follows: -The files codegen/amqp-rabbitmq-0.8.json and -codegen/amqp-rabbitmq-0.9.1.json are covered by the following terms: - - "Copyright (C) 2008-2011 VMware, Inc. - - Permission is hereby granted, free of charge, to any person + "Permission is hereby granted, free of charge, to any person obtaining a copy of this file (the Software), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, @@ -28,6 +21,37 @@ codegen/amqp-rabbitmq-0.9.1.json are covered by the following terms: FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE." + +The BSD 2-Clause license is as follows: + + "Redistribution and use in source and binary forms, with or + without modification, are permitted provided that the + following conditions are met: + + 1. Redistributions of source code must retain the above + copyright notice, this list of conditions and the following + disclaimer. + + 2. Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials + provided with the distribution. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND + CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, + INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR + CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR + OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, + EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE." + + The rest of this package is licensed under the Mozilla Public License 1.1 Authors and Copyright are as described below: @@ -490,13 +514,3 @@ EXHIBIT A -Mozilla Public License. the notices in the Source Code files of the Original Code. You should use the text of this Exhibit A rather than the text found in the Original Code Source Code for Your Modifications.] - - - - - -If you have any questions regarding licensing, please contact us at -info@rabbitmq.com. - -The Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed -under the MPL 1.1, see above. diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 23d2a06c..0436f546 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -29,10 +29,10 @@ cd /var/lib/rabbitmq SCRIPT=`basename $0` -if [ `id -u` = 0 ] ; then - @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" -elif [ `id -u` = `id -u rabbitmq` ] ; then +if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then /usr/lib/rabbitmq/bin/${SCRIPT} "$@" +elif [ `id -u` = 0 ] ; then + @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" else /usr/lib/rabbitmq/bin/${SCRIPT} echo diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 38c81134..8696427e 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -32,6 +32,9 @@ package: clean sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper chmod a+x $(UNPACKED_DIR)/debian/rules + echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright + cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright + echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) rm -rf $(UNPACKED_DIR) diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index a785b292..108b1ed5 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -14,7 +14,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/ install/rabbitmq-server:: mkdir -p $(DOCDIR) rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL* - for script in rabbitmqctl rabbitmq-server; do \ + for script in rabbitmqctl rabbitmq-server rabbitmq-plugins; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 4a866305..03f087d9 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -83,24 +83,32 @@ post-destroot { reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env - foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE} { + foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE ENABLED_PLUGINS_FILE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ ${realsbin}/rabbitmq-server \ - ${realsbin}/rabbitmqctl + ${realsbin}/rabbitmqctl \ + ${realsbin}/rabbitmq-plugins } xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ ${wrappersbin}/rabbitmq-server reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \ - ${wrappersbin}/rabbitmq-server + ${filespath}/rabbitmq-script-wrapper reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ - ${wrappersbin}/rabbitmq-server + ${filespath}/rabbitmq-script-wrapper reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${filespath}/rabbitmq-script-wrapper + + xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ ${wrappersbin}/rabbitmq-server - file copy ${wrappersbin}/rabbitmq-server ${wrappersbin}/rabbitmqctl + xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ + ${wrappersbin}/rabbitmqctl + xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ + ${wrappersbin}/rabbitmq-plugins + - xinstall -m 644 -W ${mansrc}/man1 rabbitmq-server.1.gz rabbitmqctl.1.gz \ + xinstall -m 644 -W ${mansrc}/man1 rabbitmq-server.1.gz rabbitmqctl.1.gz rabbitmq-plugins.1.gz \ ${mandest}/man1/ xinstall -m 644 -W ${mansrc}/man5 rabbitmq-env.conf.5.gz ${mandest}/man5/ } diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index a0be8d89..828cf000 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -10,20 +10,23 @@ dist: mkdir $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-plugins.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts - rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile + rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile $(SOURCE_DIR)/*mk rm -f $(SOURCE_DIR)/README rm -rf $(SOURCE_DIR)/docs + rm -rf $(SOURCE_DIR)/src + rm -rf $(SOURCE_DIR)/dist mv $(SOURCE_DIR) $(TARGET_DIR) mkdir -p $(TARGET_DIR) - mkdir -p $(TARGET_DIR)/plugins - echo Put your .ez plugin files in this directory > $(TARGET_DIR)/plugins/README + mv $(TARGET_DIR)/plugins/README $(TARGET_DIR)/plugins/README.txt xmlto -o . xhtml-nochunks ../../docs/rabbitmq-service.xml elinks -dump -no-references -no-numbering rabbitmq-service.html \ > $(TARGET_DIR)/readme-service.txt todos $(TARGET_DIR)/readme-service.txt + rm -rf $(TARGET_DIR)/plugins-src zip -q -r $(TARGET_ZIP).zip $(TARGET_DIR) rm -rf $(TARGET_DIR) rabbitmq-service.html diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins new file mode 100755 index 00000000..4c6cb1fa --- /dev/null +++ b/scripts/rabbitmq-plugins @@ -0,0 +1,34 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License +## at http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +## the License for the specific language governing rights and +## limitations under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developer of the Original Code is VMware, Inc. +## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## + +. `dirname $0`/rabbitmq-env + +ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins + +[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} + +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" + +exec erl \ + -pa "${RABBITMQ_HOME}/ebin" \ + -noinput \ + -hidden \ + -sname rabbitmq-plugins$$ \ + -s rabbit_plugins \ + -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ + -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ + -extra "$@" diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat new file mode 100755 index 00000000..ca874a7f --- /dev/null +++ b/scripts/rabbitmq-plugins.bat @@ -0,0 +1,51 @@ +@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License
+REM at http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+REM the License for the specific language governing rights and
+REM limitations under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developer of the Original Code is VMware, Inc.
+REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM
+
+setlocal
+
+rem Preserve values that might contain exclamation marks before
+rem enabling delayed expansion
+set TDP0=%~dp0
+set STAR=%*
+setlocal enabledelayedexpansion
+
+if "!RABBITMQ_BASE!"=="" (
+ set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+)
+
+if not exist "!ERLANG_HOME!\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+)
+
+if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
+ set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
+)
+
+set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+
+endlocal
+endlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index deca5b30..11cc7215 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -21,6 +21,7 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia SERVER_START_ARGS= +ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins . `dirname $0`/rabbitmq-env @@ -52,6 +53,7 @@ fi [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand +[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" ## Log rotation @@ -74,7 +76,7 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then -hidden \ -s rabbit_prelaunch \ -sname rabbitmqprelaunch$$ \ - -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}" + -extra "$RABBITMQ_ENABLED_PLUGINS_FILE" "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}" then RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" RABBITMQ_EBIN_PATH="" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 56bed435..0a78794f 100644..100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -84,6 +84,10 @@ if "!RABBITMQ_PLUGINS_EXPAND_DIR!"=="" ( set RABBITMQ_PLUGINS_EXPAND_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-plugins-expand
)
+if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
+ set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
+)
+
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
@@ -92,7 +96,8 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -noinput -hidden ^
-s rabbit_prelaunch ^
-sname rabbitmqprelaunch!RANDOM! ^
--extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+ "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 26c6ea65..e671ba7a 100644..100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -153,6 +153,10 @@ if errorlevel 1 ( echo !RABBITMQ_SERVICENAME! service is already present - only updating service parameters
)
+if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
+ set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
+)
+
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
@@ -160,7 +164,8 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin -pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
+-extra "!RABBITMQ_ENABLED_PLUGINS_FILE!" ^
+ "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
""
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe1ddba0..46f6674b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -118,19 +118,19 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = backing_queue_module(Q), - backing_queue_state = undefined, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = dict:new()}, hibernate, + State = #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = backing_queue_module(Q), + backing_queue_state = undefined, + active_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined, + ttl = undefined, + msg_id_to_channel = dict:new()}, + {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, @@ -140,25 +140,24 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, none -> ok; _ -> erlang:monitor(process, Owner) end, - State = requeue_and_run( - AckTags, - process_args( - #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = BQ, - backing_queue_state = BQS, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = RateTRef, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = MTC})), + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + msg_id_to_channel = MTC}, + State1 = requeue_and_run(AckTags, process_args( + rabbit_event:init_stats_timer( + State, #q.stats_timer))), lists:foldl( fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, - State, Deliveries). + State1, Deliveries). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); @@ -183,9 +182,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, - State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, - stats_timer = StatsTimer}) -> +declare(Recover, From, State = #q{q = Q, + backing_queue = BQ, + backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -199,7 +198,7 @@ declare(Recover, From, State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(StatsTimer, + rabbit_event:if_enabled(State1, #q.stats_timer, fun() -> emit_stats(State1) end), noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} @@ -315,10 +314,8 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. -ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = #amqqueue{pid = QPid}}) -> - State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, QPid, emit_stats)}. +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1120,10 +1117,10 @@ handle_info(maybe_expire, State) -> handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer), assert_invariant(State1), {noreply, State1, hibernate}; @@ -1167,18 +1164,17 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - stats_timer = StatsTimer}) -> + backing_queue_state = BQS}) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( - StatsTimer, + State, #q.stats_timer, fun () -> emit_stats(State, [{idle_since, now()}]) end), - State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS3}, + State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, + #q.stats_timer), {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bcffe2af..883e570a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -173,7 +173,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, Limiter]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), - StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -194,7 +193,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, blocking = sets:new(), queue_consumers = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), @@ -202,10 +200,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, - rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), - rabbit_event:if_enabled(StatsTimer, - fun() -> emit_stats(State) end), - {ok, State, hibernate, + State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_call(Msg, _From, _State) -> @@ -319,10 +318,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State) -> emit_stats(State), noreply([ensure_stats_timer], - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); + rabbit_event:reset_stats_timer(State, #ch.stats_timer)); handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), @@ -335,12 +334,12 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State) -> ok = clear_permission_cache(), rabbit_event:if_enabled( - StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), - StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State#ch{stats_timer = StatsTimer1}}. + State, #ch.stats_timer, + fun () -> emit_stats(State, [{idle_since, now()}]) end), + {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. terminate(Reason, State) -> {Res, _State1} = notify_queues(State), @@ -385,9 +384,8 @@ next_state(Mask, State) -> State2 = ?MASKED_CALL(send_confirms, Mask, State1), State2. -ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - State#ch{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, self(), emit_stats)}. +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -1163,11 +1161,11 @@ demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> false -> State end. -queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, - queue_consumers = QCons, +queue_monitor_needed(QPid, #ch{queue_consumers = QCons, blocking = Blocking, - unconfirmed_qm = UQM}) -> - StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + unconfirmed_qm = UQM} = State) -> + StatsEnabled = rabbit_event:stats_level( + State, #ch.stats_timer) =:= fine, ConsumerMonitored = dict:is_key(QPid, QCons), QueueBlocked = sets:is_element(QPid, Blocking), ConfirmMonitored = gb_trees:is_defined(QPid, UQM), @@ -1511,8 +1509,8 @@ maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_redeliver_stats(_, _, State) -> State. -maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) -> - case rabbit_event:stats_level(StatsTimer) of +maybe_incr_stats(QXIncs, Measure, State) -> + case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> lists:foldl(fun ({QX, Inc}, State0) -> incr_stats(QX, Inc, Measure, State0) end, State, QXIncs); @@ -1544,9 +1542,9 @@ update_measures(Type, QX, Inc, Measure) -> emit_stats(State) -> emit_stats(State, []). -emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> +emit_stats(State, Extra) -> CoarseStats = infos(?STATISTICS_KEYS, State), - case rabbit_event:stats_level(StatsTimer) of + case rabbit_event:stats_level(State, #ch.stats_timer) of coarse -> rabbit_event:notify(channel_stats, Extra ++ CoarseStats); fine -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e9f0cf6c..905e4fd0 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -86,7 +86,7 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - quit(0); + rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> print_error("invalid command '~s'", [string:join([atom_to_list(Command) | Args], " ")]), @@ -96,17 +96,17 @@ start() -> usage(); {error, Reason} -> print_error("~p", [Reason]), - quit(2); + rabbit_misc:quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), - quit(2); + rabbit_misc:quit(2); {badrpc, Reason} -> print_error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), - quit(2); + rabbit_misc:quit(2); Other -> print_error("~p", [Other]), - quit(2) + rabbit_misc:quit(2) end. fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). @@ -157,7 +157,7 @@ stop() -> usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), - quit(1). + rabbit_misc:quit(1). %%---------------------------------------------------------------------------- @@ -514,10 +514,3 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value); prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; prettify_typed_amqp_value(_Type, Value) -> Value. - -%% the slower shutdown on windows required to flush stdout -quit(Status) -> - case os:type() of - {unix, _} -> halt(Status); - {win32, _} -> init:stop(Status) - end. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index bb765566..5ae40c78 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,9 +19,9 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]). --export([reset_stats_timer/1]). --export([stats_level/1, if_enabled/2]). +-export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]). +-export([reset_stats_timer/2]). +-export([stats_level/2, if_enabled/3]). -export([notify/2, notify_if/3]). %%---------------------------------------------------------------------------- @@ -39,29 +39,23 @@ -type(event_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). --type(event() :: #event { - type :: event_type(), - props :: event_props(), - timestamp :: event_timestamp() - }). +-type(event() :: #event { type :: event_type(), + props :: event_props(), + timestamp :: event_timestamp() }). -type(level() :: 'none' | 'coarse' | 'fine'). --opaque(state() :: #state { - level :: level(), - interval :: integer(), - timer :: atom() - }). - -type(timer_fun() :: fun (() -> 'ok')). +-type(container() :: tuple()). +-type(pos() :: non_neg_integer()). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()). --spec(stop_stats_timer/1 :: (state()) -> state()). --spec(reset_stats_timer/1 :: (state()) -> state()). --spec(stats_level/1 :: (state()) -> level()). --spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). +-spec(init_stats_timer/2 :: (container(), pos()) -> container()). +-spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()). +-spec(stop_stats_timer/2 :: (container(), pos()) -> container()). +-spec(reset_stats_timer/2 :: (container(), pos()) -> container()). +-spec(stats_level/2 :: (container(), pos()) -> level()). +-spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). @@ -75,58 +69,69 @@ start_link() -> %% The idea is, for each stat-emitting object: %% %% On startup: -%% Timer = init_stats_timer() +%% init_stats_timer(State) %% notify(created event) %% if_enabled(internal_emit_stats) - so we immediately send something %% %% On wakeup: -%% ensure_stats_timer(Timer, Pid, emit_stats) +%% ensure_stats_timer(State, emit_stats) %% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) %% %% emit_stats: %% if_enabled(internal_emit_stats) -%% reset_stats_timer(Timer) - just bookkeeping +%% reset_stats_timer(State) - just bookkeeping %% %% Pre-hibernation: %% if_enabled(internal_emit_stats) -%% stop_stats_timer(Timer) +%% stop_stats_timer(State) %% %% internal_emit_stats: %% notify(stats) -init_stats_timer() -> +init_stats_timer(C, P) -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), - #state{level = StatsLevel, interval = Interval, timer = undefined}. - -ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) -> - State; -ensure_stats_timer(State = #state{interval = Interval, - timer = undefined}, Pid, Msg) -> - TRef = erlang:send_after(Interval, Pid, Msg), - State#state{timer = TRef}; -ensure_stats_timer(State, _Pid, _Msg) -> - State. - -stop_stats_timer(State = #state{level = none}) -> - State; -stop_stats_timer(State = #state{timer = undefined}) -> - State; -stop_stats_timer(State = #state{timer = TRef}) -> - erlang:cancel_timer(TRef), - State#state{timer = undefined}. - -reset_stats_timer(State) -> - State#state{timer = undefined}. - -stats_level(#state{level = Level}) -> + setelement(P, C, #state{level = StatsLevel, interval = Interval, + timer = undefined}). + +ensure_stats_timer(C, P, Msg) -> + case element(P, C) of + #state{level = Level, interval = Interval, timer = undefined} = State + when Level =/= none -> + TRef = erlang:send_after(Interval, self(), Msg), + setelement(P, C, State#state{timer = TRef}); + #state{} -> + C + end. + +stop_stats_timer(C, P) -> + case element(P, C) of + #state{level = Level, timer = TRef} = State + when Level =/= none andalso TRef =/= undefined -> + erlang:cancel_timer(TRef), + setelement(P, C, State#state{timer = undefined}); + #state{} -> + C + end. + +reset_stats_timer(C, P) -> + case element(P, C) of + #state{timer = TRef} = State + when TRef =/= undefined -> + setelement(P, C, State#state{timer = undefined}); + #state{} -> + C + end. + +stats_level(C, P) -> + #state{level = Level} = element(P, C), Level. -if_enabled(#state{level = none}, _Fun) -> - ok; -if_enabled(_State, Fun) -> - Fun(), - ok. +if_enabled(C, P, Fun) -> + case element(P, C) of + #state{level = none} -> ok; + #state{} -> Fun(), ok + end. notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 13a553f1..a9b15af8 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,6 +55,7 @@ -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). +-export([quit/1]). %%---------------------------------------------------------------------------- @@ -197,6 +198,7 @@ -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). +-spec(quit/1 :: (integer() | string()) -> no_return()). -endif. @@ -842,3 +844,10 @@ append_rpc_all_nodes(Nodes, M, F, A) -> {badrpc, _} -> []; _ -> Res end || Res <- ResL]). + +%% the slower shutdown on windows required to flush stdout +quit(Status) -> + case os:type() of + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status) + end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl new file mode 100644 index 00000000..27dadfc5 --- /dev/null +++ b/src/rabbit_plugins.erl @@ -0,0 +1,357 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_plugins). +-include("rabbit.hrl"). + +-export([start/0, stop/0, find_plugins/1, read_enabled_plugins/1, + lookup_plugins/2, calculate_required_plugins/2, plugin_names/1]). + +-define(VERBOSE_OPT, "-v"). +-define(MINIMAL_OPT, "-m"). +-define(ENABLED_OPT, "-E"). +-define(ENABLED_ALL_OPT, "-e"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). +-spec(find_plugins/1 :: (file:filename()) -> [#plugin{}]). +-spec(read_enabled_plugins/1 :: (file:filename()) -> [atom()]). +-spec(lookup_plugins/2 :: ([atom()], [#plugin{}]) -> [#plugin{}]). +-spec(calculate_required_plugins/2 :: ([atom()], [#plugin{}]) -> [atom()]). +-spec(plugin_names/1 :: ([#plugin{}]) -> [atom()]). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + {ok, [[PluginsFile|_]|_]} = + init:get_argument(enabled_plugins_file), + {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), + {[Command0 | Args], Opts} = + case rabbit_misc:get_options([{flag, ?VERBOSE_OPT}, + {flag, ?MINIMAL_OPT}, + {flag, ?ENABLED_OPT}, + {flag, ?ENABLED_ALL_OPT}], + init:get_plain_arguments()) of + {[], _Opts} -> usage(); + CmdArgsAndOpts -> CmdArgsAndOpts + end, + Command = list_to_atom(Command0), + + case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + ok -> + rabbit_misc:quit(0); + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]), + usage(); + {error, Reason} -> + print_error("~p", [Reason]), + rabbit_misc:quit(2); + Other -> + print_error("~s", [Other]), + rabbit_misc:quit(2) + end. + +stop() -> + ok. + +print_error(Format, Args) -> + rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). + +usage() -> + io:format("~s", [rabbit_plugins_usage:usage()]), + rabbit_misc:quit(1). + +%%---------------------------------------------------------------------------- + +action(list, [], Opts, PluginsFile, PluginsDir) -> + action(list, [".*"], Opts, PluginsFile, PluginsDir); +action(list, [Pat], Opts, PluginsFile, PluginsDir) -> + format_plugins(Pat, Opts, PluginsFile, PluginsDir); + +action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> + case ToEnable0 of + [] -> throw("Not enough arguments for 'enable'"); + _ -> ok + end, + AllPlugins = find_plugins(PluginsDir), + Enabled = read_enabled_plugins(PluginsFile), + ImplicitlyEnabled = calculate_required_plugins(Enabled, AllPlugins), + ToEnable = [list_to_atom(Name) || Name <- ToEnable0], + Missing = ToEnable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> throw(fmt_list("The following plugins could not be found:", + Missing)) + end, + NewEnabled = lists:usort(Enabled ++ ToEnable), + write_enabled_plugins(PluginsFile, NewEnabled), + case NewEnabled -- ImplicitlyEnabled of + [] -> io:format("Plugin configuration unchanged.~n"); + _ -> NewImplicitlyEnabled = + calculate_required_plugins(NewEnabled, AllPlugins), + print_list("The following plugins have been enabled:", + NewImplicitlyEnabled -- ImplicitlyEnabled), + io:format("Plugin configuration has changed. " + "Restart RabbitMQ for changes to take effect.~n") + end; + +action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> + case ToDisable0 of + [] -> throw("Not enough arguments for 'disable'"); + _ -> ok + end, + ToDisable = [list_to_atom(Name) || Name <- ToDisable0], + Enabled = read_enabled_plugins(PluginsFile), + AllPlugins = find_plugins(PluginsDir), + Missing = ToDisable -- plugin_names(AllPlugins), + case Missing of + [] -> ok; + _ -> print_list("Warning: the following plugins could not be found:", + Missing) + end, + ToDisableDeps = calculate_dependencies(true, ToDisable, AllPlugins), + NewEnabled = Enabled -- ToDisableDeps, + case length(Enabled) =:= length(NewEnabled) of + true -> io:format("Plugin configuration unchanged.~n"); + false -> ImplicitlyEnabled = + calculate_required_plugins(Enabled, AllPlugins), + NewImplicitlyEnabled = + calculate_required_plugins(NewEnabled, AllPlugins), + print_list("The following plugins have been disabled:", + ImplicitlyEnabled -- NewImplicitlyEnabled), + write_enabled_plugins(PluginsFile, NewEnabled), + io:format("Plugin configuration has changed. " + "Restart RabbitMQ for changes to take effect.~n") + end. + +%%---------------------------------------------------------------------------- + +%% Get the #plugin{}s ready to be enabled. +find_plugins(PluginsDir) -> + EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)], + FreeApps = [{app, App} || + App <- filelib:wildcard("*/ebin/*.app", PluginsDir)], + {Plugins, Problems} = + lists:foldl(fun ({error, EZ, Reason}, {Plugins1, Problems1}) -> + {Plugins1, [{EZ, Reason} | Problems1]}; + (Plugin = #plugin{}, {Plugins1, Problems1}) -> + {[Plugin|Plugins1], Problems1} + end, {[], []}, + [get_plugin_info(PluginsDir, Plug) || + Plug <- EZs ++ FreeApps]), + case Problems of + [] -> ok; + _ -> io:format("Warning: Problem reading some plugins: ~p~n", + [Problems]) + end, + Plugins. + +%% Get the #plugin{} from an .ez. +get_plugin_info(Base, {ez, EZ0}) -> + EZ = filename:join([Base, EZ0]), + case read_app_file(EZ) of + {application, Name, Props} -> mkplugin(Name, Props, ez, EZ); + {error, Reason} -> {error, EZ, Reason} + end; +%% Get the #plugin{} from an .app. +get_plugin_info(Base, {app, App0}) -> + App = filename:join([Base, App0]), + case rabbit_file:read_term_file(App) of + {ok, [{application, Name, Props}]} -> + mkplugin(Name, Props, dir, + filename:absname( + filename:dirname(filename:dirname(App)))); + {error, Reason} -> + {error, App, {invalid_app, Reason}} + end. + +mkplugin(Name, Props, Type, Location) -> + Version = proplists:get_value(vsn, Props, "0"), + Description = proplists:get_value(description, Props, ""), + Dependencies = + filter_applications(proplists:get_value(applications, Props, [])), + #plugin{name = Name, version = Version, description = Description, + dependencies = Dependencies, location = Location, type = Type}. + +%% Read the .app file from an ez. +read_app_file(EZ) -> + case zip:list_dir(EZ) of + {ok, [_|ZippedFiles]} -> + case find_app_files(ZippedFiles) of + [AppPath|_] -> + {ok, [{AppPath, AppFile}]} = + zip:extract(EZ, [{file_list, [AppPath]}, memory]), + parse_binary(AppFile); + [] -> + {error, no_app_file} + end; + {error, Reason} -> + {error, {invalid_ez, Reason}} + end. + +%% Return the path of the .app files in ebin/. +find_app_files(ZippedFiles) -> + {ok, RE} = re:compile("^.*/ebin/.*.app$"), + [Path || {zip_file, Path, _, _, _, _} <- ZippedFiles, + re:run(Path, RE, [{capture, none}]) =:= match]. + +%% Parse a binary into a term. +parse_binary(Bin) -> + try + {ok, Ts, _} = erl_scan:string(binary_to_list(Bin)), + {ok, Term} = erl_parse:parse_term(Ts), + Term + catch + Err -> {error, {invalid_app, Err}} + end. + +%% Pretty print a list of plugins. +format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> + Verbose = proplists:get_bool(?VERBOSE_OPT, Opts), + Minimal = proplists:get_bool(?MINIMAL_OPT, Opts), + Format = case {Verbose, Minimal} of + {false, false} -> normal; + {true, false} -> verbose; + {false, true} -> minimal; + {true, true} -> throw("Cannot specify -m and -v together") + end, + OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), + OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts), + + AvailablePlugins = find_plugins(PluginsDir), + EnabledExplicitly = read_enabled_plugins(PluginsFile), + EnabledImplicitly = + calculate_required_plugins(EnabledExplicitly, AvailablePlugins) -- + EnabledExplicitly, + {ok, RE} = re:compile(Pattern), + Plugins = [ Plugin || + Plugin = #plugin{name = Name} <- AvailablePlugins, + re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + true -> true + end, + if OnlyEnabledAll -> + lists:member(Name, EnabledImplicitly) or + lists:member(Name, EnabledExplicitly); + true -> + true + end], + Plugins1 = usort_plugins(Plugins), + MaxWidth = lists:max([length(atom_to_list(Name)) || + #plugin{name = Name} <- Plugins1] ++ [0]), + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format, + MaxWidth) || P <- Plugins1], + ok. + +format_plugin(#plugin{name = Name, version = Version, + description = Description, dependencies = Deps}, + EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) -> + Glyph = case {lists:member(Name, EnabledExplicitly), + lists:member(Name, EnabledImplicitly)} of + {true, false} -> "[E]"; + {false, true} -> "[e]"; + _ -> "[ ]" + end, + case Format of + minimal -> io:format("~s~n", [Name]); + normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ + "w ~s~n", [Glyph, Name, Version]); + verbose -> io:format("~s ~w~n", [Glyph, Name]), + io:format(" Version: \t~s~n", [Version]), + case Deps of + [] -> ok; + _ -> io:format(" Dependencies:\t~p~n", [Deps]) + end, + io:format(" Description:\t~s~n", [Description]), + io:format("~n") + end. + +print_list(Header, Plugins) -> + io:format(fmt_list(Header, Plugins)). + +fmt_list(Header, Plugins) -> + lists:flatten( + [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins], $\n]). + +usort_plugins(Plugins) -> + lists:usort(fun plugins_cmp/2, Plugins). + +plugins_cmp(#plugin{name = N1, version = V1}, + #plugin{name = N2, version = V2}) -> + {N1, V1} =< {N2, V2}. + +%% Filter out applications that can be loaded *right now*. +filter_applications(Applications) -> + [Application || Application <- Applications, + not is_available_app(Application)]. + +%% Return whether is application is already available (and hence +%% doesn't need enabling). +is_available_app(Application) -> + case application:load(Application) of + {error, {already_loaded, _}} -> true; + ok -> application:unload(Application), + true; + _ -> false + end. + +%% Return the names of the given plugins. +plugin_names(Plugins) -> + [Name || #plugin{name = Name} <- Plugins]. + +%% Find plugins by name in a list of plugins. +lookup_plugins(Names, AllPlugins) -> + [P || P = #plugin{name = Name} <- AllPlugins, lists:member(Name, Names)]. + +%% Read the enabled plugin names from disk. +read_enabled_plugins(PluginsFile) -> + case rabbit_file:read_term_file(PluginsFile) of + {ok, [Plugins]} -> Plugins; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, + PluginsFile, Reason}}) + end. + +%% Write the enabled plugin names on disk. +write_enabled_plugins(PluginsFile, Plugins) -> + case rabbit_file:write_term_file(PluginsFile, [Plugins]) of + ok -> ok; + {error, Reason} -> throw({error, {cannot_write_enabled_plugins_file, + PluginsFile, Reason}}) + end. + +calculate_required_plugins(Sources, AllPlugins) -> + calculate_dependencies(false, Sources, AllPlugins). + +calculate_dependencies(Reverse, Sources, AllPlugins) -> + {ok, G} = rabbit_misc:build_acyclic_graph( + fun (App, _Deps) -> [{App, App}] end, + fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, + [{Name, Deps} + || #plugin{name = Name, dependencies = Deps} <- AllPlugins]), + Dests = case Reverse of + false -> digraph_utils:reachable(Sources, G); + true -> digraph_utils:reaching(Sources, G) + end, + true = digraph:delete(G), + Dests. diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index cd0c322b..d34ed44a 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -18,6 +18,8 @@ -export([start/0, stop/0]). +-include("rabbit.hrl"). + -define(BaseApps, [rabbit]). -define(ERROR_CODE, 1). @@ -41,14 +43,14 @@ start() -> io:format("Activating RabbitMQ plugins ...~n"), %% Determine our various directories - [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(), + [EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir, NodeStr] = + init:get_plain_arguments(), RootName = UnpackedPluginDir ++ "/rabbit", - %% Unpack any .ez plugins - unpack_ez_plugins(PluginDir, UnpackedPluginDir), + prepare_plugins(EnabledPluginsFile, PluginsDistDir, UnpackedPluginDir), %% Build a list of required apps based on the fixed set, and any plugins - PluginApps = find_plugins(PluginDir) ++ find_plugins(UnpackedPluginDir), + PluginApps = find_plugins(UnpackedPluginDir), RequiredApps = ?BaseApps ++ PluginApps, %% Build the entire set of dependencies - this will load the @@ -145,7 +147,19 @@ delete_recursively(Fn) -> Error -> Error end. -unpack_ez_plugins(SrcDir, DestDir) -> +prepare_plugins(EnabledPluginsFile, PluginsDistDir, DestDir) -> + AllPlugins = rabbit_plugins:find_plugins(PluginsDistDir), + Enabled = rabbit_plugins:read_enabled_plugins(EnabledPluginsFile), + ToUnpack = rabbit_plugins:calculate_required_plugins(Enabled, AllPlugins), + ToUnpackPlugins = rabbit_plugins:lookup_plugins(ToUnpack, AllPlugins), + + Missing = Enabled -- rabbit_plugins:plugin_names(ToUnpackPlugins), + case Missing of + [] -> ok; + _ -> io:format("Warning: the following enabled plugins were " + "not found: ~p~n", [Missing]) + end, + %% Eliminate the contents of the destination directory case delete_recursively(DestDir) of ok -> ok; @@ -155,12 +169,15 @@ unpack_ez_plugins(SrcDir, DestDir) -> ok -> ok; {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2]) end, - [unpack_ez_plugin(PluginName, DestDir) || - PluginName <- filelib:wildcard(SrcDir ++ "/*.ez")]. -unpack_ez_plugin(PluginFn, PluginDestDir) -> - zip:unzip(PluginFn, [{cwd, PluginDestDir}]), - ok. + [prepare_plugin(Plugin, DestDir) || Plugin <- ToUnpackPlugins]. + +prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) -> + zip:unzip(Location, [{cwd, PluginDestDir}]); +prepare_plugin(#plugin{type = dir, name = Name, location = Location}, + PluginsDestDir) -> + rabbit_file:recursive_copy(Location, + filename:join([PluginsDestDir, Name])). find_plugins(PluginDir) -> [prepare_dir_plugin(PluginName) || diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b4871cef..b359f7d4 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -199,34 +199,32 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + State = #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none, + capabilities = []}, + callback = uninitialized_callback, + recv_len = 0, + pending_recv = false, + connection_state = pre_init, + queue_collector = Collector, + heartbeater = none, + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun, + buf = [], + buf_len = 0, + auth_mechanism = none, + auth_state = none}, try - recvloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - protocol = none, - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none, - capabilities = []}, - callback = uninitialized_callback, - recv_len = 0, - pending_recv = false, - connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = - rabbit_event:init_stats_timer(), - channel_sup_sup_pid = ChannelSupSupPid, - start_heartbeat_fun = StartHeartbeatFun, - buf = [], - buf_len = 0, - auth_mechanism = none, - auth_state = none - }, - handshake, 8)) + recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -605,10 +603,8 @@ refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), throw(Exception). -ensure_stats_timer(State = #v1{stats_timer = StatsTimer, - connection_state = running}) -> - State#v1{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, self(), emit_stats)}; +ensure_stats_timer(State = #v1{connection_state = running}) -> + rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); ensure_stats_timer(State) -> State. @@ -695,8 +691,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - sock = Sock, - stats_timer = StatsTimer}) -> + sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -707,7 +702,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), - rabbit_event:if_enabled(StatsTimer, + rabbit_event:if_enabled(State1, #v1.stats_timer, fun() -> emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> @@ -937,6 +932,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, State1#v1.sock, 0, CloseMethod, Protocol), State1. -emit_stats(State = #v1{stats_timer = StatsTimer}) -> +emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), - State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. + rabbit_event:reset_stats_timer(State, #v1.stats_timer). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 39f67ced..5e034ae7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1837,27 +1837,34 @@ msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). on_disk_capture() -> - on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). -on_disk_capture({OnDisk, Awaiting, Pid}) -> - Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of - true -> Pid ! {self(), arrived}, undefined; - false -> Pid - end, receive - {await, MsgIds, Pid2} -> - true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting), - on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2}); - {on_disk, MsgIds} -> - on_disk_capture({gb_sets:union(OnDisk, MsgIds), - gb_sets:subtract(Awaiting, MsgIds), - Pid1}); + {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid); + stop -> done + end. + +on_disk_capture(OnDisk, Awaiting, Pid) -> + receive + {on_disk, MsgIdsS} -> + MsgIds = gb_sets:to_list(MsgIdsS), + on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds, + Pid); stop -> done + after 100 -> + case {OnDisk, Awaiting} of + {[], []} -> Pid ! {self(), arrived}, on_disk_capture(); + {_, []} -> Pid ! {self(), surplus}; + {[], _} -> Pid ! {self(), timeout}; + {_, _} -> Pid ! {self(), surplus_timeout} + end end. on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> - Pid ! {await, gb_sets:from_list(MsgIds), self()}, - receive {Pid, arrived} -> ok end. + Pid ! {await, MsgIds, self()}, + receive + {Pid, arrived} -> ok; + {Pid, Error} -> Error + end. on_disk_stop(Pid) -> MRef = erlang:monitor(process, Pid), @@ -1915,9 +1922,15 @@ test_msg_store() -> MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), Ref = rabbit_guid:guid(), - {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref), + {Cap, MSCState} = msg_store_client_init_capture( + ?PERSISTENT_MSG_STORE, Ref), + Ref2 = rabbit_guid:guid(), + {Cap2, MSC2State} = msg_store_client_init_capture( + ?PERSISTENT_MSG_STORE, Ref2), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds, MSCState), + %% test confirm logic + passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half @@ -1926,20 +1939,25 @@ test_msg_store() -> ok = msg_store_write(MsgIds2ndHalf, MSCState), %% check they're all in there true = msg_store_contains(true, MsgIds, MSCState), - %% publish the latter half twice so we hit the caching and ref count code - ok = msg_store_write(MsgIds2ndHalf, MSCState), + %% publish the latter half twice so we hit the caching and ref + %% count code. We need to do this through a 2nd client since a + %% single client is not supposed to write the same message more + %% than once without first removing it. + ok = msg_store_write(MsgIds2ndHalf, MSC2State), %% check they're still all in there true = msg_store_contains(true, MsgIds, MSCState), - %% sync on the 2nd half, but do lots of individual syncs to try - %% and cause coalescing to happen - ok = on_disk_await(Cap, MsgIds2ndHalf), + %% sync on the 2nd half + ok = on_disk_await(Cap2, MsgIds2ndHalf), + %% cleanup + ok = on_disk_stop(Cap2), + ok = rabbit_msg_store:client_delete_and_terminate(MSC2State), ok = on_disk_stop(Cap), %% read them all MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk MSCState2 = msg_store_read(MsgIds, MSCState1), %% remove them all - ok = rabbit_msg_store:remove(MsgIds, MSCState2), + ok = msg_store_remove(MsgIds, MSCState2), %% check first half doesn't exist false = msg_store_contains(false, MsgIds1stHalf, MSCState2), %% check second half does exist @@ -1977,7 +1995,7 @@ test_msg_store() -> ok = rabbit_msg_store:client_terminate( msg_store_read(MsgIds1stHalf, MSCState6)), MSCState7 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), - ok = rabbit_msg_store:remove(MsgIds1stHalf, MSCState7), + ok = msg_store_remove(MsgIds1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty restart_msg_store_empty(), %% now safe to reuse msg_ids @@ -2024,6 +2042,35 @@ test_msg_store() -> restart_msg_store_empty(), passed. +%% We want to test that writes that get eliminated due to removes still +%% get confirmed. Removes themselves do not. +test_msg_store_confirms(MsgIds, Cap, MSCState) -> + %% write -> confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% remove -> _ + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, []), + %% write, remove -> confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% write, remove, write -> confirmed, confirmed + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds ++ MsgIds), + %% remove, write -> confirmed + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + %% remove, write, remove -> confirmed + ok = msg_store_remove(MsgIds, MSCState), + ok = msg_store_write(MsgIds, MSCState), + ok = msg_store_remove(MsgIds, MSCState), + ok = on_disk_await(Cap, MsgIds), + passed. + queue_name(Name) -> rabbit_misc:r(<<"/">>, queue, Name). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 94c0913d..b853d983 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -494,9 +494,31 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, _ChPid, State) -> - {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), - a(reduce_memory_use(State1)). +publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + State2 = case bpqueue:is_empty(Q3) of + false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } + end, + PCount1 = PCount + one_if(IsPersistent1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1, + unconfirmed = UC1 })). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, @@ -1117,34 +1139,6 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, MsgOnDisk, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case bpqueue:is_empty(Q3) of - false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } - end, - PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = UC1 }}. - maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> MsgStatus; |