diff options
author | David Wragg <dpw@lshift.net> | 2010-04-23 17:58:55 +0100 |
---|---|---|
committer | David Wragg <dpw@lshift.net> | 2010-04-23 17:58:55 +0100 |
commit | a951db5c1e6cabd50949efd9d320771442e9c270 (patch) | |
tree | dd6c68805918c136fd42d06775c18ecf2c3a831b | |
parent | 44bdaf864f386b61da59e63f81b8c3e630fd2194 (diff) | |
parent | b278252edfd4f5f0819fdaa33470187732776411 (diff) | |
download | rabbitmq-server-a951db5c1e6cabd50949efd9d320771442e9c270.tar.gz |
Merge bug22407 into default
48 files changed, 2319 insertions, 717 deletions
@@ -11,8 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^src/rabbit_framing\.erl$ -^src/rabbitmqctl_usage\.erl$ -^src/rabbitmqmulti_usage\.erl$ +^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ ^ebin/rabbit\.(app|rel|boot|script)$ @@ -28,4 +27,3 @@ syntax: regexp ^docs/.*\.[15]\.gz$ ^docs/.*\.man\.xml$ -^docs/.*\.usage.erl$ @@ -10,14 +10,16 @@ DEPS_FILE=deps.mk SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include +DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl $(SOURCE_DIR)/rabbitmqmulti_usage.erl +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ -MANPAGES=$(patsubst %.xml, %.gz, $(wildcard docs/*.[0-9].xml)) -WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard docs/*.[0-9].xml) docs/rabbitmq-service.xml) -USAGES=$(patsubst %.1.xml, %.usage.erl, $(wildcard docs/*.[0-9].xml)) +MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) +WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) +USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml +USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -60,6 +62,14 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e ERL_EBIN=erl -noinput -pa $(EBIN_DIR) +define usage_xml_to_erl + $(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, $(SOURCE_DIR)/rabbit_%_usage.erl, $(subst -,_,$(1)))) +endef + +define usage_dep + $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl +endef + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -68,9 +78,8 @@ $(DEPS_FILE): $(SOURCES) $(INCLUDES) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl +$(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@ @@ -78,12 +87,6 @@ $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.p $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@ -$(SOURCE_DIR)/rabbitmqctl_usage.erl: docs/rabbitmqctl.usage.erl - cp docs/rabbitmqctl.usage.erl $@ - -$(SOURCE_DIR)/rabbitmqmulti_usage.erl: docs/rabbitmq-multi.usage.erl - cp docs/rabbitmq-multi.usage.erl $@ - dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ "rabbit_dialyzer:halt_with_code(rabbit_dialyzer:dialyze_files(\"$(BASIC_PLT)\", \"$(BEAM_TARGETS)\"))." @@ -107,8 +110,8 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl $(SOURCE_DIR)/rabbitmqctl_usage.erl codegen.pyc - rm -f docs/*.[0-9].gz docs/*.usage.erl docs/*.man.xml docs/*.erl + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -161,7 +164,8 @@ stop-node: COVER_DIR=. start-cover: all - echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) + echo "rabbit_misc:start_cover([\"rabbit\", \"hare\"])." | $(ERL_CALL) + echo "rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) @@ -184,7 +188,7 @@ srcdist: distclean cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) - cp -r docs $(TARGET_SRC_DIR) + cp -r $(DOCS_DIR) $(TARGET_SRC_DIR) chmod 0755 $(TARGET_SRC_DIR)/scripts/* (cd dist; tar -zcf $(TARBALL_NAME).tar.gz $(TARBALL_NAME)) @@ -197,25 +201,31 @@ distclean: clean find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; # xmlto can not read from standard input, so we mess with a tmp file. -%.gz: %.xml docs/examples-to-end.xsl - xsltproc docs/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o docs $<.tmp && \ - gzip -f docs/`basename $< .xml` +%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp -%.usage.erl: %.1.xml docs/usage.xsl - xsltproc --stringparam modulename "`basename $< .1.xml | tr -d -`_usage" \ - docs/usage.xsl $< | sed -e s/\\\"/\\\\\\\"/g | sed -e s/%QUOTE%/\\\"/g | \ - fmt -s > docs/`basename $< .1.xml`.usage.erl +# Use tmp files rather than a pipeline so that we get meaningful errors +# Do not fold the cp into previous line, it's there to stop the file being +# generated but empty if we fail +$(SOURCE_DIR)/%_usage.erl: + xsltproc --stringparam modulename "`basename $@ .erl`" \ + $(DOCS_DIR)/usage.xsl $< > $@.tmp + sed -e 's/"/\\"/g' -e 's/%QUOTE%/"/g' $@.tmp > $@.tmp2 + fold -s $@.tmp2 > $@.tmp3 + mv $@.tmp3 $@ + rm $@.tmp $@.tmp2 # We rename the file before xmlto sees it since xmlto will use the name of # the file to make internal links. -%.man.xml: %.xml docs/html-to-website-xml.xsl +%.man.xml: %.xml $(DOCS_DIR)/html-to-website-xml.xsl cp $< `basename $< .xml`.xml && \ xmlto xhtml-nochunks `basename $< .xml`.xml ; rm `basename $< .xml`.xml cat `basename $< .xml`.html | \ - xsltproc --novalid docs/remove-namespaces.xsl - | \ - xsltproc --stringparam original `basename $<` docs/html-to-website-xml.xsl - | \ + xsltproc --novalid $(DOCS_DIR)/remove-namespaces.xsl - | \ + xsltproc --stringparam original `basename $<` $(DOCS_DIR)/html-to-website-xml.xsl - | \ xmllint --format - > $@ rm `basename $< .xml`.html @@ -237,8 +247,8 @@ install: all docs_all install_dirs done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ - for manpage in docs/*.$$section.pod; do \ - cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \ + for manpage in $(DOCS_DIR)/*.$$section.gz; do \ + cp $$manpage $(MAN_DIR)/man$$section; \ done; \ done @@ -246,4 +256,27 @@ install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(TARGET_DIR)/sbin --include $(DEPS_FILE) +$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) + +# Note that all targets which depend on clean must have clean in their +# name. Also any target that doesn't depend on clean should not have +# clean in its name, unless you know that you don't need any of the +# automatic dependency generation for that target (eg cleandb). + +# We want to load the dep file if *any* target *doesn't* contain +# "clean" - i.e. if removing all clean-like targets leaves something + +ifeq "$(MAKECMDGOALS)" "" +TESTABLEGOALS:=$(.DEFAULT_GOAL) +else +TESTABLEGOALS:=$(MAKECMDGOALS) +endif + +ifneq "$(strip $(TESTABLEGOALS))" "$(DEPS_FILE)" +ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" "" +ifeq "$(strip $(wildcard $(DEPS_FILE)))" "" +$(info $(shell $(MAKE) $(DEPS_FILE))) +endif +include $(DEPS_FILE) +endif +endif @@ -126,7 +126,7 @@ def printFileHeader(): %% %% Contributor(s): ______________________________________. %%""" - + def genErl(spec): def erlType(domain): return erlangTypeMap[spec.resolveDomain(domain)] @@ -151,7 +151,7 @@ def genErl(spec): def genMethodHasContent(m): print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower()) - + def genMethodIsSynchronous(m): hasNoWait = "nowait" in fieldNameList(m.arguments) if m.isSynchronous and hasNoWait: @@ -219,6 +219,9 @@ def genErl(spec): else: pass + def genMethodRecord(m): + print "method_record(%s) -> #%s{};" % (m.erlangName(), m.erlangName()) + def genDecodeMethodFields(m): packedFields = packMethodFields(m.arguments) binaryPattern = ', '.join([methodFieldFragment(f) for f in packedFields]) @@ -299,6 +302,7 @@ def genErl(spec): -export([method_id/1]). -export([method_has_content/1]). -export([is_method_synchronous/1]). +-export([method_record/1]). -export([method_fieldnames/1]). -export([decode_method_fields/2]). -export([decode_properties/2]). @@ -323,6 +327,9 @@ bitvalue(undefined) -> 0. for m in methods: genMethodIsSynchronous(m) print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodRecord(m) + print "method_record(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodFieldNames(m) print "method_fieldnames(Name) -> exit({unknown_method_name, Name})." @@ -362,7 +369,7 @@ def genHrl(spec): result += ' = ' + conv_fn(field.defaultvalue) return result return ', '.join([fillField(f) for f in fields]) - + methods = spec.allMethods() printFileHeader() @@ -386,7 +393,7 @@ def generateErl(specPath): def generateHrl(specPath): genHrl(AmqpSpec(specPath)) - + if __name__ == "__main__": do_main(generateHrl, generateErl) diff --git a/docs/examples-to-end.xsl b/docs/examples-to-end.xsl index b63ffcb3..496fcc1c 100644 --- a/docs/examples-to-end.xsl +++ b/docs/examples-to-end.xsl @@ -8,7 +8,7 @@ <xsl:output doctype-public="-//OASIS//DTD DocBook XML V4.5//EN" doctype-system="http://www.docbook.org/xml/4.5/docbookx.dtd" /> -<!-- Don't copy exmaples through in place --> +<!-- Don't copy examples through in place --> <xsl:template match="*[@role='example-prefix']"/> <xsl:template match="*[@role='example']"/> @@ -25,6 +25,7 @@ </xsl:for-each> <refsect1> <title>Examples</title> +<xsl:if test="//screen[@role='example']"> <variablelist> <xsl:for-each select="//screen[@role='example']"> <varlistentry> @@ -35,6 +36,16 @@ </varlistentry> </xsl:for-each> </variablelist> +</xsl:if> +<!-- +We need to handle multiline examples separately, since not using a +variablelist leads to slightly less nice formatting (the explanation doesn't get +indented) +--> +<xsl:for-each select="//screen[@role='example-multiline']"> +<screen><emphasis role="bold"><xsl:copy-of select="text()"/></emphasis></screen> +<xsl:copy-of select="following-sibling::para[@role='example']"/> +</xsl:for-each> </refsect1> </refentry> </xsl:template> diff --git a/docs/rabbitmq.conf.5.xml b/docs/rabbitmq.conf.5.xml index dcb1e49c..34f20f92 100644 --- a/docs/rabbitmq.conf.5.xml +++ b/docs/rabbitmq.conf.5.xml @@ -9,7 +9,7 @@ </refentryinfo> <refmeta> - <refentrytitle>/etc/rabbitmq/rabbitmq.conf</refentrytitle> + <refentrytitle>rabbitmq.conf</refentrytitle> <manvolnum>5</manvolnum> <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> </refmeta> @@ -59,11 +59,11 @@ environment variable names, with the <envar>RABBITMQ_</envar> prefix removed: <filename>/etc/rabbitmq/rabbitmq.conf</filename> file, etc. </para> <para role="example-prefix">For example:</para> - <screen role="example"> - # I am a complete /etc/rabbitmq/rabbitmq.conf file. - # Comment lines start with a hash character. - # This is a /bin/sh script file - use ordinary envt var syntax - NODENAME=hare + <screen role="example-multiline"> +# I am a complete /etc/rabbitmq/rabbitmq.conf file. +# Comment lines start with a hash character. +# This is a /bin/sh script file - use ordinary envt var syntax +NODENAME=hare </screen> <para role="example"> This is an example of a complete diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index e7fc45e4..7634b2d2 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -65,7 +65,7 @@ <title>Options</title> <variablelist> <varlistentry> - <term><option>-n</option> <replaceable>node</replaceable></term> + <term><cmdsynopsis><arg choice="opt">-n <replaceable>node</replaceable></arg></cmdsynopsis></term> <listitem> <para role="usage"> Default node is "rabbit@server", where server is the local host. On @@ -79,7 +79,7 @@ </listitem> </varlistentry> <varlistentry> - <term><option>-q</option></term> + <term><cmdsynopsis><arg choice="opt">-q</arg></cmdsynopsis></term> <listitem> <para role="usage"> Quiet output mode is selected with the "-q" flag. Informational diff --git a/docs/usage.xsl b/docs/usage.xsl index 841b2a84..72f8880a 100644 --- a/docs/usage.xsl +++ b/docs/usage.xsl @@ -12,19 +12,17 @@ encoding="UTF-8" indent="no"/> <xsl:strip-space elements="*"/> -<xsl:preserve-space elements="term" /> +<xsl:preserve-space elements="cmdsynopsis arg" /> <xsl:template match="/"> <!-- Pull out cmdsynopsis to show the command usage line. -->%% Generated, do not edit! -module(<xsl:value-of select="$modulename" />). -export([usage/0]). -usage() -> io:format(%QUOTE%Usage: +usage() -> %QUOTE%Usage: <xsl:value-of select="refentry/refsynopsisdiv/cmdsynopsis/command"/> <xsl:text> </xsl:text> <xsl:for-each select="refentry/refsynopsisdiv/cmdsynopsis/arg"> - <xsl:if test="@choice='opt'">[</xsl:if> <xsl:apply-templates select="." /> - <xsl:if test="@choice='opt'">]</xsl:if> <xsl:text> </xsl:text> </xsl:for-each> @@ -60,7 +58,7 @@ usage() -> io:format(%QUOTE%Usage: </xsl:for-each> <xsl:apply-templates select=".//*[title='Commands']/refsect2" mode="command-usage" /> -%QUOTE%), halt(1). +%QUOTE%. </xsl:template> <!-- Option lists in command usage --> @@ -74,7 +72,7 @@ usage() -> io:format(%QUOTE%Usage: <!-- Don't show anything else in command usage --> <xsl:template match="text()" mode="command-usage"/> -<xsl:template match="option">[<xsl:apply-templates/>]</xsl:template> +<xsl:template match="arg[@choice='opt']">[<xsl:apply-templates/>]</xsl:template> <xsl:template match="replaceable"><<xsl:value-of select="."/>></xsl:template> </xsl:stylesheet> diff --git a/generate_deps b/generate_deps index 916006d1..29587b5a 100644 --- a/generate_deps +++ b/generate_deps @@ -23,10 +23,11 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> ok; (Path, Dep, ok) -> Module = filename:basename(Path, ".erl"), - ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]), + ok = file:write(Hdl, [EbinDir, "/", Module, ".beam: ", + Path]), ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end, ok, Dep), - file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"]) + file:write(Hdl, ["\n"]) end, ok, Deps), ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]), ok = file:sync(Hdl), @@ -35,7 +36,8 @@ main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) -> {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]), lists:foldl( - fun ({attribute, _LineNumber, behaviour, Behaviour}, Deps) -> + fun ({attribute, _LineNumber, Attribute, Behaviour}, Deps) + when Attribute =:= behaviour orelse Attribute =:= behavior -> case sets:is_element(Behaviour, Modules) of true -> sets:add_element( [EbinDir, "/", atom_to_list(Behaviour), ".beam"], diff --git a/include/rabbit.hrl b/include/rabbit.hrl index e2980eff..552aec67 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,7 +62,8 @@ -record(listener, {node, protocol, host, port}). --record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(basic_message, {exchange_name, routing_key, content, guid, + is_persistent}). -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). @@ -83,6 +84,7 @@ -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). -type(regexp() :: binary()). +-type(file_path() :: string()). %% this is really an abstract type, but dialyzer does not support them -type(guid() :: any()). @@ -144,7 +146,8 @@ #basic_message{exchange_name :: exchange_name(), routing_key :: routing_key(), content :: content(), - persistent_key :: maybe(pkey())}). + guid :: guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), @@ -154,7 +157,7 @@ message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). --type(msg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). +-type(qmsg() :: {queue_name(), pid(), msg_id(), boolean(), message()}). -type(listener() :: #listener{node :: erlang_node(), protocol :: atom(), @@ -166,6 +169,7 @@ #amqp_error{name :: atom(), explanation :: string(), method :: atom()}). + -endif. %%---------------------------------------------------------------------------- @@ -173,6 +177,11 @@ -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(MAX_WAIT, 16#ffffffff). + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl new file mode 100644 index 00000000..9864f1eb --- /dev/null +++ b/include/rabbit_exchange_type_spec.hrl @@ -0,0 +1,42 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). +-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). +-spec(validate/1 :: (exchange()) -> 'ok'). +-spec(create/1 :: (exchange()) -> 'ok'). +-spec(recover/2 :: (exchange(), list(binding())) -> 'ok'). +-spec(delete/2 :: (exchange(), list(binding())) -> 'ok'). +-spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). +-spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok'). + +-endif. diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index 199a0f89..1a979899 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -56,5 +56,5 @@ -type(password() :: binary()). -type(vhost() :: binary()). -type(ctag() :: binary()). --type(exchange_type() :: 'direct' | 'topic' | 'fanout'). +-type(exchange_type() :: atom()). -type(binding_key() :: binary()). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index bc5b58ca..74a1800a 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -39,11 +39,7 @@ prepare: cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare - rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ - --target i386 - rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \ - --define '_libdir /usr/lib64' --define '_arch x86_64' \ - --define '_defaultdocdir /usr/share/doc' --target x86_64 + rpmbuild -ba --nodeps SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) clean: rm -rf SOURCES SPECS RPMS SRPMS BUILD tmp diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index c8aede3e..b052d889 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -10,7 +10,9 @@ Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate Source4: rabbitmq-asroot-script-wrapper +Source5: rabbitmq-server.ocf URL: http://www.rabbitmq.com/ +BuildArch: noarch BuildRequires: erlang, python-simplejson, xmlto, libxslt Requires: erlang, logrotate BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root @@ -23,10 +25,12 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version} -%define _rabbit_libdir %{_libdir}/rabbitmq +# We want to install into /usr/lib, even on 64-bit platforms +%define _rabbit_libdir %{_exec_prefix}/lib/rabbitmq +%define _rabbit_erllibdir %{_rabbit_libdir}/lib/rabbitmq_server-%{version} %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` +%define _rabbit_server_ocf %{_builddir}/`basename %{S:5}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -35,9 +39,8 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} -sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} cp %{S:4} %{_rabbit_asroot_wrapper} -sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper} +cp %{S:5} %{_rabbit_server_ocf} make %{?_smp_mflags} %install @@ -57,6 +60,7 @@ install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-deactivate-plugins +install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf new file mode 100755 index 00000000..97c58ea2 --- /dev/null +++ b/packaging/common/rabbitmq-server.ocf @@ -0,0 +1,362 @@ +#!/bin/sh +## +## OCF Resource Agent compliant rabbitmq-server resource script. +## + +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2010 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2010 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +## OCF instance parameters +## OCF_RESKEY_multi +## OCF_RESKEY_ctl +## OCF_RESKEY_nodename +## OCF_RESKEY_ip +## OCF_RESKEY_port +## OCF_RESKEY_cluster_config_file +## OCF_RESKEY_config_file +## OCF_RESKEY_log_base +## OCF_RESKEY_mnesia_base +## OCF_RESKEY_server_start_args + +####################################################################### +# Initialization: + +. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs + +####################################################################### + +OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi" +OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" +OCF_RESKEY_nodename_default="rabbit@localhost" +OCF_RESKEY_log_base_default="/var/log/rabbitmq" +: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}} +: ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} +: ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} +: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} + +meta_data() { + cat <<END +<?xml version="1.0"?> +<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd"> +<resource-agent name="rabbitmq-server"> +<version>1.0</version> + +<longdesc lang="en"> +Resource agent for RabbitMQ-server +</longdesc> + +<shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc> + +<parameters> +<parameter name="multi" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmq-multi script +</longdesc> +<shortdesc lang="en">Path to rabbitmq-multi</shortdesc> +<content type="string" default="${OCF_RESKEY_multi_default}" /> +</parameter> + +<parameter name="ctl" unique="0" required="0"> +<longdesc lang="en"> +The path to the rabbitmqctl script +</longdesc> +<shortdesc lang="en">Path to rabbitmqctl</shortdesc> +<content type="string" default="${OCF_RESKEY_ctl_default}" /> +</parameter> + +<parameter name="nodename" unique="0" required="0"> +<longdesc lang="en"> +The node name for rabbitmq-server +</longdesc> +<shortdesc lang="en">Node name</shortdesc> +<content type="string" default="${OCF_RESKEY_nodename_default}" /> +</parameter> + +<parameter name="ip" unique="0" required="0"> +<longdesc lang="en"> +The IP address for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Address</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="port" unique="0" required="0"> +<longdesc lang="en"> +The IP Port for rabbitmq-server to listen on +</longdesc> +<shortdesc lang="en">IP Port</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="cluster_config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the cluster config file +</longdesc> +<shortdesc lang="en">Cluster config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="config_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the config file +</longdesc> +<shortdesc lang="en">Config file path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="log_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which logs will be created +</longdesc> +<shortdesc lang="en">Log base path</shortdesc> +<content type="string" default="${OCF_RESKEY_log_base_default}" /> +</parameter> + +<parameter name="mnesia_base" unique="0" required="0"> +<longdesc lang="en"> +Location of the directory under which mnesia will store data +</longdesc> +<shortdesc lang="en">Mnesia base path</shortdesc> +<content type="string" default="" /> +</parameter> + +<parameter name="server_start_args" unique="0" required="0"> +<longdesc lang="en"> +Additional arguments provided to the server on startup +</longdesc> +<shortdesc lang="en">Server start arguments</shortdesc> +<content type="string" default="" /> +</parameter> + +</parameters> + +<actions> +<action name="start" timeout="600" /> +<action name="stop" timeout="120" /> +<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" /> +<action name="validate-all" timeout="30" /> +<action name="meta-data" timeout="5" /> +</actions> +</resource-agent> +END +} + +rabbit_usage() { + cat <<END +usage: $0 {start|stop|monitor|validate-all|meta-data} + +Expects to have a fully populated OCF RA-compliant environment set. +END +} + +RABBITMQ_MULTI=$OCF_RESKEY_multi +RABBITMQ_CTL=$OCF_RESKEY_ctl +RABBITMQ_NODENAME=$OCF_RESKEY_nodename +RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip +RABBITMQ_NODE_PORT=$OCF_RESKEY_port +RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file +RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file +RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base +RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base +RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args +[ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME" +[ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME + +export_vars() { + [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS + [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT + [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE + [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE + [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE + [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE + [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS +} + +rabbit_validate_partial() { + if [ ! -x $RABBITMQ_MULTI ]; then + ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi + + if [ ! -x $RABBITMQ_CTL ]; then + ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; + return $OCF_ERR_ARGS; + fi +} + +rabbit_validate_full() { + if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then + ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then + ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then + ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; + return $OCF_ERR_ARGS; + fi + + rabbit_validate_partial + + return $OCF_SUCCESS +} + +rabbit_status() { + local rc + $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null + rc=$? + case "$rc" in + 0) + return $OCF_SUCCESS + ;; + 2) + return $OCF_NOT_RUNNING + ;; + *) + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + return $OCF_ERR_GENERIC + esac +} + +rabbit_start() { + local rc + + rabbit_validate_full + rc=$? + if [ "$rc" != $OCF_SUCCESS ]; then + return $rc + fi + + export_vars + + $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" + return $rc + fi + + # Spin waiting for the server to come up. + # Let the CRM/LRM time us out if required + start_wait=1 + while [ $start_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_SUCCESS ]; then + start_wait=0 + + elif [ "$rc" != $OCF_NOT_RUNNING ]; then + ocf_log info "rabbitmq-server start failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_stop() { + local rc + $RABBITMQ_MULTI stop_all & + rc=$? + + if [ "$rc" != 0 ]; then + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + return $rc + fi + + # Spin waiting for the server to shut down. + # Let the CRM/LRM time us out if required + stop_wait=1 + while [ $stop_wait = 1 ]; do + rabbit_status + rc=$? + if [ "$rc" = $OCF_NOT_RUNNING ]; then + stop_wait=0 + break + elif [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server stop failed: $rc" + return $OCF_ERR_GENERIC + fi + sleep 2 + done + + return $OCF_SUCCESS +} + +rabbit_monitor() { + rabbit_status + return $? +} + +case $__OCF_ACTION in + meta-data) + meta_data + exit $OCF_SUCCESS + ;; + usage|help) + rabbit_usage + exit $OCF_SUCCESS + ;; +esac + +rabbit_validate_partial || exit + +case $__OCF_ACTION in + start) + rabbit_start + ;; + stop) + rabbit_stop + ;; + monitor) + rabbit_monitor + ;; + validate-all) + exit $OCF_SUCCESS + ;; + *) + rabbit_usage + exit $OCF_ERR_UNIMPLEMENTED + ;; +esac + +exit $?
\ No newline at end of file diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 3799c438..45659602 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -21,3 +21,4 @@ install/rabbitmq-server:: install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm + install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile index d5633955..0ef7dd5e 100644 --- a/packaging/macports/Makefile +++ b/packaging/macports/Makefile @@ -35,7 +35,7 @@ macports: dirs $(DEST)/Portfile for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \ cp $(COMMON_DIR)/$$f $(DEST)/files ; \ done - sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -E -u rabbitmq -H /bin/sh -c|' \ + sed -i -e 's|@SU_RABBITMQ_SH_C@|SHELL=/bin/sh su -m rabbitmq -c|' \ $(DEST)/files/rabbitmq-script-wrapper cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files if [ -n "$(MACPORTS_USERHOST)" ] ; then \ diff --git a/src/pg_local.erl b/src/pg_local.erl index fa41fe46..1501331d 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -206,7 +206,7 @@ ensure_started() -> case whereis(?MODULE) of undefined -> C = {pg_local, {?MODULE, start_link, []}, permanent, - 1000, worker, [?MODULE]}, + 16#ffffffff, worker, [?MODULE]}, supervisor:start_child(kernel_safe_sup, C); PgLocalPid -> {ok, PgLocalPid} diff --git a/src/rabbit.erl b/src/rabbit.erl index 35d3ce4a..259ac040 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,20 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). + +-rabbit_boot_step({rabbit_exchange_type_registry, + [{description, "exchange type registry"}, + {mfa, {rabbit_sup, start_child, + [rabbit_exchange_type_registry]}}, + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, - {mfa, {rabbit_sup, start_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_log]}}, + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, @@ -72,23 +91,18 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_amqqueue_sup, - [{description, "queue supervisor"}, - {mfa, {rabbit_amqqueue, start, []}}, - {requires, kernel_ready}, - {enables, core_initialized}]}). - -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, - {mfa, {rabbit_sup, start_child, [rabbit_router]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_router]}}, {requires, kernel_ready}, {enables, core_initialized}]}). -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, - {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_node_monitor]}}, {requires, kernel_ready}, - {requires, rabbit_amqqueue_sup}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, @@ -104,18 +118,20 @@ {mfa, {rabbit_exchange, recover, []}}, {requires, empty_db_check}]}). --rabbit_boot_step({queue_recovery, - [{description, "queue recovery"}, - {mfa, {rabbit_amqqueue, recover, []}}, - {requires, exchange_recovery}]}). +-rabbit_boot_step({queue_sup_queue_recovery, + [{description, "queue supervisor and queue recovery"}, + {mfa, {rabbit_amqqueue, start, []}}, + {requires, empty_db_check}]}). -rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {requires, queue_recovery}]}). + [{mfa, {rabbit_sup, start_child, + [rabbit_persister]}}, + {requires, queue_sup_queue_recovery}]}). -rabbit_boot_step({guid_generator, [{description, "guid generator"}, - {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_guid]}}, {requires, persister}, {enables, routing_ready}]}). @@ -187,15 +203,12 @@ stop() -> ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> - spawn(fun () -> - SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; " - "halting in ~p milliseconds~n", - [SleepTime]), - timer:sleep(SleepTime), - init:stop() - end), - case catch stop() of _ -> ok end. + try + stop() + after + init:stop() + end, + ok. status() -> [{running_applications, application:which_applications()}] ++ diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 3b9eeec1..7e96d9a3 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -57,10 +57,9 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), ok = case MemoryWatermark == 0 of - true -> - ok; - false -> - rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark]) + true -> ok; + false -> rabbit_sup:start_restartable_child(vm_memory_monitor, + [MemoryWatermark]) end, ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3f25d72e..f6278836 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -40,8 +40,8 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). +-export([notify_sent/2, unblock/2, flush_all/2]). +-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -63,7 +63,6 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -92,13 +91,13 @@ -spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). --spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). +-spec(rollback_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). + {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/8 :: (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> @@ -107,6 +106,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -117,45 +117,47 @@ %%---------------------------------------------------------------------------- start() -> + DurableQueues = find_durable_queues(), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + _RealDurableQueues = recover_durable_queues(DurableQueues), ok. -recover() -> - ok = recover_durable_queues(), - ok. - -recover_durable_queues() -> +find_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end). + +recover_durable_queues(DurableQueues) -> + lists:foldl( + fun (RecoveredQ, Acc) -> Q = start_queue_process(RecoveredQ), %% We need to catch the case where a client connected to %% another node has deleted the queue (and possibly %% re-created it). case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end + fun () -> + case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, + read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) + true -> [Q | Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + end, [], DurableQueues). declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -200,7 +202,7 @@ store_queue(Q = #amqqueue{durable = false}) -> ok. start_queue_process(Q) -> - {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), + {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -284,18 +286,18 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). -commit_all(QPids, Txn) -> +commit_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end, QPids). -rollback_all(QPids, Txn) -> +rollback_all(QPids, Txn, ChPid) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -329,39 +331,53 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {unblock, ChPid}). + gen_server2:pcast(QPid, 7, {unblock, ChPid}). + +flush_all(QPids, ChPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, + QPids). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> - ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - ok - end - end). + case + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName) + end + end) of + Err = {error, _} -> Err; + PostHook -> + PostHook(), + ok + end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:fold( - fun (QueueName, Acc) -> - ok = rabbit_exchange:delete_transient_queue_bindings( - QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - Acc - end, - ok, - qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end). + [Hook() || + Hook <- rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)], + ok. + +delete_queue(QueueName) -> + Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Post. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e4791f95..449e79ea 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -export([start_link/1, info_keys/0]). @@ -59,7 +57,7 @@ -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(tx, {is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumer_count, @@ -376,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -384,29 +382,28 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). persist_delivery(_QName, _Message, true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, _IsDelivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = Guid}, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, Guid}}]). persist_acks(Txn, QName, Messages) -> persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, Guid}} || #basic_message{ + guid = Guid, is_persistent = true} <- Messages]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{guid = Guid}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, Guid}}]). persist_work(_Txn,_QName, []) -> ok; @@ -434,8 +431,7 @@ do_if_persistent(F, Txn, QName) -> lookup_tx(Txn) -> case get({txn, Txn}) of - undefined -> #tx{ch_pid = none, - is_persistent = false, + undefined -> #tx{is_persistent = false, pending_messages = [], pending_acks = []}; V -> V @@ -464,26 +460,19 @@ is_tx_persistent(Txn) -> record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], - ch_pid = ChPid}). + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), record_current_channel_tx(ChPid, Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], - ch_pid = ChPid}). - -process_pending(Txn, State) -> - #tx{ch_pid = ChPid, - pending_messages = PendingMessages, - pending_acks = PendingAcks} = lookup_tx(Txn), - case lookup_ch(ChPid) of - not_found -> ok; - C = #cr{unacked_messages = UAM} -> - {_Acked, Remaining} = - collect_messages(lists:append(PendingAcks), UAM), - store_ch_record(C#cr{unacked_messages = Remaining}) - end, + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending]}). + +process_pending(Txn, ChPid, State) -> + #tx{pending_messages = PendingMessages, pending_acks = PendingAcks} = + lookup_tx(Txn), + C = #cr{unacked_messages = UAM} = lookup_ch(ChPid), + {_Acked, Remaining} = collect_messages(lists:append(PendingAcks), UAM), + store_ch_record(C#cr{unacked_messages = Remaining}), deliver_or_enqueue_n(lists:reverse(PendingMessages), State). collect_messages(MsgIds, UAM) -> @@ -592,12 +581,13 @@ handle_call({deliver, Txn, Message, ChPid}, _From, State) -> {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({commit, Txn}, From, State) -> +handle_call({commit, Txn, ChPid}, From, State) -> ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue gen_server2:reply(From, ok), - NewState = process_pending(Txn, State), + NewState = process_pending(Txn, ChPid, State), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(NewState); handle_call({notify_down, ChPid}, _From, State) -> @@ -779,9 +769,10 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> noreply(State) end; -handle_cast({rollback, Txn}, State) -> +handle_cast({rollback, Txn, ChPid}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), + record_current_channel_tx(ChPid, none), noreply(State); handle_cast({redeliver, Messages}, State) -> @@ -826,7 +817,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({flush, ChPid}, State) -> + ok = rabbit_channel:flushed(ChPid, self()), + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 0f3a8664..dbd65780 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor). --export([start_link/0]). +-export([start_link/0, start_child/1]). -export([init/1]). @@ -42,6 +42,9 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). +start_child(Args) -> + supervisor:start_child(?SERVER, Args). + init([]) -> {ok, {{simple_one_for_one, 10, 10}, [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72..4ab7a2a0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -36,6 +36,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,7 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -57,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -93,10 +96,17 @@ from_content(Content) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - persistent_key = none}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 585c59dc..7d3cd722 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1,4 +1,4 @@ -%% The contents of this file are subject to the Mozilla Public Licenses +%% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License at %% http://www.mozilla.org/MPL/ @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/5, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2]). +-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, @@ -45,11 +45,8 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, - most_recently_declared_queue, consumer_mapping}). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). + username, virtual_host, most_recently_declared_queue, + consumer_mapping, blocking}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -75,8 +72,9 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -110,7 +108,10 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). + +flushed(Pid, QPid) -> + gen_server2:cast(Pid, {flushed, QPid}). list() -> pg_local:get_members(rabbit_channels). @@ -152,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}, + consumer_mapping = dict:new(), + blocking = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -190,6 +192,9 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; +handle_cast({flushed, QPid}, State) -> + {noreply, queue_blocked(QPid, State)}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -215,7 +220,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}. + {stop, Reason, State}; +handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -331,6 +338,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +queue_blocked(QPid, State = #ch{blocking = Blocking}) -> + case dict:find(QPid, Blocking) of + error -> State; + {ok, MRef} -> true = erlang:demonitor(MRef), + Blocking1 = dict:erase(QPid, Blocking), + ok = case dict:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + State#ch{blocking = Blocking1} + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -362,14 +383,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, + IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -540,25 +559,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - unacked_message_q = UAMQ }) -> - NewLimiterPid = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self(), - queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid; - {_, 0} -> - ok = rabbit_limiter:shutdown(LimiterPid), - ok = limit_queues(undefined, State), - undefined; - {_, _} -> - LimiterPid - end, - ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; + _, State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> undefined; + {undefined, _} -> start_limiter(State); + {_, _} -> LimiterPid + end, + LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of + ok -> LimiterPid1; + stopped -> unlimit_queues(State) + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, @@ -791,9 +802,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'channel.flow'{active = _}, _, State) -> - %% FIXME: implement - {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow'{active = true}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of + ok -> LimiterPid; + stopped -> unlimit_queues(State) + end, + {reply, #'channel.flow_ok'{active = true}, + State#ch{limiter_pid = LimiterPid1}}; + +handle_method(#'channel.flow'{active = false}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + ok = rabbit_limiter:block(LimiterPid1), + QPids = consumer_queues(Consumers), + Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + case Queues of + [] -> {reply, #'channel.flow_ok'{active = false}, State}; + _ -> {noreply, State#ch{limiter_pid = LimiterPid1, + blocking = dict:from_list(Queues)}} + end; handle_method(#'channel.flow_ok'{active = _}, _, State) -> %% TODO: We may want to correlate this to channel.flow messages we @@ -895,7 +928,7 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> ok = notify_limiter(State#ch.limiter_pid, State#ch.uncommitted_ack_q), new_tx(State); @@ -912,7 +945,7 @@ internal_rollback(State = #ch{transaction_id = TxnKey, queue:len(UAQ), queue:len(UAMQ)]), case rabbit_amqqueue:rollback_all(sets:to_list(Participants), - TxnKey) of + TxnKey, self()) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> rabbit_misc:protocol_error( @@ -928,21 +961,27 @@ fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> - %% dict:append would be simpler and avoid the - %% lists:reverse in handle_message({recover, true}, - %% ...). However, it is significantly slower when - %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + %% dict:append would avoid the lists:reverse in + %% handle_message({recover, true}, ...). However, it + %% is significantly slower when going beyond a few + %% thousand elements. + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). +start_limiter(State = #ch{unacked_message_q = UAMQ}) -> + LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + ok = limit_queues(LPid, State), + LPid. + notify_queues(#ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). +unlimit_queues(State) -> + ok = limit_queues(undefined, State), + undefined. + limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). @@ -972,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6f564b1d..d1834b3b 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -46,6 +46,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(action/4 :: (atom(), erlang_node(), [string()], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -129,8 +130,9 @@ parse_args([], _) -> stop() -> ok. -usage() -> - rabbitmqctl_usage:usage(). +usage() -> + io:format("~s", [rabbit_ctl_usage:usage()]), + halt(1). action(stop, Node, [], Inform) -> Inform("Stopping and halting node ~p", [Node]), diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 078cf620..f19e8d02 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -38,9 +38,9 @@ -ifdef(use_specs). --spec(create_basic_plt/1 :: (string()) -> 'ok'). --spec(add_to_plt/2 :: (string(), string()) -> 'ok'). --spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file_path()) -> 'ok'). +-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 832acd16..1cfba00e 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -30,7 +30,6 @@ %% -module(rabbit_exchange). --include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -40,7 +39,7 @@ -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). +-export([check_type/1, assert_type/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -49,7 +48,6 @@ -import(mnesia). -import(sets). -import(lists). --import(qlc). -import(regexp). %%---------------------------------------------------------------------------- @@ -82,10 +80,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -100,17 +96,37 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - ok = rabbit_misc:table_foreach( - fun(Exchange) -> ok = mnesia:write(rabbit_exchange, - Exchange, write) - end, rabbit_durable_exchange), - ok = rabbit_misc:table_foreach( - fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write) - end, rabbit_durable_route). + Exs = rabbit_misc:table_fold( + fun(Exchange, Acc) -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + [Exchange | Acc] + end, [], rabbit_durable_exchange), + Bs = rabbit_misc:table_fold( + fun(Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route), + recover_with_bindings(Bs, Exs), + ok. + +recover_with_bindings(Bs, Exs) -> + recover_with_bindings( + lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#exchange.name, Exs), []). + +recover_with_bindings([B = #binding{exchange_name = Name} | Rest], + Xs = [#exchange{name = Name} | _], + Bindings) -> + recover_with_bindings(Rest, Xs, [B | Bindings]); +recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> + (type_to_module(Type)):recover(X, Bindings), + recover_with_bindings(Bs, Xs, []); +recover_with_bindings([], [], []) -> + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -118,31 +134,53 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of - [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), - if Durable -> - ok = mnesia:write(rabbit_durable_exchange, - Exchange, write); - true -> ok - end, - Exchange; - [ExistingX] -> ExistingX - end - end). + %% We want to upset things if it isn't ok; this is different from + %% the other hooks invocations, where we tend to ignore the return + %% value. + TypeModule = type_to_module(Type), + ok = TypeModule:validate(Exchange), + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = case Durable of + true -> + mnesia:write(rabbit_durable_exchange, + Exchange, write); + false -> + ok + end, + {new, Exchange}; + [ExistingX] -> + {existing, ExistingX} + end + end) of + {new, X} -> TypeModule:create(X), + X; + {existing, X} -> X; + Err -> Err + end. -check_type(<<"fanout">>) -> - fanout; -check_type(<<"direct">>) -> - direct; -check_type(<<"topic">>) -> - topic; -check_type(<<"headers">>) -> - headers; -check_type(T) -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]). +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + case rabbit_exchange_type_registry:lookup_module(T) of + {ok, Module} -> Module; + {error, not_found} -> rabbit_misc:protocol_error( + command_invalid, + "invalid exchange type '~s'", [T]) + end. + +%% Used with binaries sent over the wire; the type may not exist. +check_type(TypeBin) -> + case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown exchange type '~s'", [TypeBin]); + T -> + _Module = type_to_module(T), + T + end. assert_type(#exchange{ type = ActualType }, RequiredType) when ActualType == RequiredType -> @@ -157,7 +195,7 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of - {ok, X} -> X; + {ok, X} -> X; {error, not_found} -> rabbit_misc:not_found(Name) end. @@ -193,9 +231,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X, Seen, Delivery = #delivery{ - message = #basic_message{routing_key = RK, content = C}}) -> - case rabbit_router:deliver(route(X, RK, C), Delivery) of +publish(X = #exchange{type = Type}, Seen, Delivery) -> + case (type_to_module(Type)):publish(X, Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -205,95 +242,24 @@ publish(X, Seen, Delivery = #delivery{ AName -> NewSeen = [XName | Seen], case lists:member(AName, NewSeen) of - true -> - R; - false -> - case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end + true -> R; + false -> case lookup(AName) of + {ok, AX} -> + publish(AX, NewSeen, Delivery); + {error, not_found} -> + rabbit_log:warning( + "alternate exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(AName)]), + R + end end end; R -> R end. -%% return the list of qpids to which a message with a given routing -%% key, sent to a particular exchange, should be delivered. -%% -%% The function ensures that a qpid appears in the return list exactly -%% as many times as a message should be delivered to it. With the -%% current exchange types that is at most once. -route(X = #exchange{type = topic}, RoutingKey, _Content) -> - match_bindings(X, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end); - -route(X = #exchange{type = headers}, _RoutingKey, Content) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, - match_bindings(X, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - match_routing_key(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - match_routing_key(X, RoutingKey). - -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - -%% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(#exchange{name = Name}, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). - -match_routing_key(#exchange{name = Name}, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], sets:from_list(Queues)). - %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? @@ -302,13 +268,13 @@ delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), - ok = delete_forward_routes(Route) + ok = delete_forward_routes(Route), + Route#route.binding end || Route <- mnesia:match_object( rabbit_route, #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - write)], - ok. + write)]. delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -317,21 +283,55 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), - [begin - ok = FwdDeleteFun(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write) - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], - ok. + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Cleanup = cleanup_deleted_queue_bindings( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + no_delete -> Module:remove_bindings(X, Bs) + end + end, Cleanup) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% cleanup_deleted_queue_bindings1) works properly. +cleanup_deleted_queue_bindings([], Acc) -> + Acc; +cleanup_deleted_queue_bindings( + [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). + +cleanup_deleted_queue_bindings( + ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], + Bindings, Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); +cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> + %% either Deleted is [], or its head has a non-matching ExchangeName + NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], + cleanup_deleted_queue_bindings(Deleted, NewAcc). + +cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + {maybe_auto_delete(X), Bindings}. + delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -340,15 +340,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -385,37 +376,61 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + case mnesia:read(rabbit_route, B) of + [] -> + sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} + end + end + end) of + {new, Exchange = #exchange{ type = Type }, Binding} -> + (type_to_module(Type)):add_binding(Exchange, Binding); + {existing, _, _} -> + ok; + Err = {error, _} -> + Err + end. delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B} + end + end) of + Err = {error, _} -> + Err; + {{Action, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case Action of + auto_delete -> Module:delete(X, [B]); + no_delete -> Module:remove_bindings(X, [B]) + end + end. binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}) + Fun(X, Q, #binding{ + exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = rabbit_misc:sort_field_table(Arguments)}) end). sync_binding(Binding, Durable, Fun) -> @@ -440,8 +455,8 @@ list_bindings(VHostPath) -> rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + _ = '_'}, + _ = '_'})]. route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); @@ -456,136 +471,60 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; + queue_name = Queue, + key = Key, + args = Args}; reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and {add,delete}_binding/4 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - -split_topic_key(Key) -> - {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), - KeySplit. - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). - -delete(ExchangeName, _IfUnused = true) -> - call_with_exchange(ExchangeName, fun conditional_delete/1); -delete(ExchangeName, _IfUnused = false) -> - call_with_exchange(ExchangeName, fun unconditional_delete/1). - -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; + queue_name = Queue, + key = Key, + args = Args}. + +delete(ExchangeName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + case call_with_exchange(ExchangeName, Fun) of + {deleted, X = #exchange{type = Type}, Bs} -> + (type_to_module(Type)):delete(X, Bs), + ok; + Error = {error, _InUseOrNotFound} -> + Error + end. + +maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> + {no_delete, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. + case conditional_delete(Exchange) of + {error, in_use} -> {no_delete, Exchange}; + {deleted, Exchange, []} -> {auto_deleted, Exchange} + end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of + case contains(rabbit_route, Match) orelse + contains(rabbit_durable_route, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. -unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_exchange_bindings(ExchangeName), +unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> + Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}). + ok = mnesia:delete({rabbit_exchange, ExchangeName}), + {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl new file mode 100644 index 00000000..a8c071e6 --- /dev/null +++ b/src/rabbit_exchange_type.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {description, 0}, + {publish, 2}, + + %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} + {validate, 1}, + + %% called after declaration when previously absent + {create, 1}, + + %% called when recovering + {recover, 2}, + + %% called after exchange deletion. + {delete, 2}, + + %% called after a binding has been added + {add_binding, 2}, + + %% called after bindings have been deleted. + {remove_bindings, 2} + + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl new file mode 100644 index 00000000..9b71e0e1 --- /dev/null +++ b/src/rabbit_exchange_type_direct.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_direct). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type direct"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"direct">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"direct">>}, + {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), + Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl new file mode 100644 index 00000000..311654ab --- /dev/null +++ b/src/rabbit_exchange_type_fanout.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_fanout). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type fanout"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"fanout">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"fanout">>}, + {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl new file mode 100644 index 00000000..285dab1a --- /dev/null +++ b/src/rabbit_exchange_type_headers.erl @@ -0,0 +1,137 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_headers). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type headers"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"headers">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-ifdef(use_specs). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-endif. + +description() -> + [{name, <<"headers">>}, + {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{content = Content}}) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> rabbit_misc:sort_field_table(H) + end, + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end), + Delivery). + +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort +%% (rabbit_misc:sort_field_table) that route/3 and +%% rabbit_exchange:{add,delete}_binding/4 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl new file mode 100644 index 00000000..175d15ad --- /dev/null +++ b/src/rabbit_exchange_type_registry.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_registry). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/2, binary_to_type/1, lookup_module/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}). +-spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}). +-spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}). + +-endif. + +%%--------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +register(TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, TypeName, ModuleName}). + +%% This is used with user-supplied arguments (e.g., on exchange +%% declare), so we restrict it to existing atoms only. This means it +%% can throw a badarg, indicating that the type cannot have been +%% registered. +binary_to_type(TypeBin) when is_binary(TypeBin) -> + case catch list_to_existing_atom(binary_to_list(TypeBin)) of + {'EXIT', {badarg, _}} -> {error, not_found}; + TypeAtom -> TypeAtom + end. + +lookup_module(T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, T) of + [{_, Module}] -> + {ok, Module}; + [] -> + {error, not_found} + end. + +%%--------------------------------------------------------------------------- + +internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> + list_to_atom(binary_to_list(TypeBin)). + +internal_register(TypeName, ModuleName) + when is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(ModuleName), + true = ets:insert(?ETS_NAME, + {internal_binary_to_type(TypeName), ModuleName}), + ok. + +sanity_check_module(Module) -> + case catch lists:member(rabbit_exchange_type, + lists:flatten( + [Bs || {Attr, Bs} <- + Module:module_info(attributes), + Attr =:= behavior orelse + Attr =:= behaviour])) of + {'EXIT', {undef, _}} -> {error, not_module}; + false -> {error, not_exchange_type}; + true -> ok + end. + +%%--------------------------------------------------------------------------- + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), + {ok, none}. + +handle_call({register, TypeName, ModuleName}, _From, State) -> + ok = internal_register(TypeName, ModuleName), + {reply, ok, State}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl new file mode 100644 index 00000000..8a3dceea --- /dev/null +++ b/src/rabbit_exchange_type_topic.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_topic). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type topic"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"topic">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-export([topic_matches/2]). + +-ifdef(use_specs). +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-endif. + +description() -> + [{name, <<"topic">>}, + {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end), + Delivery). + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) + when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or + last_topic_match(P, [BacktrackNext | R], BacktrackList). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c9f8183f..878af029 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1]). +-export([get_limit/1, block/1, unblock/1]). %%---------------------------------------------------------------------------- @@ -47,12 +47,14 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -endif. @@ -60,6 +62,7 @@ -record(lim, {prefetch_count = 0, ch_pid, + blocked = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> - unlink(LimiterPid), + true = unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, {limit, PrefetchCount})). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -113,6 +117,17 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> + ok; +unblock(LimiterPid) -> + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, unblock, infinity)). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -120,29 +135,45 @@ get_limit(Pid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +handle_call({can_send, _QPid, _AckRequired}, _From, + State = #lim{blocked = true}) -> + {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}. + {reply, PrefetchCount, State}; + +handle_call({limit, PrefetchCount}, _From, State) -> + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + case maybe_notify(State, State#lim{blocked = false}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end. handle_cast(shutdown, State) -> {stop, normal, State}; -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {noreply, State1}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -164,14 +195,21 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> notify_queues(NewState); - false -> NewState + case (limit_reached(OldState) orelse is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse is_blocked(NewState)) of + true -> NewState1 = notify_queues(NewState), + {case NewState1#lim.prefetch_count of + 0 -> stop; + _ -> cont + end, NewState1}; + false -> {cont, NewState} end. limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. +is_blocked(#lim{blocked = Blocked}) -> Blocked. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), @@ -209,3 +247,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + +unlink_on_stopped(LimiterPid, stopped) -> + ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), + stopped; +unlink_on_stopped(_LimiterPid, Result) -> + Result. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0654f58a..2c180846 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -43,21 +43,24 @@ -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). +-export([start_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). --export([table_foreach/2]). +-export([table_fold/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). +-export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). -import(mnesia). -import(lists). @@ -95,9 +98,10 @@ undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). +-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok'). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> ok_or_error()). --spec(report_cover/1 :: (string()) -> 'ok'). +-spec(enable_cover/1 :: (file_path()) -> ok_or_error()). +-spec(report_cover/1 :: (file_path()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -114,23 +118,31 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). +-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). --spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (string(), string()) -> ok_or_error()). +-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). +-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(recursive_delete/1 :: ([file_path()]) -> + 'ok' | {'error', {file_path(), any()}}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -218,6 +230,10 @@ enable_cover(Root) -> _ -> ok end. +start_cover(NodesS) -> + {ok, _} = cover:start([makenode(N) || N <- NodesS]), + ok. + report_cover() -> report_cover("."). @@ -305,7 +321,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. @@ -357,20 +373,20 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% For each entry in a table, execute a function in a transaction. -%% This is often far more efficient than wrapping a tx around the lot. +%% Fold over each entry in a table, executing the cons function in a +%% transaction. This is often far more efficient than wrapping a tx +%% around the lot. %% %% We ignore entries that have been modified or removed. -table_foreach(F, TableName) -> - lists:foreach( - fun (E) -> execute_mnesia_transaction( +table_fold(F, Acc0, TableName) -> + lists:foldl( + fun (E, Acc) -> execute_mnesia_transaction( fun () -> case mnesia:match_object(TableName, E, read) of - [] -> ok; - _ -> F(E) + [] -> Acc; + _ -> F(E, Acc) end end) - end, dirty_read_all(TableName)), - ok. + end, Acc0, dirty_read_all(TableName)). dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -504,6 +520,10 @@ queue_fold(Fun, Init, Q) -> {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. +%% Sorts a list of AMQP table fields as per the AMQP spec +sort_field_table(Arguments) -> + lists:keysort(1, Arguments). + %% This provides a string representation of a pid that is the same %% regardless of what node we are running on. The representation also %% permits easy identification of the pid's node. @@ -595,3 +615,46 @@ version_compare(A, B) -> ANum < BNum -> lt; ANum > BNum -> gt end. + +recursive_delete(Files) -> + lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). + +recursive_delete1(Path) -> + case filelib:is_dir(Path) of + false -> case file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +unlink_and_capture_exit(Pid) -> + unlink(Pid), + receive {'EXIT', Pid, _} -> ok + after 0 -> ok + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6ec3cf74..55a6761d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -48,7 +48,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> string()). +-spec(dir/0 :: () -> file_path()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). @@ -424,9 +424,8 @@ reset(Force) -> cannot_delete_schema) end, ok = delete_cluster_nodes_config(), - %% remove persistet messages and any other garbage we find - lists:foreach(fun file:delete/1, - filelib:wildcard(dir() ++ "/*")), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 5e0a3af1..336f74bf 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -42,6 +42,7 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -51,7 +52,7 @@ start() -> RpcTimeout = case init:get_argument(maxwait) of {ok,[[N1]]} -> 1000 * list_to_integer(N1); - _ -> 30000 + _ -> ?MAX_WAIT end, case init:get_plain_arguments() of [] -> @@ -85,8 +86,9 @@ parse_args([Command | Args]) -> stop() -> ok. -usage() -> - rabbitmqmulti_usage:usage(). +usage() -> + io:format("~s", [rabbit_multi_usage:usage()]), + halt(1). action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index cf04f05b..7978573d 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -117,15 +117,25 @@ start() -> transient, infinity, supervisor, [tcp_client_sup]}), ok. +getaddr(Host) -> + %% inet_parse:address takes care of ip string, like "0.0.0.0" + %% inet:getaddr returns immediately for ip tuple {0,0,0,0}, + %% and runs 'inet_gethost' port process for dns lookups. + %% On Windows inet:getaddr runs dns resolver for ip string, which may fail. + case inet_parse:address(Host) of + {ok, IPAddress1} -> IPAddress1; + {error, _} -> + case inet:getaddr(Host, inet) of + {ok, IPAddress2} -> IPAddress2; + {error, Reason} -> + error_logger:error_msg("invalid host ~p - ~p~n", + [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}) + end + end. + check_tcp_listener_address(NamePrefix, Host, Port) -> - IPAddress = - case inet:getaddr(Host, inet) of - {ok, IPAddress1} -> IPAddress1; - {error, Reason} -> - error_logger:error_msg("invalid host ~p - ~p~n", - [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}) - end, + IPAddress = getaddr(Host), if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), @@ -157,7 +167,7 @@ start_listener(Host, Port, Label, OnConnect) -> ok. stop_tcp_listener(Host, Port) -> - {ok, IPAddress} = inet:getaddr(Host, inet), + IPAddress = getaddr(Host), Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name), diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 019d2a26..53335a6f 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -53,7 +53,7 @@ -define(MAX_WRAP_ENTRIES, 500). --define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 5}). -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, @@ -64,24 +64,24 @@ %% the other maps a key to one or more queues. %% The aim is to reduce the overload of storing a message multiple times %% when it appears in several queues. --record(psnapshot, {serial, transactions, messages, queues}). +-record(psnapshot, {serial, transactions, messages, queues, next_seq_id}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --type(qmsg() :: {amqqueue(), pkey()}). +-type(pmsg() :: {queue_name(), pkey()}). -type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok'). -spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: (txn()) -> 'ok'). --spec(rollback_transaction/1 :: (txn()) -> 'ok'). +-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). +-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok'). -spec(force_snapshot/0 :: () -> 'ok'). -spec(serial/0 :: () -> non_neg_integer()). @@ -128,7 +128,8 @@ init(_Args) -> Snapshot = #psnapshot{serial = 0, transactions = dict:new(), messages = ets:new(messages, []), - queues = ets:new(queues, [])}, + queues = ets:new(queues, []), + next_seq_id = 0}, LogHandle = case disk_log:open([{name, rabbit_persister}, {head, current_snapshot(Snapshot)}, @@ -153,12 +154,12 @@ init(_Args) -> rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], pending_replies = [], - snapshot = NewSnapshot}, + snapshot = NewSnapshot}, {ok, State}. handle_call({transaction, Key, MessageList}, From, State) -> @@ -236,8 +237,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline, %% "tied" is met. log_work(CreateWorkUnit, MessageList, State = #pstate{ - snapshot = Snapshot = #psnapshot{ - messages = Messages}}) -> + snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( fun(M = {publish, Message, QK = {_QName, PKey}}) -> @@ -343,20 +343,22 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, pending_logs = [], pending_replies = []}. -current_snapshot(_Snapshot = #psnapshot{serial = Serial, - transactions= Ts, - messages = Messages, - queues = Queues}) -> +current_snapshot(_Snapshot = #psnapshot{serial = Serial, + transactions = Ts, + messages = Messages, + queues = Queues, + next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore prune_table(Messages, ets:foldl( - fun ({{_QName, PKey}, _Delivered}, S) -> + fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues)), InnerSnapshot = {{serial, Serial}, {txns, Ts}, {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}}, + {queues, ets:tab2list(Queues)}, + {next_seq_id, NextSeqId}}, ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, term_to_binary(InnerSnapshot)}. @@ -380,14 +382,15 @@ internal_load_snapshot(LogHandle, {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), case check_version(Loaded_Snapshot) of {ok, StateBin} -> - {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = - binary_to_term(StateBin), + {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}, + {next_seq_id, NextSeqId}} = binary_to_term(StateBin), true = ets:insert(Messages, Ms), true = ets:insert(Queues, Qs), Snapshot1 = replay(Items, LogHandle, K, Snapshot#psnapshot{ serial = Serial, - transactions = Ts}), + transactions = Ts, + next_seq_id = NextSeqId}), Snapshot2 = requeue_messages(Snapshot1), %% uncompleted transactions are discarded - this is TRTTD %% since we only get into this code on node restart, so @@ -406,7 +409,10 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), + Work = ets:foldl( + fun ({{QName, PKey}, Delivered, SeqId}, Acc) -> + rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc) + end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( rabbit_misc:upmap( @@ -416,8 +422,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, fun ({QName, Requeues}) -> requeue(QName, Requeues, Messages) end, dict:to_list(Work))), - NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], - NewQueues = [{QK, D} || {QK, _M, D} <- L], + NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L], + NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L], ets:delete_all_objects(Messages), ets:delete_all_objects(Queues), true = ets:insert(Messages, NewMessages), @@ -425,19 +431,12 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, %% contains the mutated messages and queues tables Snapshot. -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> RequeueMessages = - [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + [{SeqId, QName, PKey, Message, Delivered} || + {SeqId, PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -447,7 +446,7 @@ requeue(QName, Requeues, Messages) -> %% per-channel basis, and channels are bound to specific %% processes, sorting the list does provide the correct %% ordering properties. - [{Message, Delivered} || {_, Message, Delivered} <- + [{Message, Delivered} || {_, _, _, Message, Delivered} <- lists:sort(RequeueMessages)]), RequeueMessages; {error, not_found} -> @@ -474,50 +473,55 @@ internal_integrate_messages(Items, Snapshot) -> internal_integrate1({extend_transaction, Key, MessageList}, Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; + Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, + Transactions)}; internal_integrate1({rollback_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions}) -> Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; internal_integrate1({commit_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues}) -> + messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> case dict:find(Key, Transactions) of {ok, MessageLists} -> ?LOGDEBUG("persist committing txn ~p~n", [Key]), - lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, - lists:reverse(MessageLists)), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + NextSeqId = + lists:foldr( + fun (ML, SeqIdN) -> + perform_work(ML, Messages, Queues, SeqIdN) end, + SeqId, MessageLists), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), + next_seq_id = NextSeqId}; error -> Snapshot end; internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot {messages = Messages, - queues = Queues}) -> - perform_work(MessageList, Messages, Queues), - Snapshot. - -perform_work(MessageList, Messages, Queues) -> - lists:foreach( - fun (Item) -> perform_work_item(Item, Messages, Queues) end, - MessageList). - -perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> - ets:insert(Messages, {PKey, Message}), - ets:insert(Queues, {QK, false}); - -perform_work_item({tied, QK}, _Messages, Queues) -> - ets:insert(Queues, {QK, false}); - -perform_work_item({deliver, QK}, _Messages, Queues) -> - %% from R12B-2 onward we could use ets:update_element/3 here - ets:delete(Queues, QK), - ets:insert(Queues, {QK, true}); - -perform_work_item({ack, QK}, _Messages, Queues) -> - ets:delete(Queues, QK). + Snapshot = #psnapshot{messages = Messages, + queues = Queues, + next_seq_id = SeqId}) -> + Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, + Queues, SeqId)}. + +perform_work(MessageList, Messages, Queues, SeqId) -> + lists:foldl(fun (Item, NextSeqId) -> + perform_work_item(Item, Messages, Queues, NextSeqId) + end, SeqId, MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, + Messages, Queues, NextSeqId) -> + true = ets:insert(Messages, {PKey, Message}), + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, NextSeqId}), + NextSeqId + 1; + +perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> + true = ets:update_element(Queues, QK, {2, true}), + NextSeqId; + +perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> + true = ets:delete(Queues, QK), + NextSeqId. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1a4830e1..5cf519b7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -39,6 +39,8 @@ -export([init/1, mainloop/3]). +-export([server_properties/0]). + -export([analyze_frame/2]). -import(gen_tcp). @@ -133,6 +135,7 @@ -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). +-spec(server_properties/0 :: () -> amqp_table()). -endif. @@ -198,6 +201,16 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +server_properties() -> + {ok, Product} = application:get_key(rabbit, id), + {ok, Version} = application:get_key(rabbit, vsn), + [{list_to_binary(K), longstr, list_to_binary(V)} || + {K, V} <- [{"product", Product}, + {"version", Version}, + {"platform", "Erlang/OTP"}, + {"copyright", ?COPYRIGHT_MESSAGE}, + {"information", ?INFORMATION_MESSAGE}]]. + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> @@ -510,21 +523,12 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, case check_version({ProtocolMajor, ProtocolMinor}, {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of true -> - {ok, Product} = application:get_key(id), - {ok, Version} = application:get_key(vsn), ok = send_on_channel0( Sock, #'connection.start'{ version_major = ?PROTOCOL_VERSION_MAJOR, version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- - [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]], + server_properties = server_properties(), mechanisms = <<"PLAIN AMQPLAIN">>, locales = <<"en_US">> }), {State#v1{connection = Connection#connection{ diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl new file mode 100644 index 00000000..06d59249 --- /dev/null +++ b/src/rabbit_restartable_sup.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_restartable_sup). + +-behaviour(supervisor). + +-export([start_link/2]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link(Name, {_M, _F, _A} = Fun) -> + supervisor:start_link({local, Name}, ?MODULE, [Fun]). + +init([{Mod, _F, _A} = Fun]) -> + {ok, {{one_for_one, 10, 10}, + [{Mod, Fun, transient, ?MAX_WAIT, worker, [Mod]}]}}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ee2b82c5..a449e19e 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -30,12 +30,15 @@ %% -module(rabbit_router). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(gen_server2). -export([start_link/0, - deliver/2]). + deliver/2, + match_bindings/2, + match_routing_key/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -73,13 +76,9 @@ deliver(QPids, Delivery) -> %% which then in turn delivers it to its queues. deliver_per_node( dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), + lists:foldl(fun (QPid, D) -> + rabbit_misc:dict_cons(node(QPid), QPid, D) + end, dict:new(), QPids)), Delivery). deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> @@ -129,6 +128,46 @@ deliver_per_node(NodeQPids, Delivery) -> -endif. +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(Name, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + rabbit_route, + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + Match(Binding)] + end). + +match_routing_key(Name, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], sets:from_list(Queues)). + %%-------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index a1b89481..2c5e5112 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,10 +33,13 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2]). +-export([start_link/0, start_child/1, start_child/2, start_child/3, + start_restartable_child/1, start_restartable_child/2]). -export([init/1]). +-include("rabbit.hrl"). + -define(SERVER, ?MODULE). start_link() -> @@ -46,10 +49,25 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, - transient, 100, worker, [Mod]}), + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]}), + ok. + +start_restartable_child(Mod) -> + start_restartable_child(Mod, []). + +start_restartable_child(Mod, Args) -> + Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), + {ok, _} = supervisor:start_child( + ?SERVER, + {Name, {rabbit_restartable_sup, start_link, + [Name, {Mod, start_link, Args}]}, + transient, infinity, supervisor, [rabbit_restartable_sup]}), ok. init([]) -> - {ok, {{one_for_one, 10, 10}, []}}. + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 51d95c35..d645d183 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -330,7 +330,8 @@ test_topic_match(P, R) -> test_topic_match(P, R, true). test_topic_match(P, R, Expected) -> - case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of + case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), + list_to_binary(R)) of Expected -> passed; _ -> @@ -624,8 +625,12 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), %% NB: this will log an inconsistent_database error, which is harmless + %% Turning cover on / off is OK even if we're not in general using cover, + %% it just turns the engine on / off, doesn't actually log anything. + cover:stop([SecondaryNode]), true = disconnect_node(SecondaryNode), pong = net_adm:ping(SecondaryNode), + cover:start([SecondaryNode]), %% leaving a cluster as a ram node ok = control_action(reset, []), diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 0fe15426..493925ef 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -63,4 +63,4 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, OnStartup, OnShutdown, Label]}, - transient, 100, worker, [tcp_listener]}]}}. + transient, 16#ffffffff, worker, [tcp_listener]}]}}. diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 00000000..97e07545 --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,155 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, submit_async/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: + (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }, + hibernate}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }, + hibernate} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, {next_free, From}}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { pending = Pending1 } + end, hibernate}; + +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 00000000..4ded63a8 --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 00000000..d3a48119 --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, submit_async/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: + (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |