diff options
author | Michael Bridgen <mikeb@lshift.net> | 2010-01-29 17:39:22 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2010-01-29 17:39:22 +0000 |
commit | f48331a165fb2e9944a30f89403ad60b97c1b97e (patch) | |
tree | 77c72994de62ac8eb532e8683877a8d252b2c71b | |
parent | a62d4e224c7fc44f389960f21f9612a5d493b39a (diff) | |
parent | edec4ef88293af59612aacc8fc5a8d91d616027b (diff) | |
download | rabbitmq-server-f48331a165fb2e9944a30f89403ad60b97c1b97e.tar.gz |
Merge from default to get an abundance of fixes; including (this is the
only non-conflict resolution) pre->enables and post->requires.
42 files changed, 407 insertions, 216 deletions
@@ -4,6 +4,7 @@ syntax: glob *.swp *.patch erl_crash.dump +deps.mk syntax: regexp ^cover/ @@ -19,6 +20,7 @@ syntax: regexp ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ ^packaging/debs/apt-repository/debian$ +^packaging/macports/macports$ ^packaging/generic-unix/rabbitmq-server-generic-unix-.*\.tar\.gz$ ^packaging/windows/rabbitmq-server-windows-.*\.zip$ @@ -6,12 +6,14 @@ RABBITMQ_SERVER_START_ARGS ?= RABBITMQ_MNESIA_DIR ?= $(TMPDIR)/rabbitmq-$(RABBITMQ_NODENAME)-mnesia RABBITMQ_LOG_BASE ?= $(TMPDIR) +DEPS_FILE=deps.mk SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) -BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS) +INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl +BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) +TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod)) @@ -58,16 +60,13 @@ ERL_EBIN=erl -noinput -pa $(EBIN_DIR) all: $(TARGETS) +$(DEPS_FILE): $(SOURCES) $(INCLUDES) + escript generate_deps $(INCLUDE_DIR) $(SOURCE_DIR) \$$\(EBIN_DIR\) $@ + $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app escript generate_app $(EBIN_DIR) $@ < $< -$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl - erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/rabbit_exchange_behaviour.beam: $(SOURCE_DIR)/rabbit_exchange_behaviour.erl - erlc $(ERLC_OPTS) $< - -$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam $(EBIN_DIR)/rabbit_exchange_behaviour.beam +$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< # ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< @@ -103,6 +102,7 @@ clean: rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f docs/*.[0-9].gz rm -f $(RABBIT_PLT) + rm -f $(DEPS_FILE) cleandb: rm -rf $(RABBITMQ_MNESIA_DIR)/* @@ -173,7 +173,7 @@ srcdist: distclean sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app generate_deps calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -223,3 +223,7 @@ install: all docs_all install_dirs install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(TARGET_DIR)/sbin + +ifneq ($(MAKECMDGOALS),clean) +-include $(DEPS_FILE) +endif @@ -92,6 +92,40 @@ class PackedMethodBitField: def full(self): return self.count() == 8 + +def printFileHeader(): + print """%% Autogenerated code. Do not edit. +%% +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%%""" def genErl(spec): def erlType(domain): @@ -251,6 +285,7 @@ def genErl(spec): methods = spec.allMethods() + printFileHeader() print """-module(rabbit_framing). -include("rabbit_framing.hrl"). @@ -325,6 +360,7 @@ def genHrl(spec): methods = spec.allMethods() + printFileHeader() print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) print "-define(PROTOCOL_PORT, %d)." % (spec.port) diff --git a/generate_deps b/generate_deps new file mode 100644 index 00000000..916006d1 --- /dev/null +++ b/generate_deps @@ -0,0 +1,52 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +-mode(compile). + +main([IncludeDir, ErlDir, EbinDir, TargetFile]) -> + ErlDirContents = filelib:wildcard("*.erl", ErlDir), + ErlFiles = [filename:join(ErlDir, FileName) || FileName <- ErlDirContents], + Modules = sets:from_list( + [list_to_atom(filename:basename(FileName, ".erl")) || + FileName <- ErlDirContents]), + Headers = sets:from_list( + [filename:join(IncludeDir, FileName) || + FileName <- filelib:wildcard("*.hrl", IncludeDir)]), + Deps = lists:foldl( + fun (Path, Deps1) -> + dict:store(Path, detect_deps(IncludeDir, EbinDir, + Modules, Headers, Path), + Deps1) + end, dict:new(), ErlFiles), + {ok, Hdl} = file:open(TargetFile, [write, delayed_write]), + dict:fold( + fun (_Path, [], ok) -> + ok; + (Path, Dep, ok) -> + Module = filename:basename(Path, ".erl"), + ok = file:write(Hdl, [EbinDir, "/", Module, ".beam:"]), + ok = sets:fold(fun (E, ok) -> file:write(Hdl, [" ", E]) end, + ok, Dep), + file:write(Hdl, [" ", ErlDir, "/", Module, ".erl\n"]) + end, ok, Deps), + ok = file:write(Hdl, [TargetFile, ": ", escript:script_name(), "\n"]), + ok = file:sync(Hdl), + ok = file:close(Hdl). + +detect_deps(IncludeDir, EbinDir, Modules, Headers, Path) -> + {ok, Forms} = epp:parse_file(Path, [IncludeDir], [{use_specs, true}]), + lists:foldl( + fun ({attribute, _LineNumber, behaviour, Behaviour}, Deps) -> + case sets:is_element(Behaviour, Modules) of + true -> sets:add_element( + [EbinDir, "/", atom_to_list(Behaviour), ".beam"], + Deps); + false -> Deps + end; + ({attribute, _LineNumber, file, {FileName, _LineNumber1}}, Deps) -> + case sets:is_element(FileName, Headers) of + true -> sets:add_element(FileName, Deps); + false -> Deps + end; + (_Form, Deps) -> + Deps + end, sets:new(), Forms). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 4b157cbc..38d8c899 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -128,11 +128,17 @@ properties :: amqp_properties(), properties_bin :: 'none', payload_fragments_rev :: [binary()]}). +-type(unencoded_content() :: undecoded_content()). -type(decoded_content() :: #content{class_id :: amqp_class_id(), properties :: amqp_properties(), properties_bin :: maybe(binary()), payload_fragments_rev :: [binary()]}). +-type(encoded_content() :: + #content{class_id :: amqp_class_id(), + properties :: maybe(amqp_properties()), + properties_bin :: binary(), + payload_fragments_rev :: [binary()]}). -type(content() :: undecoded_content() | decoded_content()). -type(basic_message() :: #basic_message{exchange_name :: exchange_name(), diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index fa2844fd..bc5b58ca 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -34,6 +34,8 @@ prepare: -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ SOURCES/rabbitmq-server.init + sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + SOURCES/rabbitmq-script-wrapper cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 62fb1dfb..4dd22308 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -118,6 +118,9 @@ fi rm -rf %{buildroot} %changelog +* Fri Jan 22 2010 Matthew Sackman <matthew@lshift.net> 1.7.1-1 +- New Upstream Release + * Mon Oct 5 2009 David Wragg <dpw@lshift.net> 1.7.0-1 - New upstream release diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index dfb714f1..f66f8e59 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -45,7 +45,7 @@ cd /var/lib/rabbitmq SCRIPT=`basename $0` if [ `id -u` = 0 ] ; then - su rabbitmq -s /bin/sh -c "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" + @SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}" elif [ `id -u` = `id -u rabbitmq` ] ; then /usr/lib/rabbitmq/bin/${SCRIPT} "$@" else diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index dafaf9ce..ab05f732 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -26,6 +26,8 @@ package: clean -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ $(UNPACKED_DIR)/debian/rabbitmq-server.init + sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ + $(UNPACKED_DIR)/debian/rabbitmq-script-wrapper chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index e4cfe7b5..796a301a 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.7.1-1) intrepid; urgency=low + + * New Upstream Release + + -- Matthew Sackman <matthew@lshift.net> Fri, 22 Jan 2010 14:14:29 +0000 + rabbitmq-server (1.7.0-1) intrepid; urgency=low * New Upstream Release diff --git a/packaging/macports/Makefile b/packaging/macports/Makefile new file mode 100644 index 00000000..4db305eb --- /dev/null +++ b/packaging/macports/Makefile @@ -0,0 +1,59 @@ +TARBALL_DIR=../../dist +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) +COMMON_DIR=../common +VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + +# The URL at which things really get deployed +REAL_WEB_URL=http://www.rabbitmq.com/ + +# The user@host for an OSX machine with macports installed, which is +# used to generate the macports index files. That step will be +# skipped if this variable is not set. If you do set it, you might +# also want to set SSH_OPTS, which allows adding ssh options, e.g. to +# specify a key that will get into the OSX machine without a +# passphrase. +MACPORTS_USERHOST= + +MACPORTS_DIR=macports +DEST=$(MACPORTS_DIR)/net/rabbitmq-server + +all: macports + +dirs: + mkdir -p $(DEST)/files + +$(DEST)/Portfile: Portfile.in + for algo in md5 sha1 rmd160 ; do \ + checksum=$$(openssl $$algo $(TARBALL_DIR)/$(TARBALL) | awk '{print $$NF}') ; \ + echo "s|@$$algo@|$$checksum|g" ; \ + done >checksums.sed + sed -e "s|@VERSION@|$(VERSION)|g;s|@BASE_URL@|$(REAL_WEB_URL)|g" \ + -f checksums.sed <$^ >$@ + rm checksums.sed + +macports: dirs $(DEST)/Portfile + for f in rabbitmq-asroot-script-wrapper rabbitmq-script-wrapper ; do \ + cp $(COMMON_DIR)/$$f $(DEST)/files ; \ + done + sed -i -e 's|@SU_RABBITMQ_SH_C@|sudo -u rabbitmq -H /bin/sh -c|' \ + $(DEST)/files/rabbitmq-script-wrapper + cp patch-org.macports.rabbitmq-server.plist.diff $(DEST)/files + +# This target ssh's into the OSX host in order to finalize the +# macports repo +macports_index: + if [ -n "$(MACPORTS_USERHOST)" ] ; then \ + tar cf - -C $(MACPORTS_DIR) . | ssh $(SSH_OPTS) lshift@macrabbit ' \ + d="/tmp/mkportindex.$$$$" ; \ + mkdir $$d \ + && cd $$d \ + && tar xf - \ + && /opt/local/bin/portindex -a -o . >/dev/null \ + && tar cf - . \ + && cd \ + && rm -rf $$d' \ + | tar xf - -C $(MACPORTS_DIR) ; \ + fi + +clean: + rm -rf $(DEST) checksums.sed diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/Portfile.in index 739f99d0..e1f58212 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/Portfile.in @@ -3,10 +3,10 @@ PortSystem 1.0 name rabbitmq-server -version 1.7.0 -revision 0 +version @VERSION@ +revision 1 categories net -maintainers tonyg@rabbitmq.com +maintainers rabbitmq.com:tonyg platforms darwin description The RabbitMQ AMQP Server long_description \ @@ -15,13 +15,13 @@ long_description \ robust and scalable implementation of an AMQP broker. -homepage http://www.rabbitmq.com/ -master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/ +homepage @BASE_URL@ +master_sites @BASE_URL@releases/rabbitmq-server/v${version}/ checksums \ - md5 4505ca0fd8718439bd6f5e2af2379e56 \ - sha1 84fb86d403057bb808c1b51deee0c1fca3bf7bef \ - rmd160 092f90946825cc3eb277019805e24db637a559f4 + md5 @md5@ \ + sha1 @sha1@ \ + rmd160 @rmd160@ depends_build port:erlang depends_run port:erlang diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper deleted file mode 100644 index c4488dcb..00000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -cd /var/lib/rabbitmq - -SCRIPT=`basename $0` - -if [ `id -u` = 0 ] ; then - /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -else - echo -e "\nOnly root should run ${SCRIPT}\n" - exit 1 -fi - diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper deleted file mode 100644 index 80cb7bd5..00000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/bash -cd /var/lib/rabbitmq - -SCRIPT=`basename $0` - -if [ `id -u` = 0 ] ; then - sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -elif [ `id -u` = `id -u rabbitmq` ] ; then - /usr/lib/rabbitmq/bin/${SCRIPT} "$@" -else - /usr/lib/rabbitmq/bin/${SCRIPT} - echo -e "\nOnly root or rabbitmq should run ${SCRIPT}\n" - exit 1 -fi - diff --git a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff index 45b49496..45b49496 100644 --- a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff +++ b/packaging/macports/patch-org.macports.rabbitmq-server.plist.diff diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1a7eb97e..a6eb102a 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -30,8 +30,6 @@ ## Contributor(s): ______________________________________. ## NODENAME=rabbit -NODE_IP_ADDRESS=0.0.0.0 -NODE_PORT=5672 SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= @@ -40,14 +38,18 @@ CONFIG_FILE=/etc/rabbitmq/rabbitmq . `dirname $0`/rabbitmq-env +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] then if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} fi else if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${NODE_PORT} + then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} fi fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 7f08cd9d..cbc295f7 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -31,8 +31,6 @@ ## NODENAME=rabbit -NODE_IP_ADDRESS=0.0.0.0 -NODE_PORT=5672 SERVER_ERL_ARGS="+K true +A30 \ -kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ -kernel inet_default_connect_options [{nodelay,true}]" @@ -44,14 +42,18 @@ SERVER_START_ARGS= . `dirname $0`/rabbitmq-env +DEFAULT_NODE_IP_ADDRESS=0.0.0.0 +DEFAULT_NODE_PORT=5672 +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] then if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} + then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} fi else if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${NODE_PORT} + then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} fi fi [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index d960d29d..46681125 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -213,7 +213,7 @@ set ERLANG_SERVICE_ARGUMENTS=%ERLANG_SERVICE_ARGUMENTS:"=\"% "%ERLANG_SERVICE_MANAGER_PATH%\erlsrv" set %RABBITMQ_SERVICENAME% ^
-machine "%ERLANG_SERVICE_MANAGER_PATH%\erl.exe" ^
--env ERL_CRASH_DUMP="%RABBITMQ_BASE_UNIX%/log" ^
+-env ERL_CRASH_DUMP="%RABBITMQ_BASE_UNIX%/erl_crash.dump" ^
-workdir "%RABBITMQ_BASE%" ^
-stopaction "rabbit:stop_and_halt()." ^
-sname %RABBITMQ_NODENAME% ^
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 53edf8de..1b24f28e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -36,7 +36,7 @@ %% InitialTimeout supplied from init). After this timeout has %% occurred, hibernation will occur as normal. Upon awaking, a new %% current timeout value will be calculated. -%% +%% %% The purpose is that the gen_server2 takes care of adjusting the %% current timeout value such that the process will increase the %% timeout value repeatedly if it is unable to sleep for the diff --git a/src/rabbit.erl b/src/rabbit.erl index 569bdb1f..9fdef96b 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,22 +51,22 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_exchange_type, [{description, "exchange type registry"}, {mfa, {rabbit_sup, start_child, [rabbit_exchange_type]}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_child, [rabbit_log]}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {pre, kernel_ready}]}). + {enables, kernel_ready}]}). -rabbit_boot_step({kernel_ready, [{description, "kernel ready"}]}). @@ -74,27 +74,27 @@ -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, {mfa, {rabbit_alarm, start, []}}, - {post, kernel_ready}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {enables, core_initialized}]}). -rabbit_boot_step({rabbit_amqqueue_sup, [{description, "queue supervisor"}, {mfa, {rabbit_amqqueue, start, []}}, - {post, kernel_ready}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {enables, core_initialized}]}). -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, {mfa, {rabbit_sup, start_child, [rabbit_router]}}, - {post, kernel_ready}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {enables, core_initialized}]}). -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, - {post, kernel_ready}, - {post, rabbit_amqqueue_sup}, - {pre, core_initialized}]}). + {requires, kernel_ready}, + {requires, rabbit_amqqueue_sup}, + {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, [{description, "core initialized"}]}). @@ -102,27 +102,27 @@ -rabbit_boot_step({empty_db_check, [{description, "empty DB check"}, {mfa, {?MODULE, maybe_insert_default_data, []}}, - {post, core_initialized}]}). + {requires, core_initialized}]}). -rabbit_boot_step({exchange_recovery, [{description, "exchange recovery"}, {mfa, {rabbit_exchange, recover, []}}, - {post, empty_db_check}]}). + {requires, empty_db_check}]}). -rabbit_boot_step({queue_recovery, [{description, "queue recovery"}, {mfa, {rabbit_amqqueue, recover, []}}, - {post, exchange_recovery}]}). + {requires, exchange_recovery}]}). -rabbit_boot_step({persister, [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {post, queue_recovery}]}). + {requires, queue_recovery}]}). -rabbit_boot_step({guid_generator, [{description, "guid generator"}, {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, - {post, persister}, - {pre, routing_ready}]}). + {requires, persister}, + {enables, routing_ready}]}). -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}]}). @@ -130,12 +130,12 @@ -rabbit_boot_step({log_relay, [{description, "error log relay"}, {mfa, {rabbit_error_logger, boot, []}}, - {post, routing_ready}]}). + {requires, routing_ready}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, - {post, log_relay}, - {pre, networking_listening}]}). + {requires, log_relay}, + {enables, networking_listening}]}). -rabbit_boot_step({networking_listening, [{description, "network listeners available"}]}). @@ -251,9 +251,9 @@ run_boot_step({StepName, Attributes}) -> end, case [MFA || {mfa, MFA} <- Attributes] of [] -> - io:format("progress -- ~s~n", [Description]); + io:format("-- ~s~n", [Description]); MFAs -> - io:format("starting ~-40s ...", [Description]), + io:format("starting ~-60s ...", [Description]), [case catch apply(M,F,A) of {'EXIT', Reason} -> boot_error("FAILED~nReason: ~p~n", [Reason]); @@ -291,9 +291,9 @@ sort_boot_steps(UnsortedSteps) -> %% Add edges, detecting cycles and missing vertices. lists:foreach(fun ({StepName, Attributes}) -> [add_boot_step_dep(G, StepName, PrecedingStepName) - || {post, PrecedingStepName} <- Attributes], + || {requires, PrecedingStepName} <- Attributes], [add_boot_step_dep(G, SucceedingStepName, StepName) - || {pre, SucceedingStepName} <- Attributes] + || {enables, SucceedingStepName} <- Attributes] end, UnsortedSteps), %% Use topological sort to find a consistent ordering (if there is diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cacdb010..c1ffff5a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -266,7 +266,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( @@ -293,7 +293,7 @@ limit_all(QPids, ChPid, LimiterPid) -> fun (_) -> ok end, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, QPids). - + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). @@ -302,7 +302,7 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80b7a92c..a3b0814c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -85,7 +85,7 @@ consumers, transactions, memory]). - + %%---------------------------------------------------------------------------- start_link(Q) -> @@ -166,7 +166,7 @@ record_current_channel_tx(ChPid, Txn) -> %% as a side effect this also starts monitoring the channel (if %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - + deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, @@ -290,7 +290,7 @@ possibly_unblock(State, ChPid, Update) -> blocked_consumers = NewBlockedConsumers}) end end. - + should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). @@ -453,7 +453,7 @@ all_tx() -> mark_tx_persistent(Txn) -> Tx = lookup_tx(Txn), store_tx(Txn, Tx#tx{is_persistent = true}). - + is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index bec2cd08..341aa7d0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -46,7 +46,7 @@ -spec(publish/1 :: (delivery()) -> publish_result()). -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> - delivery()). + delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> message()). -spec(properties/1 :: (properties_input()) -> amqp_properties()). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 01ac4f02..b8e161a6 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -46,6 +46,7 @@ build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). +-export([ensure_content_encoded/1, clear_encoded_content/1]). -import(lists). @@ -60,9 +61,11 @@ -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). --spec(generate_table/1 :: (amqp_table()) -> binary()). +-spec(generate_table/1 :: (amqp_table()) -> binary()). -spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). +-spec(ensure_content_encoded/1 :: (content()) -> encoded_content()). +-spec(clear_encoded_content/1 :: (content()) -> unencoded_content()). -endif. @@ -262,3 +265,19 @@ check_empty_content_body_frame_size() -> exit({incorrect_empty_content_body_frame_size, ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. + +ensure_content_encoded(Content = #content{properties_bin = PropsBin}) + when PropsBin =/= 'none' -> + Content; +ensure_content_encoded(Content = #content{properties = Props}) -> + Content #content{properties_bin = rabbit_framing:encode_properties(Props)}. + +clear_encoded_content(Content = #content{properties_bin = none}) -> + Content; +clear_encoded_content(Content = #content{properties = none}) -> + %% Only clear when we can rebuild the properties_bin later in + %% accordance to the content record definition comment - maximum + %% one of properties and properties_bin can be 'none' + Content; +clear_encoded_content(Content = #content{}) -> + Content#content{properties_bin = none}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 506e87ec..eaedeba1 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -139,7 +139,7 @@ parse_properties(Bit, [Type | TypeListRest], Acc, FirstShort, end, parse_properties(Bit + 1, TypeListRest, [Value | Acc], FirstShort, Remainder, Rest). - + parse_property(shortstr, <<Len:8/unsigned, String:Len/binary, Rest/binary>>) -> {String, Rest}; parse_property(longstr, <<Len:32/unsigned, String:Len/binary, Rest/binary>>) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 507dab48..94d06148 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -485,7 +485,7 @@ handle_method(#'basic.qos'{global = true}, _, _State) -> rabbit_misc:protocol_error(not_implemented, "global=true", []); handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> - rabbit_misc:protocol_error(not_implemented, + rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, @@ -526,24 +526,24 @@ handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> - lists:foreach( - fun ({_DeliveryTag, none, _Msg}) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}) -> - %% Was sent as a proper consumer delivery. Resend it as - %% before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - ok = internal_deliver( + ok = rabbit_misc:queue_fold( + fun ({_DeliveryTag, none, _Msg}, ok) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> + %% Was sent as a proper consumer delivery. Resend + %% it as before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + internal_deliver( WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) - end, queue:to_list(UAMQ)), + end, ok, UAMQ), %% No answer required, apparently! {noreply, State}; @@ -756,9 +756,9 @@ handle_method(_MethodRecord, _Content, _State) -> binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> - %% FIXME: connection exception (!) on failure?? + %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - + %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_write_permitted(QueueName, State), @@ -872,7 +872,7 @@ rollback_and_notify(State) -> notify_queues(internal_rollback(State)). fold_per_queue(F, Acc0, UAQ) -> - D = lists:foldl( + D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> %% dict:append would be simpler and avoid the @@ -883,7 +883,7 @@ fold_per_queue(F, Acc0, UAQ) -> fun (MsgIds) -> [MsgId | MsgIds] end, [MsgId], D) - end, dict:new(), queue:to_list(UAQ)), + end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). @@ -894,7 +894,7 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). consumer_queues(Consumers) -> - [QPid || QueueName <- + [QPid || QueueName <- sets:to_list( dict:fold(fun (_ConsumerTag, QueueName, S) -> sets:add_element(QueueName, S) @@ -912,9 +912,9 @@ consumer_queues(Consumers) -> notify_limiter(undefined, _Acked) -> ok; notify_limiter(LimiterPid, Acked) -> - case lists:foldl(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, queue:to_list(Acked)) of + case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(LimiterPid, Count) end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ddd0c002..2fe3f33e 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -54,7 +54,7 @@ start() -> {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), - #params{quiet = Quiet, node = Node, command = Command, args = Args} = + #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of @@ -156,11 +156,11 @@ Available commands: list_queues [-p <VHostPath>] [<QueueInfoItem> ...] list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] - list_bindings [-p <VHostPath>] + list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] -Quiet output mode is selected with the \"-q\" flag. Informational messages -are suppressed when quiet mode is in effect. +Quiet output mode is selected with the \"-q\" flag. Informational +messages are suppressed when quiet mode is in effect. <node> should be the name of the master node of the RabbitMQ cluster. It defaults to the node named \"rabbit\" on the local @@ -169,24 +169,27 @@ usually be rabbit@server (unless RABBITMQ_NODENAME has been set to some non-default value at broker startup time). The output of hostname -s is usually the correct suffix to use after the \"@\" sign. -The list_queues, list_exchanges and list_bindings commands accept an optional -virtual host parameter for which to display results. The default value is \"/\". +The list_queues, list_exchanges and list_bindings commands accept an +optional virtual host parameter for which to display results. The +default value is \"/\". -<QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, -messages, acks_uncommitted, consumers, transactions, memory]. The default is - to display name and (number of) messages. +<QueueInfoItem> must be a member of the list [name, durable, +auto_delete, arguments, pid, messages_ready, messages_unacknowledged, +messages_uncommitted, messages, acks_uncommitted, consumers, +transactions, memory]. The default is to display name and (number of) +messages. -<ExchangeInfoItem> must be a member of the list [name, type, durable, +<ExchangeInfoItem> must be a member of the list [name, type, durable, auto_delete, arguments]. The default is to display name and type. -The output format for \"list_bindings\" is a list of rows containing +The output format for \"list_bindings\" is a list of rows containing exchange name, queue name, routing key and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [pid, address, port, -peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, -client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. -The default is to display user, peer_address, peer_port and state. +<ConnectionInfoItem> must be a member of the list [pid, address, port, +peer_address, peer_port, state, channels, user, vhost, timeout, +frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, +send_pend]. The default is to display user, peer_address, peer_port +and state. "), halt(1). @@ -287,7 +290,7 @@ action(list_bindings, Node, Args, Inform) -> InfoKeys = [exchange_name, queue_name, routing_key, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], + X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], InfoKeys), ok; @@ -317,9 +320,9 @@ action(list_permissions, Node, VHost, [], Inform) -> [VHost]})). parse_vhost_flag(Args) when is_list(Args) -> - case Args of + case Args of ["-p", VHost | RemainingArgs] -> - {VHost, RemainingArgs}; + {VHost, RemainingArgs}; RemainingArgs -> {"/", RemainingArgs} end. @@ -329,9 +332,9 @@ parse_vhost_flag_bin(Args) -> {list_to_binary(VHost), RemainingArgs}. default_if_empty(List, Default) when is_list(List) -> - if List == [] -> - Default; - true -> + if List == [] -> + Default; + true -> [list_to_atom(X) || X <- List] end. @@ -356,7 +359,7 @@ format_info_item(Key, Items) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> pid_to_string(Value); - Value when is_binary(Value) -> + Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> escape(atom_to_list(Value)); diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 183b6984..3ad85923 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -56,7 +56,7 @@ init({{File, Suffix}, []}) -> init({{File, _}, error}) -> init(File); %% Used only when swapping handlers without performing -%% log rotation +%% log rotation init({File, []}) -> init(File); init({File, _Type} = FileInfo) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a82a5ece..45236422 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -77,15 +77,15 @@ -spec(delete_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'binding_not_found'}). --spec(list_bindings/1 :: (vhost()) -> +-spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). --spec(list_queue_bindings/1 :: (queue_name()) -> +-spec(list_queue_bindings/1 :: (queue_name()) -> [{exchange_name(), routing_key(), amqp_table()}]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> +-spec(list_exchange_bindings/1 :: (exchange_name()) -> [{queue_name(), routing_key(), amqp_table()}]). -endif. @@ -449,7 +449,7 @@ list_bindings(VHostPath) -> [{ExchangeName, QueueName, RoutingKey, Arguments} || #route{binding = #binding{ exchange_name = ExchangeName, - key = RoutingKey, + key = RoutingKey, queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( @@ -540,7 +540,7 @@ list_exchange_bindings(ExchangeName) -> [{QueueName, RoutingKey, Arguments} || #route{binding = #binding{queue_name = QueueName, key = RoutingKey, - args = Arguments}} + args = Arguments}} <- mnesia:dirty_match_object(rabbit_route, Route)]. % Refactoring is left as an exercise for the reader @@ -550,5 +550,5 @@ list_queue_bindings(QueueName) -> [{ExchangeName, RoutingKey, Arguments} || #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, - args = Arguments}} + args = Arguments}} <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 5c447792..3bde0879 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -115,7 +115,7 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> collect_content_payload(ChannelPid, RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); - _ -> + _ -> rabbit_misc:protocol_error( command_invalid, "expected content body, got non content body frame instead", diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl index b3d271c2..3faeec26 100644 --- a/src/rabbit_hooks.erl +++ b/src/rabbit_hooks.erl @@ -61,8 +61,8 @@ unsubscribe(Hook, HandlerName) -> trigger(Hook, Args) -> Hooks = ets:lookup(?TableName, Hook), [case catch apply(M, F, [Hook, Name, Args | A]) of - {'EXIT', Reason} -> - rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", + {'EXIT', Reason} -> + rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", [Name, Hook, Reason]); _ -> ok end || {_, Name, {M, F, A}} <- Hooks], diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d939d084..dc1afa64 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -55,7 +55,7 @@ -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). --export([unfold/2, ceil/1]). +-export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). -import(mnesia). @@ -127,6 +127,7 @@ -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). -spec(ceil/1 :: (number()) -> number()). +-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). -spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -endif. @@ -490,9 +491,15 @@ unfold(Fun, Acc, Init) -> ceil(N) -> T = trunc(N), - case N - T of - 0 -> N; - _ -> 1 + T + case N == T of + true -> T; + false -> 1 + T + end. + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. %% Sorts a list of AMQP table fields as per the AMQP spec diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 749038db..1e700362 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -55,8 +55,8 @@ -spec(cluster/1 :: ([erlang_node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). --spec(is_clustered/0 :: () -> boolean()). --spec(empty_ram_only_tables/0 :: () -> 'ok'). +-spec(is_clustered/0 :: () -> boolean()). +-spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -endif. @@ -173,7 +173,7 @@ replicated_table_names() -> ]. dir() -> mnesia:system_info(directory). - + ensure_mnesia_dir() -> MnesiaDir = dir() ++ "/", case filelib:ensure_dir(MnesiaDir) of @@ -389,7 +389,7 @@ wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). wait_for_tables() -> wait_for_tables(table_names()). -wait_for_tables(TableNames) -> +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> case mnesia:wait_for_tables(TableNames, 30000) of diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index a5ccc8e9..e9634c03 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -40,7 +40,7 @@ -ifdef(use_specs). --type(stat_option() :: +-type(stat_option() :: 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). -type(error() :: {'error', any()}). @@ -50,11 +50,11 @@ -spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). -spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). --spec(peername/1 :: (socket()) -> +-spec(peername/1 :: (socket()) -> {'ok', {ip_address(), non_neg_integer()}} | error()). --spec(sockname/1 :: (socket()) -> +-spec(sockname/1 :: (socket()) -> {'ok', {ip_address(), non_neg_integer()}} | error()). --spec(getstat/2 :: (socket(), [stat_option()]) -> +-spec(getstat/2 :: (socket(), [stat_option()]) -> {'ok', [{stat_option(), integer()}]} | error()). -endif. @@ -66,8 +66,8 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> Pid = self(), Ref = make_ref(), - spawn(fun() -> Pid ! {inet_async, Sock, Ref, - ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + spawn(fun() -> Pid ! {inet_async, Sock, Ref, + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} end), {ok, Ref}; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 84658a85..84be7918 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,10 +31,10 @@ -module(rabbit_networking). --export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, - stop_tcp_listener/2, on_node_down/1, active_listeners/0, - node_listeners/1, connections/0, connection_info/1, - connection_info/2, connection_info_all/0, +-export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3, + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info/1, + connection_info/2, connection_info_all/0, connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). @@ -46,11 +46,11 @@ -include_lib("kernel/include/inet.hrl"). -define(RABBIT_TCP_OPTS, [ - binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, + binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, {exit_on_close, false} ]). @@ -206,7 +206,7 @@ start_ssl_client(SslOpts, Sock) -> {error, {ssl_upgrade_error, Reason}}; {'EXIT', Reason} -> {error, {ssl_upgrade_failure, Reason}} - + end end). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index d0d60ddf..687fc5b3 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -57,7 +57,7 @@ -record(pstate, {log_handle, entry_count, deadline, pending_logs, pending_replies, - snapshot}). + snapshot}). %% two tables for efficient persistency %% one maps a key to a message @@ -166,7 +166,7 @@ handle_call({transaction, Key, MessageList}, From, State) -> do_noreply(internal_commit(From, Key, NewState)); handle_call({commit_transaction, TxnKey}, From, State) -> do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> +handle_call(force_snapshot, _From, State) -> do_reply(ok, flush(true, State)); handle_call(serial, _From, State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> @@ -211,7 +211,7 @@ internal_dirty_work(MessageList, State) -> log_work(fun (ML) -> {dirty_work, ML} end, MessageList, State). -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> +internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> Unit = {commit_transaction, Key}, NewSnapshot = internal_integrate1(Unit, Snapshot), complete(From, Unit, State#pstate{snapshot = NewSnapshot}). @@ -243,7 +243,7 @@ log_work(CreateWorkUnit, MessageList, fun(M = {publish, Message, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of [_] -> {tied, QK}; - [] -> ets:insert(Messages, {PKey, Message}), + [] -> ets:insert(Messages, {PKey, Message}), M end; (M) -> M @@ -252,7 +252,7 @@ log_work(CreateWorkUnit, MessageList, NewSnapshot = internal_integrate1(Unit, Snapshot), log(State#pstate{snapshot = NewSnapshot}, Unit). -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, +log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, Message) -> State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, ExistingDeadline), @@ -365,7 +365,7 @@ prune_table(Tab, Keys) -> true = ets:safe_fixtable(Tab, true), ok = prune_table(Tab, Keys, ets:first(Tab)), true = ets:safe_fixtable(Tab, false). - + prune_table(_Tab, _Keys, '$end_of_table') -> ok; prune_table(Tab, Keys, Key) -> case sets:is_element(Key, Keys) of @@ -374,7 +374,7 @@ prune_table(Tab, Keys, Key) -> end, prune_table(Tab, Keys, ets:next(Tab, Key)). -internal_load_snapshot(LogHandle, +internal_load_snapshot(LogHandle, Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), @@ -435,9 +435,9 @@ accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> - RequeueMessages = + RequeueMessages = [{{QName, PKey}, Message, Delivered} || - {PKey, Delivered} <- Requeues, + {PKey, Delivered} <- Requeues, {_, Message} <- ets:lookup(Messages, PKey)], rabbit_amqqueue:redeliver( QPid, @@ -459,7 +459,7 @@ replay([], LogHandle, K, Snapshot) -> {K1, Items} -> replay(Items, LogHandle, K1, Snapshot); {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", + rabbit_log:warning("~p bad bytes recovering persister log~n", [Badbytes]), replay(Items, LogHandle, K1, Snapshot); eof -> Snapshot diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e78d889d..503e2fb4 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -193,7 +193,7 @@ teardown_profiling(Value) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -socket_op(Sock, Fun) -> +socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", @@ -213,7 +213,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - try + try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ @@ -271,7 +271,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> [Reason], none)); true -> ok end, - %% this is what we are expected to do according to + %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% %% If we wanted to be *really* nice we should wait for a @@ -671,7 +671,7 @@ i(peer_port, #v1{sock = Sock}) -> {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; - SockStat =:= recv_cnt; + SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 99f1bc67..353ca5be 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -105,7 +105,7 @@ test_priority_queue() -> {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = test_priority_queue(Q6), - %% merge 1-element priority Q with 1-element no-priority Q + %% merge 1-element priority Q with 1-element no-priority Q Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), priority_queue:in(bar, Q)), {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = @@ -290,7 +290,7 @@ test_field_values() -> 4,"long", "l", 1234567890:64, % + 14 = 145 5,"short", "s", 655:16, % + 9 = 154 4,"bool", "t", 1, % + 7 = 161 - 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 + 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 4,"void", "V", % + 6 = 194 5,"array", "A", 23:32, % + 11 = 205 "I", 54321:32, % + 5 = 210 @@ -423,7 +423,7 @@ test_log_management_during_startup() -> {sasl_report_tty_h, []}]), ok = control_action(start_app, []), - %% start application with tty logging and + %% start application with tty logging and %% proper handlers not installed ok = control_action(stop_app, []), ok = error_logger:tty(false), @@ -455,7 +455,7 @@ test_log_management_during_startup() -> ok = add_log_handlers([{error_logger_file_h, MainLog}]), ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, - log_rotation_no_write_permission_dir_test}); + log_rotation_no_write_permission_dir_test}); {error, {cannot_log_to_file, _, _}} -> ok end, @@ -476,7 +476,7 @@ test_log_management_during_startup() -> ok = file:del_dir(TmpDir), %% start application with standard error_logger_file_h - %% handler not installed + %% handler not installed ok = application:set_env(kernel, error_logger, {file, MainLog}), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -584,7 +584,7 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - + %% convert a disk node into a ram node ok = control_action(cluster, ["invalid1@invalid", "invalid2@invalid"]), @@ -760,11 +760,11 @@ test_hooks() -> {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), %% Invoking Pids - Remote = fun() -> - receive - {rabbitmq_hook,[remote_test,test,[],Target]} -> + Remote = fun() -> + receive + {rabbitmq_hook,[remote_test,test,[],Target]} -> Target ! invoked - end + end end, P = spawn(Remote), rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), @@ -790,7 +790,7 @@ control_action(Command, Node, Args) -> ok -> io:format("done.~n"), ok; - Other -> + Other -> io:format("failed.~n"), Other end. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 1679ce7c..02b9968d 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -59,7 +59,7 @@ (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). -spec(internal_send_command/3 :: (socket(), channel_number(), amqp_method()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/5 :: (socket(), channel_number(), amqp_method(), content(), non_neg_integer()) -> 'ok'). diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index bc742561..5364acf9 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -48,14 +48,15 @@ start_link(Callback, LSock) -> %%-------------------------------------------------------------------- init({Callback, LSock}) -> - case prim_inet:async_accept(LSock, -1) of - {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}}; - Error -> {stop, {cannot_accept, Error}} - end. + gen_server:cast(self(), accept), + {ok, #state{callback=Callback, sock=LSock}}. handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast(accept, State) -> + accept(State); + handle_cast(_Msg, State) -> {noreply, State}. @@ -63,7 +64,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) -> %% patch up the socket so it looks like one we got from - %% gen_tcp:accept/1 + %% gen_tcp:accept/1 {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), @@ -83,10 +84,7 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, end, %% accept more - case prim_inet:async_accept(LSock, -1) of - {ok, NRef} -> {noreply, State#state{ref=NRef}}; - Error -> {stop, {cannot_accept, Error}, none} - end; + accept(State); handle_info({inet_async, LSock, Ref, {error, closed}}, State=#state{sock=LSock, ref=Ref}) -> %% It would be wrong to attempt to restart the acceptor when we @@ -104,3 +102,9 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). + +accept(State = #state{sock=LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> {noreply, State#state{ref=Ref}}; + Error -> {stop, {cannot_accept, Error}, State} + end. diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 4a2e149b..fdb6ec86 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -69,7 +69,7 @@ init({IPAddress, Port, SocketOpts, [Label, inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), {ok, #state{sock = LSock, - on_startup = OnStartup, on_shutdown = OnShutdown, + on_startup = OnStartup, on_shutdown = OnShutdown, label = Label}}; {error, Reason} -> error_logger:error_msg( diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 8be28f52..91788caa 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -51,7 +51,8 @@ -export([update/0, get_total_memory/0, get_check_interval/0, set_check_interval/1, - get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1]). + get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1, + get_memory_limit/0]). -define(SERVER, ?MODULE). @@ -77,6 +78,7 @@ ('ignore' | {'error', any()} | {'ok', pid()})). -spec(update/0 :: () -> 'ok'). -spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')). +-spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')). -spec(get_check_interval/0 :: () -> non_neg_integer()). -spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok'). -spec(get_vm_memory_high_watermark/0 :: () -> float()). @@ -96,16 +98,20 @@ get_total_memory() -> get_total_memory(os:type()). get_check_interval() -> - gen_server:call(?MODULE, get_check_interval). + gen_server:call(?MODULE, get_check_interval, infinity). set_check_interval(Fraction) -> - gen_server:call(?MODULE, {set_check_interval, Fraction}). + gen_server:call(?MODULE, {set_check_interval, Fraction}, infinity). get_vm_memory_high_watermark() -> - gen_server:call(?MODULE, get_vm_memory_high_watermark). + gen_server:call(?MODULE, get_vm_memory_high_watermark, infinity). set_vm_memory_high_watermark(Fraction) -> - gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}). + gen_server:call(?MODULE, {set_vm_memory_high_watermark, Fraction}, + infinity). + +get_memory_limit() -> + gen_server:call(?MODULE, get_memory_limit, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -152,6 +158,9 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; +handle_call(get_memory_limit, _From, State) -> + {reply, State#state.memory_limit, State}; + handle_call(_Request, _From, State) -> {noreply, State}. |