diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-09 16:53:24 +0000 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-02-09 16:53:24 +0000 |
commit | ff53141a67e48c7363e1910157904f698c01f61e (patch) | |
tree | d808139d5899ac7fd664c20a1d8d94a66c96cbe8 | |
parent | f8763d64de0c28f608bb10d22c64d8725deb1990 (diff) | |
parent | 54250f72aac333157ff6a4497ab69682a3bc27fb (diff) | |
download | rabbitmq-server-ff53141a67e48c7363e1910157904f698c01f61e.tar.gz |
Merged default
129 files changed, 691 insertions, 472 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ index 14bcc21d..d50e32ef 100644 --- a/LICENSE-MPL-RabbitMQ +++ b/LICENSE-MPL-RabbitMQ @@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2011 VMware, Inc. All rights reserved.'' + Copyright (c) 2007-2012 VMware, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of the notices in the Source Code files of the Original Code. You should @@ -11,7 +11,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## from __future__ import nested_scopes @@ -118,7 +118,7 @@ def printFileHeader(): %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %%""" def genErl(spec): diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index ee29706e..a317e63b 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index f6a8a303..faf3059a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -record(user, {username, @@ -86,7 +86,7 @@ %%---------------------------------------------------------------------------- --define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc."). +-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). -define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). @@ -95,7 +95,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(CREDIT_DISC_BOUND, {2000, 1500}). +-define(CREDIT_DISC_BOUND, {2000, 500}). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl index 803bb75c..61a2e22a 100644 --- a/include/rabbit_auth_backend_spec.hrl +++ b/include/rabbit_auth_backend_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl index 614a3eed..9a2f5e05 100644 --- a/include/rabbit_auth_mechanism_spec.hrl +++ b/include/rabbit_auth_mechanism_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 4a657951..2a8cc13c 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -type(fetch_result(Ack) :: diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index f6283ef7..8f7e22d3 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index e9150a97..f7c10bd8 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -include("rabbit.hrl"). diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl index 2ae5b000..75d7eb71 100644 --- a/include/rabbit_msg_store_index.hrl +++ b/include/rabbit_msg_store_index.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -include("rabbit_msg_store.hrl"). diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail index 5d842cc1..b9c2629b 100644 --- a/packaging/common/LICENSE.tail +++ b/packaging/common/LICENSE.tail @@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1 Authors and Copyright are as described below: The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2011 VMware, Inc. All rights reserved. + Copyright (c) 2007-2012 VMware, Inc. All rights reserved. MOZILLA PUBLIC LICENSE @@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2011 VMware, Inc. All rights reserved.'' + Copyright (c) 2007-2012 VMware, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of the notices in the Source Code files of the Original Code. You should diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 0436f546..0e59c218 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## # Escape spaces and quotes, because shell is revolting. diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index e6776eff..14557286 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## ## diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 8696427e..79e9c1dd 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -34,7 +34,7 @@ package: clean chmod a+x $(UNPACKED_DIR)/debian/rules echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright - echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright + echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) rm -rf $(UNPACKED_DIR) diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index 45f5c5c4..fb02cd6a 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -2,7 +2,7 @@ Source: rabbitmq-server Section: net Priority: extra Maintainer: RabbitMQ Team <packaging@rabbitmq.com> -Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc +Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip Standards-Version: 3.8.0 Package: rabbitmq-server diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index 27e4e1dc..91510991 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? -VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved." VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index a2ef8d3c..1fd1339d 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## # Determine where this script is really located @@ -36,7 +36,16 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.." [ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname` NODENAME=rabbit@${HOSTNAME%%.*} -# Load configuration from the rabbitmq.conf file +## Set (non-empty) default values for rabbitmq-env.conf variables to override +SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ +-kernel inet_default_connect_options [{nodelay,true}]" +CONFIG_FILE=/etc/rabbitmq/rabbitmq +LOG_BASE=/var/log/rabbitmq +MNESIA_BASE=/var/lib/rabbitmq/mnesia +PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins + +## Load configuration from the rabbitmq.conf file if [ -f /etc/rabbitmq/rabbitmq.conf ] && \ [ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- " diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 4c6cb1fa..14a18d57 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -12,16 +12,19 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## +# Get default settings with user overrides for (RABBITMQ_)<var_name> +# Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env -ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +##--- End of overridden <var_name> variables exec erl \ -pa "${RABBITMQ_HOME}/ebin" \ diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index ca874a7f..66a900a1 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 39a68c8e..0a5a4640 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -12,33 +12,23 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## -SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ --kernel inet_default_connect_options [{nodelay,true}]" -CONFIG_FILE=/etc/rabbitmq/rabbitmq -LOG_BASE=/var/log/rabbitmq -MNESIA_BASE=/var/lib/rabbitmq/mnesia -SERVER_START_ARGS= -ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins - +# Get default settings with user overrides for (RABBITMQ_)<var_name> +# Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set + DEFAULT_NODE_IP_ADDRESS=auto DEFAULT_NODE_PORT=5672 -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] -then - if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} - fi -else - if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} - fi -fi +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} +[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} + +[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} +[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} + [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} [ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} @@ -48,13 +38,16 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} + +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE} [ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" + +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} @@ -62,6 +55,8 @@ fi [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS} [ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log" +##--- End of overridden <var_name> variables + RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 44ce1ce1..ca49a5d8 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 1582bfb1..9e274840 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 9a11c3b3..4aad6b8f 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -12,14 +12,20 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. ## +# Get default settings with user overrides for (RABBITMQ_)<var_name> +# Non-empty defaults should be set in rabbitmq-env . `dirname $0`/rabbitmq-env +##--- Set environment vars RABBITMQ_<var_name> to defaults if not set + [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} +##--- End of overridden <var_name> variables + exec erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index a74a91fd..f37fae48 100644..100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 397aa191..072f4d9d 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -11,23 +11,26 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(credit_flow). -%% Credit starts at MaxCredit and goes down. Both sides keep -%% track. When the receiver goes below MoreCreditAt it issues more -%% credit by sending a message to the sender. The sender should pass -%% this message in to handle_bump_msg/1. The sender should block when -%% it goes below 0 (check by invoking blocked/0). If a process is both -%% a sender and a receiver it will not grant any more credit to its -%% senders when it is itself blocked - thus the only processes that -%% need to check blocked/0 are ones that read from network sockets. - --define(DEFAULT_CREDIT, {200, 150}). - --export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]). +%% Credit flow is controlled by a credit specification - a +%% {InitialCredit, MoreCreditAfter} tuple. For the message sender, +%% credit starts at InitialCredit and is decremented with every +%% message sent. The message receiver grants more credit to the sender +%% by sending it a {bump_credit, ...} control message after receiving +%% MoreCreditAfter messages. The sender should pass this message in to +%% handle_bump_msg/1. The sender should block when it goes below 0 +%% (check by invoking blocked/0). If a process is both a sender and a +%% receiver it will not grant any more credit to its senders when it +%% is itself blocked - thus the only processes that need to check +%% blocked/0 are ones that read from network sockets. + +-define(DEFAULT_CREDIT, {200, 50}). + +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -35,14 +38,14 @@ -ifdef(use_specs). -opaque(bump_msg() :: {pid(), non_neg_integer()}). --opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}). +-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(send/2 :: (pid(), credit_spec()) -> 'ok'). -spec(ack/1 :: (pid()) -> 'ok'). -spec(ack/2 :: (pid(), credit_spec()) -> 'ok'). -spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). -spec(blocked/0 :: () -> boolean()). --spec(send/1 :: (pid()) -> 'ok'). --spec(send/2 :: (pid(), credit_spec()) -> 'ok'). -spec(peer_down/1 :: (pid()) -> 'ok'). -endif. @@ -60,39 +63,32 @@ %% For any given pair of processes, ack/2 and send/2 must always be %% called with the same credit_spec(). -ack(To) -> ack(To, ?DEFAULT_CREDIT). +send(From) -> send(From, ?DEFAULT_CREDIT). -ack(To, {MaxCredit, MoreCreditAt}) -> - MoreCreditAt1 = MoreCreditAt + 1, - Credit = - case get({credit_to, To}, MaxCredit) of - MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt), - MaxCredit; - C -> C - 1 - end, - put({credit_to, To}, Credit). +send(From, {InitialCredit, _MoreCreditAfter}) -> + update({credit_from, From}, InitialCredit, + fun (1) -> block(From), + 0; + (C) -> C - 1 + end). -handle_bump_msg({From, MoreCredit}) -> - Credit = get({credit_from, From}, 0) + MoreCredit, - put({credit_from, From}, Credit), - case Credit > 0 of - true -> unblock(From), - ok; - false -> ok - end. +ack(To) -> ack(To, ?DEFAULT_CREDIT). -blocked() -> - get(credit_blocked, []) =/= []. +ack(To, {_InitialCredit, MoreCreditAfter}) -> + update({credit_to, To}, MoreCreditAfter, + fun (1) -> grant(To, MoreCreditAfter), + MoreCreditAfter; + (C) -> C - 1 + end). -send(From) -> send(From, ?DEFAULT_CREDIT). +handle_bump_msg({From, MoreCredit}) -> + update({credit_from, From}, 0, + fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From), + C + MoreCredit; + (C) -> C + MoreCredit + end). -send(From, {MaxCredit, _MoreCreditAt}) -> - Credit = get({credit_from, From}, MaxCredit) - 1, - case Credit of - 0 -> block(From); - _ -> ok - end, - put({credit_from, From}, Credit). +blocked() -> get(credit_blocked, []) =/= []. peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it @@ -108,20 +104,18 @@ grant(To, Quantity) -> Msg = {bump_credit, {self(), Quantity}}, case blocked() of false -> To ! Msg; - true -> Deferred = get(credit_deferred, []), - put(credit_deferred, [{To, Msg} | Deferred]) + true -> update(credit_deferred, [], + fun (Deferred) -> [{To, Msg} | Deferred] end) end. -block(From) -> - put(credit_blocked, [From | get(credit_blocked, [])]). +block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end). unblock(From) -> - NewBlocks = get(credit_blocked, []) -- [From], - put(credit_blocked, NewBlocks), - case NewBlocks of - [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], - erase(credit_deferred); - _ -> ok + update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end), + case blocked() of + false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + true -> ok end. get(Key, Default) -> @@ -129,3 +123,5 @@ get(Key, Default) -> undefined -> Default; Value -> Value end. + +update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok. diff --git a/src/delegate.erl b/src/delegate.erl index edb4eba4..d595e481 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(delegate). diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 4c131a6c..2a8b915b 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(delegate_sup). diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index a362c94f..f3b4dbaf 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(file_handle_cache). diff --git a/src/gatherer.erl b/src/gatherer.erl index fe976b50..98b36038 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gatherer). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 49913d26..f8537487 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -73,7 +73,7 @@ %% but where the second argument is specifically the priority_queue %% which contains the prioritised message_queue. -%% All modifications are (C) 2009-2011 VMware, Inc. +%% All modifications are (C) 2009-2012 VMware, Inc. %% ``The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm). diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 5e5a3a5a..57217541 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_soak_test). diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index defb0f29..dad75bd4 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_speed_test). diff --git a/src/gm_tests.erl b/src/gm_tests.erl index ca0ffd64..0a2d4204 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(gm_tests). diff --git a/src/lqueue.erl b/src/lqueue.erl index 04b40706..c4e046b5 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(lqueue). diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 3ba8f50d..a599effa 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor). diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index d48a9ca5..e8baabe8 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor_tests). diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl new file mode 100644 index 00000000..a3773d90 --- /dev/null +++ b/src/mnesia_sync.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(mnesia_sync). + +%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk +%% at commit. This module is an attempt to minimise the risk of data loss by +%% performing a coalesced log fsync. Unfortunately this is performed regardless +%% of whether or not the log was appended to. + +-behaviour(gen_server). + +-export([sync/0]). + +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {waiting, disc_node}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(sync/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +sync() -> + gen_server:call(?SERVER, sync, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}. + +handle_call(sync, _From, #state{disc_node = false} = State) -> + {reply, ok, State}; +handle_call(sync, From, #state{waiting = Waiting} = State) -> + {noreply, State#state{waiting = [From | Waiting]}, 0}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(timeout, #state{waiting = Waiting} = State) -> + ok = disk_log:sync(latest_log), + [gen_server:reply(From, ok) || From <- Waiting], + {noreply, State#state{waiting = []}}; +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/pg_local.erl b/src/pg_local.erl index c9c3a3a7..e2e82f1f 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -13,7 +13,7 @@ %% versions of Erlang/OTP. The remaining type specs have been %% removed. -%% All modifications are (C) 2010-2011 VMware, Inc. +%% All modifications are (C) 2010-2012 VMware, Inc. %% %CopyrightBegin% %% diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 4fc8b469..780fa2e9 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% Priority queues have essentially the same interface as ordinary diff --git a/src/rabbit.erl b/src/rabbit.erl index 3dcd4938..0a0ca90a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit). @@ -45,6 +45,12 @@ {requires, file_handle_cache}, {enables, external_infrastructure}]}). +-rabbit_boot_step({database_sync, + [{description, "database sync"}, + {mfa, {rabbit_sup, start_child, [mnesia_sync]}}, + {requires, database}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({file_handle_cache, [{description, "file handle cache server"}, {mfa, {rabbit_sup, start_restartable_child, @@ -360,10 +366,8 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> - ok = rabbit_mnesia:delete_previously_running_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), - print_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], io:format("~nbroker running~n"), @@ -501,6 +505,19 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> + {Err, Nodes} = + case rabbit_mnesia:read_previously_running_nodes() of + [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" + " shut down forcefully~nit cannot determine which nodes" + " are timing out. Details on all nodes will~nfollow.~n", + rabbit_mnesia:all_clustered_nodes() -- [node()]}; + Ns -> {rabbit_misc:format( + "Timeout contacting cluster nodes: ~p.~n", [Ns]), + Ns} + end, + boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); + boot_step_error(Reason, Stacktrace) -> boot_error("Error description:~n ~p~n~n" "Log files (may contain more information):~n ~s~n ~s~n~n" @@ -662,7 +679,7 @@ print_banner() -> {"app descriptor", app_location()}, {"home dir", home_dir()}, {"config file(s)", config_files()}, - {"cookie hash", rabbit_misc:cookie_hash()}, + {"cookie hash", rabbit_nodes:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, {"database dir", rabbit_mnesia:dir()}, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index ec9affa6..75c53511 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_access_control). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 517dd4ec..187ec1ab 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_alarm). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 94a99a49..a7dfd535 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue). @@ -25,7 +25,7 @@ -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, unblock/2, flush_all/2]). +-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([store_queue/1]). @@ -40,6 +40,8 @@ -define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). +-define(MORE_CONSUMER_CREDIT_AFTER, 50). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -137,6 +139,7 @@ -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: @@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + Key = {consumer_credit_to, QPid}, + put(Key, case get(Key) of + 1 -> gen_server2:cast( + QPid, {notify_sent, ChPid, + ?MORE_CONSUMER_CREDIT_AFTER}), + ?MORE_CONSUMER_CREDIT_AFTER; + undefined -> erlang:monitor(process, QPid), + ?MORE_CONSUMER_CREDIT_AFTER - 1; + C -> C - 1 + end), + ok. + +notify_sent_queue_down(QPid) -> + erase({consumer_credit_to, QPid}), + ok. unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f63a09d3..b3a620fa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_process). @@ -20,7 +20,7 @@ -behaviour(gen_server2). --define(UNSENT_MESSAGE_LIMIT, 100). +-define(UNSENT_MESSAGE_LIMIT, 200). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). @@ -388,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) -> {_, _} -> ok end. -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, +deliver_msgs_to_consumers(_DeliverFun, true, State) -> + {true, State}; +deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case PredFun(FunAcc, State) of - false -> {FunAcc, State}; - true -> case queue:out(ActiveConsumers) of - {empty, _} -> - {FunAcc, State}; - {{value, QEntry}, Tail} -> - {FunAcc1, State1} = - deliver_msg_to_consumer( + case queue:out(ActiveConsumers) of + {empty, _} -> + {false, State}; + {{value, QEntry}, Tail} -> + {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, - FunAcc, State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(Funs, FunAcc1, State1) - end + State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {FunAcc, State}; + {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, self(), Consumer#consumer.ack_required) of false -> block_consumer(C#cr{is_limit_active = true}, E), - {FunAcc, State}; + {false, State}; true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C, FunAcc, + DeliverFun, Consumer, C, State#q{active_consumers = AC1}) end end. @@ -426,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - FunAcc, State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), + State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, Stop, State1} = + DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -437,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - {FunAcc1, State1}. - -deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. + {Stop, State1}. -deliver_from_queue_deliver(AckRequired, false, State) -> +deliver_from_queue_deliver(AckRequired, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. @@ -485,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) -> State. run_message_queue(State) -> - Funs = {fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3}, State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), - IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + {_IsEmpty1, State2} = deliver_msgs_to_consumers( + fun deliver_from_queue_deliver/2, + BQ:is_empty(BQS), State1), State2. attempt_delivery(Delivery = #delivery{sender = ChPid, @@ -504,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = - fun (AckRequired, false, - State1 = #q{backing_queue_state = BQS2}) -> + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> %% we don't need an expiry here because %% messages are not being enqueued, so we use %% an empty message_properties. @@ -521,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, State1#q{backing_queue_state = BQS3}} end, {Delivered, State2} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + deliver_msgs_to_consumers(DeliverFun, false, State#q{backing_queue_state = BQS1}), {Delivered, Confirm, State2}; {Duplicate, BQS1} -> @@ -830,7 +823,7 @@ prioritise_cast(Msg, _State) -> {set_maximum_since_use, _Age} -> 8; {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; + {notify_sent, _ChPid, _Credit} -> 7; {unblock, _ChPid} -> 7; {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 @@ -1064,11 +1057,11 @@ handle_cast({unblock, ChPid}, State) -> possibly_unblock(State, ChPid, fun (C) -> C#cr{is_limit_active = false} end)); -handle_cast({notify_sent, ChPid}, State) -> +handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> - C#cr{unsent_message_count = Count - 1} + C#cr{unsent_message_count = Count - Credit} end)); handle_cast({limit, ChPid, Limiter}, State) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index 7b3ebcf2..a4305e5f 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_sup). diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index ade158bb..e0e252b8 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend). diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 086a90b4..3ef81d32 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend_internal). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 897199ee..0c8251b8 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism). diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index b8682a46..3de6e7a6 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_amqplain). diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index acbb6e48..64b01d8e 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_cr_demo). diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 2448acb6..19fb5875 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_plain). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c3b322ee..364eb8f6 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue). diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index c61184a6..7b00fa5f 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue_qc). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b116821c..b8211d43 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_basic). @@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) -> {ok, #basic_message{ exchange_name = XName, content = strip_header(DecodedContent, ?DELETED_HEADER), - id = rabbit_guid:guid(), + id = rabbit_guid:gen(), is_persistent = is_message_persistent(DecodedContent), routing_keys = [RoutingKey | header_routes(Props#'P_basic'.headers)]}} diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 494f3203..d69376fb 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binary_generator). diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index f3ca4e98..5f0016b6 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binary_parser). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 655bbb73..bb44797e 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_binding). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d8f55085..a101886f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel). @@ -33,9 +33,9 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_acks, - user, virtual_host, most_recently_declared_queue, queue_monitors, + limiter, tx_status, next_tag, unacked_message_q, + uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), uncommitted_acks = [], + uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -295,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> - State1 = lock_message(AckRequired, - ack_record(DeliveryTag, ConsumerTag, Msg), - State), - - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - maybe_incr_redeliver_stats(Redelivered, QPid, State1), - rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State1#ch{next_tag = DeliveryTag + 1}); - + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> + ok = rabbit_writer:send_command_and_notify( + WriterPid, QPid, self(), + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -695,38 +686,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}} -> - State1 = lock_message(not(NoAck), - ack_record(DeliveryTag, none, Msg), - State), - maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - maybe_incr_redeliver_stats(Redelivered, QPid, State1), - rabbit_trace:tap_trace_out(Msg, TraceState), + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}} -> ok = rabbit_writer:send_command( WriterPid, - #'basic.get_ok'{delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey, + #'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, record_sent(none, not(NoAck), Msg, State)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -746,7 +727,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of - <<>> -> rabbit_guid:binstring_guid("amq.ctag"); + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.ctag"); Other -> Other end, @@ -975,7 +957,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin, false -> none end, ActualNameBin = case QueueNameBin of - <<>> -> rabbit_guid:binstring_guid("amq.gen"); + <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), + "amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), @@ -1083,10 +1066,15 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL}) -> +handle_method(#'tx.commit'{}, _, + State = #ch{uncommitted_message_q = TMQ, + uncommitted_acks = TAL, + uncommitted_nacks = TNL, + limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), ack(TAL, State1), + lists:foreach( + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> @@ -1094,8 +1082,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL}) -> - UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + TNL1 = lists:append([L || {_, L} <- TNL]), + UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> @@ -1259,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> +reject(DeliveryTag, Requeue, Multiple, + State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> + reject(Requeue, Acked, State1#ch.limiter), + State1; + in_progress -> + State1#ch{uncommitted_nacks = + [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + end}. + +reject(Requeue, Acked, Limiter) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}. - -ack_record(DeliveryTag, ConsumerTag, - _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> - {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + ok = notify_limiter(Limiter, Acked). + +record_sent(ConsumerTag, AckRequired, + Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + State = #ch{unacked_message_q = UAMQ, + next_tag = DeliveryTag, + trace_state = TraceState}) -> + maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), + rabbit_trace:tap_trace_out(Msg, TraceState), + UAMQ1 = case AckRequired of + true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + UAMQ); + false -> UAMQ + end, + State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. collect_acks(Q, 0, true) -> {queue:to_list(Q), queue:new()}; @@ -1305,7 +1323,8 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = []}. + uncommitted_acks = [], + uncommitted_nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1395,11 +1414,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> end end, State#ch{unconfirmed_mq = UMQ1}, QPids). -lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> - State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; -lock_message(false, _MsgStruct, State) -> - State. - send_nacks([], State) -> State; send_nacks(MXs, State = #ch{tx_status = none}) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index a19b6bfd..dc262b49 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup). diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index e2561c80..995c41fb 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup_sup). diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index 4ba01b4f..c508f1b9 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_client_sup). diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl index a0953eab..adf6e417 100644 --- a/src/rabbit_command_assembler.erl +++ b/src/rabbit_command_assembler.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_command_assembler). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index b2aba2ee..12a532b6 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_connection_sup). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 22b57b1a..6a775adf 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -11,13 +11,13 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5, diagnostics/1]). +-export([start/0, stop/0, action/5]). -define(RPC_TIMEOUT, infinity). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -49,7 +49,6 @@ (atom(), node(), [string()], [{string(), any()}], fun ((string(), [any()]) -> 'ok')) -> 'ok'). --spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). -endif. @@ -67,7 +66,7 @@ start() -> CmdArgsAndOpts -> CmdArgsAndOpts end, Opts1 = [case K of - ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)}; + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; _ -> {K, V} end || {K, V} <- Opts], Command = list_to_atom(Command0), @@ -143,26 +142,7 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> - [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)]. - -diagnostics(Node) -> - {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - [{"diagnostics:", []}, - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - {"- unable to connect to epmd on ~s: ~w", - [NodeHost, EpmdReason]}; - {ok, NamePorts} -> - {"- nodes and their ports on ~s: ~p", - [NodeHost, [{list_to_atom(Name), Port} || - {Name, Port} <- NamePorts]]} - end, - {"- current node: ~w", [node()]}, - case init:get_argument(home) of - {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; - Other -> {"- no current node home dir: ~p", [Other]} - end, - {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}]. + fmt_stderr(rabbit_nodes:diagnostics([Node]), []). stop() -> ok. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 6f9a4650..e2928cae 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_direct). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 6e29ace7..f1672f4e 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 7b6e07c1..042ab23c 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger_file_h). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 5ae40c78..4ec141cf 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_event). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 68c0d988..83e28c44 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange). diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index ab3d00dc..44a08e24 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index b485e31f..4bce42d4 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_direct). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 3c029722..cc3fb87c 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_fanout). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index f09e4aae..de9979b4 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_headers). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 91c7b5d3..84f4f8a9 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_topic). @@ -250,7 +250,7 @@ remove_all(Table, Pattern) -> mnesia:match_object(Table, Pattern, write)). new_node_id() -> - rabbit_guid:guid(). + rabbit_guid:gen(). split_topic_key(Key) -> split_topic_key(Key, [], []). @@ -263,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> split_topic_key(Rest, [C | RevWordAcc], RevResAcc). - diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 5cb8e7b6..59df14f3 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_file). diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl index da1a6a49..a79188ab 100644 --- a/src/rabbit_framing.erl +++ b/src/rabbit_framing.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% TODO auto-generate diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2d0f5014..f4c425ca 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_guid). @@ -19,7 +19,7 @@ -behaviour(gen_server). -export([start_link/0]). --export([guid/0, string_guid/1, binstring_guid/1]). +-export([gen/0, gen_secure/0, string/2, binary/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -38,9 +38,10 @@ -type(guid() :: binary()). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(guid/0 :: () -> guid()). --spec(string_guid/1 :: (any()) -> string()). --spec(binstring_guid/1 :: (any()) -> binary()). +-spec(gen/0 :: () -> guid()). +-spec(gen_secure/0 :: () -> guid()). +-spec(string/2 :: (guid(), any()) -> string()). +-spec(binary/2 :: (guid(), any()) -> binary()). -endif. @@ -65,11 +66,8 @@ update_disk_serial() -> end, Serial. -%% generate a GUID. -%% -%% The id is only unique within a single cluster and as long as the -%% serial store hasn't been deleted. -guid() -> +%% Generate an un-hashed guid. +fresh() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a %% restart, or ids were generated at a high rate (which causes @@ -78,29 +76,74 @@ guid() -> %% %% A persisted serial number, the node, and a unique reference %% (per node incarnation) uniquely identifies a process in space - %% and time. We combine that with a process-local counter to give - %% us a GUID. - G = case get(guid) of - undefined -> Serial = gen_server:call(?SERVER, serial, infinity), - {{Serial, node(), make_ref()}, 0}; + %% and time. + Serial = gen_server:call(?SERVER, serial, infinity), + {Serial, node(), make_ref()}. + +advance_blocks({B1, B2, B3, B4}, I) -> + %% To produce a new set of blocks, we create a new 32bit block + %% hashing {B5, I}. The new hash is used as last block, and the + %% other three blocks are XORed with it. + %% + %% Doing this is convenient because it avoids cascading conflits, + %% while being very fast. The conflicts are avoided by propagating + %% the changes through all the blocks at each round by XORing, so + %% the only occasion in which a collision will take place is when + %% all 4 blocks are the same and the counter is the same. + %% + %% The range (2^32) is provided explicitly since phash uses 2^27 + %% by default. + B5 = erlang:phash2({B1, I}, 4294967296), + {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}. + +blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>. + +%% generate a GUID. This function should be used when performance is a +%% priority and predictability is not an issue. Otherwise use +%% gen_secure/0. +gen() -> + %% We hash a fresh GUID with md5, split it in 4 blocks, and each + %% time we need a new guid we rotate them producing a new hash + %% with the aid of the counter. Look at the comments in + %% advance_blocks/2 for details. + {BS, I} = case get(guid) of + undefined -> <<B1:32, B2:32, B3:32, B4:32>> = + erlang:md5(term_to_binary(fresh())), + {{B1,B2,B3,B4}, 0}; + {BS0, I0} -> advance_blocks(BS0, I0) + end, + put(guid, {BS, I}), + blocks_to_binary(BS). + +%% generate a non-predictable GUID. +%% +%% The id is only unique within a single cluster and as long as the +%% serial store hasn't been deleted. +%% +%% If you are not concerned with predictability, gen/0 is faster. +gen_secure() -> + %% Here instead of hashing once we hash the GUID and the counter + %% each time, so that the GUID is not predictable. + G = case get(guid_secure) of + undefined -> {fresh(), 0}; {S, I} -> {S, I+1} end, - put(guid, G), + put(guid_secure, G), erlang:md5(term_to_binary(G)). %% generate a readable string representation of a GUID. %% %% employs base64url encoding, which is safer in more contexts than %% plain base64. -string_guid(Prefix) -> +string(G, Prefix) -> Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; ($\/, Acc) -> [$\_ | Acc]; ($\=, Acc) -> Acc; (Chr, Acc) -> [Chr | Acc] - end, [], base64:encode_to_string(guid())). + end, [], base64:encode_to_string(G)). -binstring_guid(Prefix) -> - list_to_binary(string_guid(Prefix)). +binary(G, Prefix) -> + list_to_binary(string(G, Prefix)). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 177ae868..80b4e768 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_heartbeat). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a08d4b6..9fa6213b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_limiter). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 83cead6e..a6b4eeb0 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_log). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index c25a177b..f22ad874 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index ee64b5a8..d0b5bab7 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_coordinator). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index f60562ef..64a4a737 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_master). @@ -280,8 +280,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. -status(#state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQ:status(BQS). +status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:status(BQS) ++ + [ {mirror_seen, dict:size(State #state.seen_status)}, + {mirror_senders, sets:size(State #state.known_senders)} ]. invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 226fbea0..db7d8ecc 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_misc). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1fd927dd..9bf89bce 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave). diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index fc04ec79..8eacb1f3 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave_sup). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0578cf7d..9a6879b1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_misc). @@ -34,11 +34,11 @@ -export([execute_mnesia_transaction/2]). -export([execute_mnesia_tx_with_tail/1]). -export([ensure_ok/2]). --export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). +-export([tcp_name/3]). -export([upmap/2, map_in_order/2]). -export([table_filter/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). --export([format_stderr/2, with_local_io/1, local_info_msg/2]). +-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). -export([sort_field_table/1]). @@ -141,9 +141,6 @@ -spec(execute_mnesia_tx_with_tail/1 :: (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). --spec(makenode/1 :: ({string(), string()} | string()) -> node()). --spec(nodeparts/1 :: (node() | string()) -> {string(), string()}). --spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), inet:ip_address(), rabbit_networking:ip_port()) -> atom()). @@ -155,6 +152,7 @@ -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()). +-spec(format/2 :: (string(), [any()]) -> string()). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(with_local_io/1 :: (fun (() -> A)) -> A). -spec(local_info_msg/2 :: (string(), [any()]) -> 'ok'). @@ -222,7 +220,7 @@ frame_error(MethodName, BinaryFields) -> protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). amqp_error(Name, ExplanationFormat, Params, Method) -> - Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)), + Explanation = format(ExplanationFormat, Params), #amqp_error{name = Name, explanation = Explanation, method = Method}. protocol_error(Name, ExplanationFormat, Params) -> @@ -276,8 +274,7 @@ val({Type, Value}) -> true -> "~s"; false -> "~w" end, - lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'", - [Value, Type])). + format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]). %% Normally we'd call mnesia:dirty_read/1 here, but that is quite %% expensive due to general mnesia overheads (figuring out table types @@ -320,8 +317,7 @@ r_arg(VHostPath, Kind, Table, Key) -> end. rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> - lists:flatten(io_lib:format("~s '~s' in vhost '~s'", - [Kind, Name, VHostPath])). + format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]). enable_cover() -> enable_cover(["."]). @@ -337,7 +333,7 @@ enable_cover(Dirs) -> end, ok, Dirs). start_cover(NodesS) -> - {ok, _} = cover:start([makenode(N) || N <- NodesS]), + {ok, _} = cover:start([rabbit_nodes:make(N) || N <- NodesS]), ok. report_cover() -> report_cover(["."]). @@ -418,12 +414,25 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of - {atomic, Result} -> Result; - {aborted, Reason} -> throw({error, Reason}) + case worker_pool:submit( + fun () -> + case mnesia:is_transaction() of + false -> DiskLogBefore = mnesia_dumper:get_log_writes(), + Res = mnesia:sync_transaction(TxFun), + DiskLogAfter = mnesia_dumper:get_log_writes(), + case DiskLogAfter == DiskLogBefore of + true -> Res; + false -> {sync, Res} + end; + true -> mnesia:sync_transaction(TxFun) + end + end) of + {sync, {atomic, Result}} -> mnesia_sync:sync(), Result; + {sync, {aborted, Reason}} -> throw({error, Reason}); + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) end. - %% Like execute_mnesia_transaction/1 with additional Pre- and Post- %% commit function execute_mnesia_transaction(TxFun, PrePostCommitFun) -> @@ -450,29 +459,10 @@ execute_mnesia_tx_with_tail(TxFun) -> ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). -makenode({Prefix, Suffix}) -> - list_to_atom(lists:append([Prefix, "@", Suffix])); -makenode(NodeStr) -> - makenode(nodeparts(NodeStr)). - -nodeparts(Node) when is_atom(Node) -> - nodeparts(atom_to_list(Node)); -nodeparts(NodeStr) -> - case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of - {Prefix, []} -> {_, Suffix} = nodeparts(node()), - {Prefix, Suffix}; - {Prefix, Suffix} -> {Prefix, tl(Suffix)} - end. - -cookie_hash() -> - base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). - tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> list_to_atom( - lists:flatten( - io_lib:format("~w_~s:~w", - [Prefix, inet_parse:ntoa(IPAddress), Port]))). + format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])). %% This is a modified version of Luke Gorrie's pmap - %% http://lukego.livejournal.com/6753.html - that doesn't care about @@ -541,6 +531,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), dirty_dump_log1(LH, disk_log:chunk(LH, K)). +format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). + format_stderr(Fmt, Args) -> case os:type() of {unix, _} -> @@ -636,7 +628,7 @@ pid_to_string(Pid) when is_pid(Pid) -> <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])). + format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]). %% inverse of above string_to_pid(Str) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 0f33a38a..60dd0770 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% @@ -23,8 +23,8 @@ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, - delete_previously_running_nodes/0, running_nodes_filename/0, - is_disc_node/0, on_node_down/1, on_node_up/1]). + running_nodes_filename/0, is_disc_node/0, on_node_down/1, + on_node_up/1]). -export([table_names/0]). @@ -64,7 +64,6 @@ -spec(read_cluster_nodes_config/0 :: () -> [node()]). -spec(record_running_nodes/0 :: () -> 'ok'). -spec(read_previously_running_nodes/0 :: () -> [node()]). --spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). -spec(is_disc_node/0 :: () -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). @@ -104,6 +103,7 @@ init() -> %% Mnesia is up. In fact that's not guaranteed to be the case - let's %% make it so. ok = global:sync(), + ok = delete_previously_running_nodes(), ok. is_db_empty() -> @@ -622,10 +622,9 @@ move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), - BackupDir = lists:flatten( - io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", - [MnesiaDir, - Year, Month, Day, Hour, Minute, Second])), + BackupDir = rabbit_misc:format( + "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", + [MnesiaDir, Year, Month, Day, Hour, Minute, Second]), case file:rename(MnesiaDir, BackupDir) of ok -> %% NB: we cannot use rabbit_log here since it may not have diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index b7de27d4..f685b109 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_file). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 495e2976..56265136 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store). diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index d6dc5568..9c31439f 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_ets_index). diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 77f1f04e..3b61ed0b 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_gc). diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl index ef8b7cdf..2f36256c 100644 --- a/src/rabbit_msg_store_index.erl +++ b/src/rabbit_msg_store_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_index). diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index fef8ae88..02889b93 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_net). @@ -151,10 +151,9 @@ connection_string(Sock, Direction) -> end, case {From(Sock), To(Sock)} of {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> - {ok, lists:flatten( - io_lib:format("~s:~p -> ~s:~p", - [rabbit_misc:ntoab(FromAddress), FromPort, - rabbit_misc:ntoab(ToAddress), ToPort]))}; + {ok, rabbit_misc:format("~s:~p -> ~s:~p", + [rabbit_misc:ntoab(FromAddress), FromPort, + rabbit_misc:ntoab(ToAddress), ToPort])}; {{error, _Reason} = Error, _} -> Error; {_, {error, _Reason} = Error} -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 7355704a..825d1bb1 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_networking). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8aa24ab5..4fc91860 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_node_monitor). diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl new file mode 100644 index 00000000..329c07dc --- /dev/null +++ b/src/rabbit_nodes.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_nodes). + +-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]). + +-define(EPMD_TIMEOUT, 30000). + +%%---------------------------------------------------------------------------- +%% Specs +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(names/1 :: (string()) -> rabbit_types:ok_or_error2( + [{string(), integer()}], term())). +-spec(diagnostics/1 :: ([node()]) -> string()). +-spec(make/1 :: ({string(), string()} | string()) -> node()). +-spec(parts/1 :: (node() | string()) -> {string(), string()}). +-spec(cookie_hash/0 :: () -> string()). + +-endif. + +%%---------------------------------------------------------------------------- + +names(Hostname) -> + Self = self(), + process_flag(trap_exit, true), + Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), + Res = receive + {names, Names} -> Names; + {'EXIT', Pid, Reason} -> {error, Reason} + end, + process_flag(trap_exit, false), + Res. + +diagnostics(Nodes) -> + Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]), + NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n" + "nodes in question: ~p~n~n" + "hosts, their running nodes and ports:", [Nodes]}] ++ + [diagnostics_host(Host) || Host <- Hosts] ++ + diagnostics0(), + lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags, + {F, A} <- [NodeDiag]]). + +diagnostics0() -> + [{"~ncurrent node details:~n- node name: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- home dir: ~s", [Home]}; + Other -> {"- no home dir: ~p", [Other]} + end, + {"- cookie hash: ~s", [cookie_hash()]}]. + +diagnostics_host(Host) -> + case names(Host) of + {error, EpmdReason} -> + {"- unable to connect to epmd on ~s: ~w", + [Host, EpmdReason]}; + {ok, NamePorts} -> + {"- ~s: ~p", + [Host, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]} + end. + +make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix])); +make(NodeStr) -> make(parts(NodeStr)). + +parts(Node) when is_atom(Node) -> + parts(atom_to_list(Node)); +parts(NodeStr) -> + case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of + {Prefix, []} -> {_, Suffix} = parts(node()), + {Prefix, Suffix}; + {Prefix, Suffix} -> {Prefix, tl(Suffix)} + end. + +cookie_hash() -> + base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index de2ba8ad..7b85ab15 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. %% -module(rabbit_plugins). diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 50444dc4..162d44f1 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_prelaunch). @@ -22,7 +22,6 @@ -define(BaseApps, [rabbit]). -define(ERROR_CODE, 1). --define(EPMD_TIMEOUT, 30000). %%---------------------------------------------------------------------------- %% Specs @@ -244,16 +243,15 @@ duplicate_node_check([]) -> %% Ignore running node while installing windows service ok; duplicate_node_check(NodeStr) -> - Node = rabbit_misc:makenode(NodeStr), - {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - case names(NodeHost) of + Node = rabbit_nodes:make(NodeStr), + {NodeName, NodeHost} = rabbit_nodes:parts(Node), + case rabbit_nodes:names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of true -> io:format("node with name ~p " "already running on ~p~n", [NodeName, NodeHost]), - [io:format(Fmt ++ "~n", Args) || - {Fmt, Args} <- rabbit_control:diagnostics(Node)], + io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"), terminate(?ERROR_CODE); false -> ok end; @@ -279,15 +277,3 @@ terminate(Status) -> after infinity -> ok end end. - -names(Hostname) -> - Self = self(), - process_flag(trap_exit, true), - Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), - timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), - Res = receive - {names, Names} -> Names; - {'EXIT', Pid, Reason} -> {error, Reason} - end, - process_flag(trap_exit, false), - Res. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 9b45e798..df957d88 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_queue_collector). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0a12b289..6ab2de89 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_queue_index). @@ -496,7 +496,7 @@ recover_message(false, _, no_del, RelSeq, Segment) -> queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), - lists:flatten(io_lib:format("~.36B", [Num])). + rabbit_misc:format("~.36B", [Num]). queues_dir() -> filename:join(rabbit_mnesia:dir(), "queues"). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 5ebe65c9..908a279c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_reader). @@ -265,10 +265,9 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], buf_len = BufLen + size(Data), pending_recv = false}); - closed -> if State#v1.connection_state =:= closed -> - State; - true -> - throw(connection_closed_abruptly) + closed -> case State#v1.connection_state of + closed -> State; + _ -> throw(connection_closed_abruptly) end; {error, Reason} -> throw({inet_error, Reason}); {other, Other} -> handle_other(Other, Deb, State) @@ -500,23 +499,21 @@ handle_frame(Type, Channel, Payload, case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - AnalyzedFrame -> - case get({channel, Channel}) of - {ChPid, AState} -> - NewAState = process_channel_frame( - AnalyzedFrame, Channel, ChPid, AState), - put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, - control_throttle(State)); - undefined -> - case ?IS_RUNNING(State) of - true -> send_to_new_channel( - Channel, AnalyzedFrame, State); - false -> throw({channel_frame_while_starting, - Channel, State#v1.connection_state, - AnalyzedFrame}) - end - end + AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State) + end. + +process_frame(Frame, Channel, State) -> + case get({channel, Channel}) of + {ChPid, AState} -> + NewAState = process_channel_frame(Frame, Channel, ChPid, AState), + put({channel, Channel}, {ChPid, NewAState}), + post_process_frame(Frame, ChPid, State); + undefined when ?IS_RUNNING(State) -> + ok = create_channel(Channel, State), + process_frame(Frame, Channel, State); + undefined -> + throw({channel_frame_while_starting, + Channel, State#v1.connection_state, Frame}) end. post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> @@ -527,11 +524,11 @@ post_process_frame({method, MethodName, _}, _ChPid, protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - maybe_block(State); - false -> State + maybe_block(control_throttle(State)); + false -> control_throttle(State) end; post_process_frame(_Frame, _ChPid, State) -> - State. + control_throttle(State). handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( @@ -896,7 +893,7 @@ cert_info(F, Sock) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, State) -> +create_channel(Channel, State) -> #v1{sock = Sock, queue_collector = Collector, channel_sup_sup_pid = ChanSupSup, connection = #connection{protocol = Protocol, @@ -909,10 +906,9 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState), - put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), - State. + put({channel, Channel}, {ChPid, AState}), + ok. process_channel_frame(Frame, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 9821ae7b..8c0ebcbe 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_registry). diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index 1a08efed..237ab78c 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_restartable_sup). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 219833b7..f4bbda0f 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_router). diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index 963294d9..e8beecfe 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_sasl_report_file_h). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index e524446e..3025d981 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_ssl). @@ -72,9 +72,8 @@ peer_cert_validity(Cert) -> cert_info(fun(#'OTPCertificate' { tbsCertificate = #'OTPTBSCertificate' { validity = {'Validity', Start, End} }}) -> - lists:flatten( - io_lib:format("~s - ~s", [format_asn1_value(Start), - format_asn1_value(End)])) + rabbit_misc:format("~s - ~s", [format_asn1_value(Start), + format_asn1_value(End)]) end, Cert). %%-------------------------------------------------------------------------- @@ -155,8 +154,7 @@ escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 -> %% purposes it's handy to escape all non-printable chars. All non-ASCII %% characters get converted to UTF-8 sequences and then escaped. We've %% already got a UTF-8 sequence here, so just escape it. - lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++ - escape_rdn_value(S, middle); + rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle); escape_rdn_value([C | S], middle) -> [C | escape_rdn_value(S, middle)]. diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index 802ea5e2..0965e3b3 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_sup). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 343e79e5..7a96af26 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_tests). @@ -59,7 +59,7 @@ all_tests() -> passed. maybe_run_cluster_dependent_tests() -> - SecondaryNode = rabbit_misc:makenode("hare"), + SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = run_cluster_dependent_tests(SecondaryNode); @@ -859,7 +859,7 @@ test_cluster_management() -> "invalid2@invalid"]), ok = assert_ram_node(), - SecondaryNode = rabbit_misc:makenode("hare"), + SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = test_cluster_management2(SecondaryNode); pang -> io:format("Skipping clustering tests with node ~p~n", @@ -1786,10 +1786,10 @@ test_msg_store() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds), - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), {Cap, MSCState} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref), - Ref2 = rabbit_guid:guid(), + Ref2 = rabbit_guid:gen(), {Cap2, MSC2State} = msg_store_client_init_capture( ?PERSISTENT_MSG_STORE, Ref2), %% check we don't contain any of the msgs we're about to publish @@ -1941,7 +1941,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) -> passed. test_msg_store_confirm_timer() -> - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MsgId = msg_id_bin(1), Self = self(), MSCState = rabbit_msg_store:client_init( @@ -1970,7 +1970,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) -> test_msg_store_client_delete_and_terminate() -> restart_msg_store_empty(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)], - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), ok = msg_store_write(MsgIds, MSCState), %% test the 'dying client' fast path for writes @@ -1986,7 +1986,7 @@ test_queue() -> init_test_queue() -> TestQueue = test_queue(), Terms = rabbit_queue_index:shutdown_terms(TestQueue), - PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), + PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()), PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef), Res = rabbit_queue_index:recover( TestQueue, Terms, false, @@ -2020,7 +2020,7 @@ restart_app() -> rabbit:start(). queue_index_publish(SeqIds, Persistent, Qi) -> - Ref = rabbit_guid:guid(), + Ref = rabbit_guid:gen(), MsgStore = case Persistent of true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE @@ -2029,7 +2029,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {A, B = [{_SeqId, LastMsgIdWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) -> - MsgId = rabbit_guid:guid(), + MsgId = rabbit_guid:gen(), QiM = rabbit_queue_index:publish( MsgId, SeqId, #message_properties{}, Persistent, QiN), ok = rabbit_msg_store:write(MsgId, MsgId, MSCState), @@ -2052,7 +2052,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> - MsgId = rabbit_guid:guid(), + MsgId = rabbit_guid:gen(), Props = #message_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0), {[{MsgId, 1, Props, _, _}], Qi2} = diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl index abcbe0b6..72c07b51 100644 --- a/src/rabbit_tests_event_receiver.erl +++ b/src/rabbit_tests_event_receiver.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_tests_event_receiver). diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 58079ccf..3a5b96de 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_trace). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index ae2b5d3f..732c29b6 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_types). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 717d94a8..80f50b38 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index f164035e..9f2535bd 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade_functions). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3e605f04..7f4fdb67 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_variable_queue). @@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, Terms1} = case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:guid(), []}; + undefined -> {rabbit_guid:gen(), []}; PRef1 -> {PRef1, Terms} end, PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, @@ -865,7 +865,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> Res. msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> - msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback). + msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, + Callback). msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index f6bcbb7f..7545d813 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_version). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 38bb76b0..5548ef6d 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_vhost). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f6062e06..dc74b2f5 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(rabbit_writer). @@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, ok = internal_send_command_async(MethodRecord, Content, State), rabbit_amqqueue:notify_sent(QPid, ChPid), State; +handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> + rabbit_amqqueue:notify_sent_queue_down(QPid), + State; handle_message({inet_reply, _, ok}, State) -> State; handle_message({inet_reply, _, Status}, _State) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 26ea502c..a2f4fae9 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -41,7 +41,7 @@ %% 5) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% -%% All modifications are (C) 2010-2011 VMware, Inc. +%% All modifications are (C) 2010-2012 VMware, Inc. %% %% %CopyrightBegin% %% diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 88da74c5..43a6bc99 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_acceptor). diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index cb3dd02c..d8844441 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_acceptor_sup). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index e5db4c9f..fb01c792 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_listener). diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 74297b6d..9ee921b4 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(tcp_listener_sup). diff --git a/src/test_sup.erl b/src/test_sup.erl index 5feb146f..7f4b5049 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(test_sup). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 8973a4f7..fca55f02 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% %% In practice Erlang shouldn't be allowed to grow to more than a half diff --git a/src/worker_pool.erl b/src/worker_pool.erl index fcb07a16..c9ecccd6 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool). diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index d37c3a0f..ff356366 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool_sup). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index b42530e2..1ddcebb2 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% -module(worker_pool_worker). |