summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-09 16:53:24 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-09 16:53:24 +0000
commitff53141a67e48c7363e1910157904f698c01f61e (patch)
treed808139d5899ac7fd664c20a1d8d94a66c96cbe8
parentf8763d64de0c28f608bb10d22c64d8725deb1990 (diff)
parent54250f72aac333157ff6a4497ab69682a3bc27fb (diff)
downloadrabbitmq-server-ff53141a67e48c7363e1910157904f698c01f61e.tar.gz
Merged default
-rw-r--r--LICENSE-MPL-RabbitMQ2
-rw-r--r--codegen.py4
-rw-r--r--include/gm_specs.hrl2
-rw-r--r--include/rabbit.hrl6
-rw-r--r--include/rabbit_auth_backend_spec.hrl2
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl2
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--include/rabbit_msg_store.hrl2
-rw-r--r--include/rabbit_msg_store_index.hrl2
-rw-r--r--packaging/common/LICENSE.tail4
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf2
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in2
-rwxr-xr-xscripts/rabbitmq-env13
-rwxr-xr-xscripts/rabbitmq-plugins9
-rwxr-xr-xscripts/rabbitmq-plugins.bat2
-rwxr-xr-xscripts/rabbitmq-server39
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat2
-rwxr-xr-xscripts/rabbitmqctl8
-rwxr-xr-x[-rw-r--r--]scripts/rabbitmqctl.bat2
-rw-r--r--src/credit_flow.erl104
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/gm.erl2
-rw-r--r--src/gm_soak_test.erl2
-rw-r--r--src/gm_speed_test.erl2
-rw-r--r--src/gm_tests.erl2
-rw-r--r--src/lqueue.erl2
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/mnesia_sync.erl77
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl25
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl2
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl2
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_basic.erl4
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl148
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl2
-rw-r--r--src/rabbit_connection_sup.erl2
-rw-r--r--src/rabbit_control.erl28
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl2
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_headers.erl2
-rw-r--r--src/rabbit_exchange_type_topic.erl5
-rw-r--r--src/rabbit_file.erl2
-rw-r--r--src/rabbit_framing.erl2
-rw-r--r--src/rabbit_guid.erl83
-rw-r--r--src/rabbit_heartbeat.erl2
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_memory_monitor.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_misc.erl66
-rw-r--r--src/rabbit_mnesia.erl15
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_msg_store_index.erl2
-rw-r--r--src/rabbit_net.erl9
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_nodes.erl94
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_prelaunch.erl24
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl4
-rw-r--r--src/rabbit_reader.erl54
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_restartable_sup.erl2
-rw-r--r--src/rabbit_router.erl2
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_ssl.erl10
-rw-r--r--src/rabbit_sup.erl2
-rw-r--r--src/rabbit_tests.erl22
-rw-r--r--src/rabbit_tests_event_receiver.erl2
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/rabbit_version.erl2
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_writer.erl5
-rw-r--r--src/supervisor2.erl2
-rw-r--r--src/tcp_acceptor.erl2
-rw-r--r--src/tcp_acceptor_sup.erl2
-rw-r--r--src/tcp_listener.erl2
-rw-r--r--src/tcp_listener_sup.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl2
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl2
129 files changed, 691 insertions, 472 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index 14bcc21d..d50e32ef 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/codegen.py b/codegen.py
index 494be73d..9483e854 100644
--- a/codegen.py
+++ b/codegen.py
@@ -11,7 +11,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
from __future__ import nested_scopes
@@ -118,7 +118,7 @@ def printFileHeader():
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%"""
def genErl(spec):
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index ee29706e..a317e63b 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index f6a8a303..faf3059a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-record(user, {username,
@@ -86,7 +86,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
@@ -95,7 +95,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(CREDIT_DISC_BOUND, {2000, 1500}).
+-define(CREDIT_DISC_BOUND, {2000, 500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl
index 803bb75c..61a2e22a 100644
--- a/include/rabbit_auth_backend_spec.hrl
+++ b/include/rabbit_auth_backend_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
index 614a3eed..9a2f5e05 100644
--- a/include/rabbit_auth_mechanism_spec.hrl
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 4a657951..2a8cc13c 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-type(fetch_result(Ack) ::
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index f6283ef7..8f7e22d3 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index e9150a97..f7c10bd8 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit.hrl").
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
index 2ae5b000..75d7eb71 100644
--- a/include/rabbit_msg_store_index.hrl
+++ b/include/rabbit_msg_store_index.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit_msg_store.hrl").
diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail
index 5d842cc1..b9c2629b 100644
--- a/packaging/common/LICENSE.tail
+++ b/packaging/common/LICENSE.tail
@@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
MOZILLA PUBLIC LICENSE
@@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0436f546..0e59c218 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Escape spaces and quotes, because shell is revolting.
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index e6776eff..14557286 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
##
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 8696427e..79e9c1dd 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -34,7 +34,7 @@ package: clean
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright
- echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
+ echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
rm -rf $(UNPACKED_DIR)
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 45f5c5c4..fb02cd6a 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
Package: rabbitmq-server
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 27e4e1dc..91510991 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
-VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved."
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index a2ef8d3c..1fd1339d 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Determine where this script is really located
@@ -36,7 +36,16 @@ RABBITMQ_HOME="${SCRIPT_DIR}/.."
[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname`
NODENAME=rabbit@${HOSTNAME%%.*}
-# Load configuration from the rabbitmq.conf file
+## Set (non-empty) default values for rabbitmq-env.conf variables to override
+SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
+-kernel inet_default_connect_options [{nodelay,true}]"
+CONFIG_FILE=/etc/rabbitmq/rabbitmq
+LOG_BASE=/var/log/rabbitmq
+MNESIA_BASE=/var/lib/rabbitmq/mnesia
+PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
+
+## Load configuration from the rabbitmq.conf file
if [ -f /etc/rabbitmq/rabbitmq.conf ] && \
[ ! -f /etc/rabbitmq/rabbitmq-env.conf ] ; then
echo -n "WARNING: ignoring /etc/rabbitmq/rabbitmq.conf -- "
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 4c6cb1fa..14a18d57 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -12,16 +12,19 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
+# Get default settings with user overrides for (RABBITMQ_)<var_name>
+# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
-ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+##--- End of overridden <var_name> variables
exec erl \
-pa "${RABBITMQ_HOME}/ebin" \
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index ca874a7f..66a900a1 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 39a68c8e..0a5a4640 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -12,33 +12,23 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
-SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
--kernel inet_default_connect_options [{nodelay,true}]"
-CONFIG_FILE=/etc/rabbitmq/rabbitmq
-LOG_BASE=/var/log/rabbitmq
-MNESIA_BASE=/var/lib/rabbitmq/mnesia
-SERVER_START_ARGS=
-ENABLED_PLUGINS_FILE=/etc/rabbitmq/enabled_plugins
-
+# Get default settings with user overrides for (RABBITMQ_)<var_name>
+# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+
DEFAULT_NODE_IP_ADDRESS=auto
DEFAULT_NODE_PORT=5672
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ]
-then
- if [ "x" != "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
- fi
-else
- if [ "x" = "x$RABBITMQ_NODE_PORT" ]
- then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
- fi
-fi
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
+[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
+
+[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS}
+[ "x" != "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT}
+
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE}
@@ -48,13 +38,16 @@ fi
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
+
+[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${PID_FILE}
[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR}
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
-[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
## Log rotation
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
@@ -62,6 +55,8 @@ fi
[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
+##--- End of overridden <var_name> variables
+
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 44ce1ce1..ca49a5d8 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 1582bfb1..9e274840 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 9a11c3b3..4aad6b8f 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -12,14 +12,20 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
+# Get default settings with user overrides for (RABBITMQ_)<var_name>
+# Non-empty defaults should be set in rabbitmq-env
. `dirname $0`/rabbitmq-env
+##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
+##--- End of overridden <var_name> variables
+
exec erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index a74a91fd..f37fae48 100644..100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 397aa191..072f4d9d 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -11,23 +11,26 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(credit_flow).
-%% Credit starts at MaxCredit and goes down. Both sides keep
-%% track. When the receiver goes below MoreCreditAt it issues more
-%% credit by sending a message to the sender. The sender should pass
-%% this message in to handle_bump_msg/1. The sender should block when
-%% it goes below 0 (check by invoking blocked/0). If a process is both
-%% a sender and a receiver it will not grant any more credit to its
-%% senders when it is itself blocked - thus the only processes that
-%% need to check blocked/0 are ones that read from network sockets.
-
--define(DEFAULT_CREDIT, {200, 150}).
-
--export([ack/1, ack/2, handle_bump_msg/1, blocked/0, send/1, send/2]).
+%% Credit flow is controlled by a credit specification - a
+%% {InitialCredit, MoreCreditAfter} tuple. For the message sender,
+%% credit starts at InitialCredit and is decremented with every
+%% message sent. The message receiver grants more credit to the sender
+%% by sending it a {bump_credit, ...} control message after receiving
+%% MoreCreditAfter messages. The sender should pass this message in to
+%% handle_bump_msg/1. The sender should block when it goes below 0
+%% (check by invoking blocked/0). If a process is both a sender and a
+%% receiver it will not grant any more credit to its senders when it
+%% is itself blocked - thus the only processes that need to check
+%% blocked/0 are ones that read from network sockets.
+
+-define(DEFAULT_CREDIT, {200, 50}).
+
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -35,14 +38,14 @@
-ifdef(use_specs).
-opaque(bump_msg() :: {pid(), non_neg_integer()}).
--opaque(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(ack/1 :: (pid()) -> 'ok').
-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
-spec(blocked/0 :: () -> boolean()).
--spec(send/1 :: (pid()) -> 'ok').
--spec(send/2 :: (pid(), credit_spec()) -> 'ok').
-spec(peer_down/1 :: (pid()) -> 'ok').
-endif.
@@ -60,39 +63,32 @@
%% For any given pair of processes, ack/2 and send/2 must always be
%% called with the same credit_spec().
-ack(To) -> ack(To, ?DEFAULT_CREDIT).
+send(From) -> send(From, ?DEFAULT_CREDIT).
-ack(To, {MaxCredit, MoreCreditAt}) ->
- MoreCreditAt1 = MoreCreditAt + 1,
- Credit =
- case get({credit_to, To}, MaxCredit) of
- MoreCreditAt1 -> grant(To, MaxCredit - MoreCreditAt),
- MaxCredit;
- C -> C - 1
- end,
- put({credit_to, To}, Credit).
+send(From, {InitialCredit, _MoreCreditAfter}) ->
+ update({credit_from, From}, InitialCredit,
+ fun (1) -> block(From),
+ 0;
+ (C) -> C - 1
+ end).
-handle_bump_msg({From, MoreCredit}) ->
- Credit = get({credit_from, From}, 0) + MoreCredit,
- put({credit_from, From}, Credit),
- case Credit > 0 of
- true -> unblock(From),
- ok;
- false -> ok
- end.
+ack(To) -> ack(To, ?DEFAULT_CREDIT).
-blocked() ->
- get(credit_blocked, []) =/= [].
+ack(To, {_InitialCredit, MoreCreditAfter}) ->
+ update({credit_to, To}, MoreCreditAfter,
+ fun (1) -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ (C) -> C - 1
+ end).
-send(From) -> send(From, ?DEFAULT_CREDIT).
+handle_bump_msg({From, MoreCredit}) ->
+ update({credit_from, From}, 0,
+ fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ (C) -> C + MoreCredit
+ end).
-send(From, {MaxCredit, _MoreCreditAt}) ->
- Credit = get({credit_from, From}, MaxCredit) - 1,
- case Credit of
- 0 -> block(From);
- _ -> ok
- end,
- put({credit_from, From}, Credit).
+blocked() -> get(credit_blocked, []) =/= [].
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
@@ -108,20 +104,18 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> Deferred = get(credit_deferred, []),
- put(credit_deferred, [{To, Msg} | Deferred])
+ true -> update(credit_deferred, [],
+ fun (Deferred) -> [{To, Msg} | Deferred] end)
end.
-block(From) ->
- put(credit_blocked, [From | get(credit_blocked, [])]).
+block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
unblock(From) ->
- NewBlocks = get(credit_blocked, []) -- [From],
- put(credit_blocked, NewBlocks),
- case NewBlocks of
- [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
- _ -> ok
+ update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ case blocked() of
+ false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ true -> ok
end.
get(Key, Default) ->
@@ -129,3 +123,5 @@ get(Key, Default) ->
undefined -> Default;
Value -> Value
end.
+
+update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/delegate.erl b/src/delegate.erl
index edb4eba4..d595e481 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 4c131a6c..2a8b915b 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate_sup).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index a362c94f..f3b4dbaf 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(file_handle_cache).
diff --git a/src/gatherer.erl b/src/gatherer.erl
index fe976b50..98b36038 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gatherer).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 49913d26..f8537487 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2011 VMware, Inc.
+%% All modifications are (C) 2009-2012 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
diff --git a/src/gm.erl b/src/gm.erl
index 6c899122..6f9ff564 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm).
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 5e5a3a5a..57217541 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_soak_test).
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index defb0f29..dad75bd4 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_speed_test).
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index ca0ffd64..0a2d4204 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_tests).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 04b40706..c4e046b5 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(lqueue).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 3ba8f50d..a599effa 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index d48a9ca5..e8baabe8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor_tests).
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
new file mode 100644
index 00000000..a3773d90
--- /dev/null
+++ b/src/mnesia_sync.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(mnesia_sync).
+
+%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk
+%% at commit. This module is an attempt to minimise the risk of data loss by
+%% performing a coalesced log fsync. Unfortunately this is performed regardless
+%% of whether or not the log was appended to.
+
+-behaviour(gen_server).
+
+-export([sync/0]).
+
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {waiting, disc_node}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+sync() ->
+ gen_server:call(?SERVER, sync, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}.
+
+handle_call(sync, _From, #state{disc_node = false} = State) ->
+ {reply, ok, State};
+handle_call(sync, From, #state{waiting = Waiting} = State) ->
+ {noreply, State#state{waiting = [From | Waiting]}, 0};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(timeout, #state{waiting = Waiting} = State) ->
+ ok = disk_log:sync(latest_log),
+ [gen_server:reply(From, ok) || From <- Waiting],
+ {noreply, State#state{waiting = []}};
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/pg_local.erl b/src/pg_local.erl
index c9c3a3a7..e2e82f1f 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -13,7 +13,7 @@
%% versions of Erlang/OTP. The remaining type specs have been
%% removed.
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%% %CopyrightBegin%
%%
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 4fc8b469..780fa2e9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3dcd4938..0a0ca90a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit).
@@ -45,6 +45,12 @@
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({database_sync,
+ [{description, "database sync"},
+ {mfa, {rabbit_sup, start_child, [mnesia_sync]}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -360,10 +366,8 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
- ok = rabbit_mnesia:delete_previously_running_nodes(),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
-
print_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
@@ -501,6 +505,19 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ {Err, Nodes} =
+ case rabbit_mnesia:read_previously_running_nodes() of
+ [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
+ " shut down forcefully~nit cannot determine which nodes"
+ " are timing out. Details on all nodes will~nfollow.~n",
+ rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ Ns -> {rabbit_misc:format(
+ "Timeout contacting cluster nodes: ~p.~n", [Ns]),
+ Ns}
+ end,
+ boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
+
boot_step_error(Reason, Stacktrace) ->
boot_error("Error description:~n ~p~n~n"
"Log files (may contain more information):~n ~s~n ~s~n~n"
@@ -662,7 +679,7 @@ print_banner() ->
{"app descriptor", app_location()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
- {"cookie hash", rabbit_misc:cookie_hash()},
+ {"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ec9affa6..75c53511 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_access_control).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 517dd4ec..187ec1ab 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_alarm).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 94a99a49..a7dfd535 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
@@ -25,7 +25,7 @@
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -137,6 +139,7 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
@@ -461,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C -> C - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f63a09d3..b3a620fa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -388,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) ->
{_, _} -> ok
end.
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+deliver_msgs_to_consumers(_DeliverFun, true, State) ->
+ {true, State};
+deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case PredFun(FunAcc, State) of
- false -> {FunAcc, State};
- true -> case queue:out(ActiveConsumers) of
- {empty, _} ->
- {FunAcc, State};
- {{value, QEntry}, Tail} ->
- {FunAcc1, State1} =
- deliver_msg_to_consumer(
+ case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {false, State};
+ {{value, QEntry}, Tail} ->
+ {Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry,
- FunAcc, State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(Funs, FunAcc1, State1)
- end
+ State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {FunAcc, State};
+ {false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
Consumer#consumer.ack_required) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
- {FunAcc, State};
+ {false, State};
true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C, FunAcc,
+ DeliverFun, Consumer, C,
State#q{active_consumers = AC1})
end
end.
@@ -426,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
+ State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, Stop, State1} =
+ DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -437,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {FunAcc1, State1}.
-
-deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
+ {Stop, State1}.
-deliver_from_queue_deliver(AckRequired, false, State) ->
+deliver_from_queue_deliver(AckRequired, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
@@ -485,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) ->
State.
run_message_queue(State) ->
- Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3},
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
- IsEmpty = BQ:is_empty(BQS),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
+ {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ fun deliver_from_queue_deliver/2,
+ BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = ChPid,
@@ -504,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false,
- State1 = #q{backing_queue_state = BQS2}) ->
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
%% we don't need an expiry here because
%% messages are not being enqueued, so we use
%% an empty message_properties.
@@ -521,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State1#q{backing_queue_state = BQS3}}
end,
{Delivered, State2} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ deliver_msgs_to_consumers(DeliverFun, false,
State#q{backing_queue_state = BQS1}),
{Delivered, Confirm, State2};
{Duplicate, BQS1} ->
@@ -830,7 +823,7 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
+ {notify_sent, _ChPid, _Credit} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
@@ -1064,11 +1057,11 @@ handle_cast({unblock, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C) -> C#cr{is_limit_active = false} end));
-handle_cast({notify_sent, ChPid}, State) ->
+handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - 1}
+ C#cr{unsent_message_count = Count - Credit}
end));
handle_cast({limit, ChPid, Limiter}, State) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 7b3ebcf2..a4305e5f 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_sup).
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index ade158bb..e0e252b8 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 086a90b4..3ef81d32 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 897199ee..0c8251b8 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index b8682a46..3de6e7a6 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_amqplain).
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index acbb6e48..64b01d8e 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_cr_demo).
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 2448acb6..19fb5875 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_plain).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c3b322ee..364eb8f6 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index c61184a6..7b00fa5f 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue_qc).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index b116821c..b8211d43 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
@@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:guid(),
+ id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 494f3203..d69376fb 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index f3ca4e98..5f0016b6 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 655bbb73..bb44797e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binding).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d8f55085..a101886f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -295,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -695,38 +686,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -746,7 +727,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_guid:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
Other -> Other
end,
@@ -975,7 +957,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
false -> none
end,
ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
@@ -1083,10 +1066,15 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
ack(TAL, State1),
+ lists:foreach(
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
@@ -1094,8 +1082,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:append([L || {_, L} <- TNL]),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1259,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none ->
+ reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ end}.
+
+reject(Requeue, Acked, Limiter) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
-
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1305,7 +1323,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1395,11 +1414,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a19b6bfd..dc262b49 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup).
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index e2561c80..995c41fb 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup_sup).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 4ba01b4f..c508f1b9 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_client_sup).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index a0953eab..adf6e417 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index b2aba2ee..12a532b6 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_connection_sup).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 22b57b1a..6a775adf 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -11,13 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1]).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -49,7 +49,6 @@
(atom(), node(), [string()], [{string(), any()}],
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
--spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -67,7 +66,7 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Opts1 = [case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
_ -> {K, V}
end || {K, V} <- Opts],
Command = list_to_atom(Command0),
@@ -143,26 +142,7 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
- [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)].
-
-diagnostics(Node) ->
- {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [{"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
+ fmt_stderr(rabbit_nodes:diagnostics([Node]), []).
stop() ->
ok.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 6f9a4650..e2928cae 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_direct).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 6e29ace7..f1672f4e 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 7b6e07c1..042ab23c 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger_file_h).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 5ae40c78..4ec141cf 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_event).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 68c0d988..83e28c44 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index ab3d00dc..44a08e24 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index b485e31f..4bce42d4 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 3c029722..cc3fb87c 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_fanout).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index f09e4aae..de9979b4 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_headers).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 91c7b5d3..84f4f8a9 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_topic).
@@ -250,7 +250,7 @@ remove_all(Table, Pattern) ->
mnesia:match_object(Table, Pattern, write)).
new_node_id() ->
- rabbit_guid:guid().
+ rabbit_guid:gen().
split_topic_key(Key) ->
split_topic_key(Key, [], []).
@@ -263,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
-
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 5cb8e7b6..59df14f3 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_file).
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
index da1a6a49..a79188ab 100644
--- a/src/rabbit_framing.erl
+++ b/src/rabbit_framing.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% TODO auto-generate
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2d0f5014..f4c425ca 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_guid).
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-export([start_link/0]).
--export([guid/0, string_guid/1, binstring_guid/1]).
+-export([gen/0, gen_secure/0, string/2, binary/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -38,9 +38,10 @@
-type(guid() :: binary()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(guid/0 :: () -> guid()).
--spec(string_guid/1 :: (any()) -> string()).
--spec(binstring_guid/1 :: (any()) -> binary()).
+-spec(gen/0 :: () -> guid()).
+-spec(gen_secure/0 :: () -> guid()).
+-spec(string/2 :: (guid(), any()) -> string()).
+-spec(binary/2 :: (guid(), any()) -> binary()).
-endif.
@@ -65,11 +66,8 @@ update_disk_serial() ->
end,
Serial.
-%% generate a GUID.
-%%
-%% The id is only unique within a single cluster and as long as the
-%% serial store hasn't been deleted.
-guid() ->
+%% Generate an un-hashed guid.
+fresh() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
%% restart, or ids were generated at a high rate (which causes
@@ -78,29 +76,74 @@ guid() ->
%%
%% A persisted serial number, the node, and a unique reference
%% (per node incarnation) uniquely identifies a process in space
- %% and time. We combine that with a process-local counter to give
- %% us a GUID.
- G = case get(guid) of
- undefined -> Serial = gen_server:call(?SERVER, serial, infinity),
- {{Serial, node(), make_ref()}, 0};
+ %% and time.
+ Serial = gen_server:call(?SERVER, serial, infinity),
+ {Serial, node(), make_ref()}.
+
+advance_blocks({B1, B2, B3, B4}, I) ->
+ %% To produce a new set of blocks, we create a new 32bit block
+ %% hashing {B5, I}. The new hash is used as last block, and the
+ %% other three blocks are XORed with it.
+ %%
+ %% Doing this is convenient because it avoids cascading conflits,
+ %% while being very fast. The conflicts are avoided by propagating
+ %% the changes through all the blocks at each round by XORing, so
+ %% the only occasion in which a collision will take place is when
+ %% all 4 blocks are the same and the counter is the same.
+ %%
+ %% The range (2^32) is provided explicitly since phash uses 2^27
+ %% by default.
+ B5 = erlang:phash2({B1, I}, 4294967296),
+ {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
+
+blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
+
+%% generate a GUID. This function should be used when performance is a
+%% priority and predictability is not an issue. Otherwise use
+%% gen_secure/0.
+gen() ->
+ %% We hash a fresh GUID with md5, split it in 4 blocks, and each
+ %% time we need a new guid we rotate them producing a new hash
+ %% with the aid of the counter. Look at the comments in
+ %% advance_blocks/2 for details.
+ {BS, I} = case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
+ erlang:md5(term_to_binary(fresh())),
+ {{B1,B2,B3,B4}, 0};
+ {BS0, I0} -> advance_blocks(BS0, I0)
+ end,
+ put(guid, {BS, I}),
+ blocks_to_binary(BS).
+
+%% generate a non-predictable GUID.
+%%
+%% The id is only unique within a single cluster and as long as the
+%% serial store hasn't been deleted.
+%%
+%% If you are not concerned with predictability, gen/0 is faster.
+gen_secure() ->
+ %% Here instead of hashing once we hash the GUID and the counter
+ %% each time, so that the GUID is not predictable.
+ G = case get(guid_secure) of
+ undefined -> {fresh(), 0};
{S, I} -> {S, I+1}
end,
- put(guid, G),
+ put(guid_secure, G),
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
%%
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
-string_guid(Prefix) ->
+string(G, Prefix) ->
Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(guid())).
+ end, [], base64:encode_to_string(G)).
-binstring_guid(Prefix) ->
- list_to_binary(string_guid(Prefix)).
+binary(G, Prefix) ->
+ list_to_binary(string(G, Prefix)).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 177ae868..80b4e768 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a08d4b6..9fa6213b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_limiter).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 83cead6e..a6b4eeb0 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_log).
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index c25a177b..f22ad874 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index ee64b5a8..d0b5bab7 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_coordinator).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index f60562ef..64a4a737 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_master).
@@ -280,8 +280,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
-status(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS).
+status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:status(BQS) ++
+ [ {mirror_seen, dict:size(State #state.seen_status)},
+ {mirror_senders, sets:size(State #state.known_senders)} ].
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 226fbea0..db7d8ecc 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1fd927dd..9bf89bce 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index fc04ec79..8eacb1f3 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave_sup).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0578cf7d..9a6879b1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
@@ -34,11 +34,11 @@
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
+-export([tcp_name/3]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format_stderr/2, with_local_io/1, local_info_msg/2]).
+-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -141,9 +141,6 @@
-spec(execute_mnesia_tx_with_tail/1 ::
(thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(makenode/1 :: ({string(), string()} | string()) -> node()).
--spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
--spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
@@ -155,6 +152,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -222,7 +220,7 @@ frame_error(MethodName, BinaryFields) ->
protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
amqp_error(Name, ExplanationFormat, Params, Method) ->
- Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ Explanation = format(ExplanationFormat, Params),
#amqp_error{name = Name, explanation = Explanation, method = Method}.
protocol_error(Name, ExplanationFormat, Params) ->
@@ -276,8 +274,7 @@ val({Type, Value}) ->
true -> "~s";
false -> "~w"
end,
- lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'",
- [Value, Type])).
+ format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]).
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
@@ -320,8 +317,7 @@ r_arg(VHostPath, Kind, Table, Key) ->
end.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
- lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
- [Kind, Name, VHostPath])).
+ format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]).
enable_cover() -> enable_cover(["."]).
@@ -337,7 +333,7 @@ enable_cover(Dirs) ->
end, ok, Dirs).
start_cover(NodesS) ->
- {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ {ok, _} = cover:start([rabbit_nodes:make(N) || N <- NodesS]),
ok.
report_cover() -> report_cover(["."]).
@@ -418,12 +414,25 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> Result;
- {aborted, Reason} -> throw({error, Reason})
+ case worker_pool:submit(
+ fun () ->
+ case mnesia:is_transaction() of
+ false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
+ Res = mnesia:sync_transaction(TxFun),
+ DiskLogAfter = mnesia_dumper:get_log_writes(),
+ case DiskLogAfter == DiskLogBefore of
+ true -> Res;
+ false -> {sync, Res}
+ end;
+ true -> mnesia:sync_transaction(TxFun)
+ end
+ end) of
+ {sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
+ {sync, {aborted, Reason}} -> throw({error, Reason});
+ {atomic, Result} -> Result;
+ {aborted, Reason} -> throw({error, Reason})
end.
-
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
%% commit function
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
@@ -450,29 +459,10 @@ execute_mnesia_tx_with_tail(TxFun) ->
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
-makenode({Prefix, Suffix}) ->
- list_to_atom(lists:append([Prefix, "@", Suffix]));
-makenode(NodeStr) ->
- makenode(nodeparts(NodeStr)).
-
-nodeparts(Node) when is_atom(Node) ->
- nodeparts(atom_to_list(Node));
-nodeparts(NodeStr) ->
- case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
- {Prefix, []} -> {_, Suffix} = nodeparts(node()),
- {Prefix, Suffix};
- {Prefix, Suffix} -> {Prefix, tl(Suffix)}
- end.
-
-cookie_hash() ->
- base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
-
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
- lists:flatten(
- io_lib:format("~w_~s:~w",
- [Prefix, inet_parse:ntoa(IPAddress), Port]))).
+ format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
@@ -541,6 +531,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -636,7 +628,7 @@ pid_to_string(Pid) when is_pid(Pid) ->
<<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
+ format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
string_to_pid(Str) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 0f33a38a..60dd0770 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
@@ -23,8 +23,8 @@
empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
create_cluster_nodes_config/1, read_cluster_nodes_config/0,
record_running_nodes/0, read_previously_running_nodes/0,
- delete_previously_running_nodes/0, running_nodes_filename/0,
- is_disc_node/0, on_node_down/1, on_node_up/1]).
+ running_nodes_filename/0, is_disc_node/0, on_node_down/1,
+ on_node_up/1]).
-export([table_names/0]).
@@ -64,7 +64,6 @@
-spec(read_cluster_nodes_config/0 :: () -> [node()]).
-spec(record_running_nodes/0 :: () -> 'ok').
-spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(delete_previously_running_nodes/0 :: () -> 'ok').
-spec(running_nodes_filename/0 :: () -> file:filename()).
-spec(is_disc_node/0 :: () -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -104,6 +103,7 @@ init() ->
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
%% make it so.
ok = global:sync(),
+ ok = delete_previously_running_nodes(),
ok.
is_db_empty() ->
@@ -622,10 +622,9 @@ move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
- BackupDir = lists:flatten(
- io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
- [MnesiaDir,
- Year, Month, Day, Hour, Minute, Second])),
+ BackupDir = rabbit_misc:format(
+ "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
+ [MnesiaDir, Year, Month, Day, Hour, Minute, Second]),
case file:rename(MnesiaDir, BackupDir) of
ok ->
%% NB: we cannot use rabbit_log here since it may not have
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index b7de27d4..f685b109 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 495e2976..56265136 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store).
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index d6dc5568..9c31439f 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 77f1f04e..3b61ed0b 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_gc).
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index ef8b7cdf..2f36256c 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_index).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index fef8ae88..02889b93 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_net).
@@ -151,10 +151,9 @@ connection_string(Sock, Direction) ->
end,
case {From(Sock), To(Sock)} of
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
- {ok, lists:flatten(
- io_lib:format("~s:~p -> ~s:~p",
- [rabbit_misc:ntoab(FromAddress), FromPort,
- rabbit_misc:ntoab(ToAddress), ToPort]))};
+ {ok, rabbit_misc:format("~s:~p -> ~s:~p",
+ [rabbit_misc:ntoab(FromAddress), FromPort,
+ rabbit_misc:ntoab(ToAddress), ToPort])};
{{error, _Reason} = Error, _} ->
Error;
{_, {error, _Reason} = Error} ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 7355704a..825d1bb1 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_networking).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8aa24ab5..4fc91860 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_node_monitor).
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
new file mode 100644
index 00000000..329c07dc
--- /dev/null
+++ b/src/rabbit_nodes.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_nodes).
+
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]).
+
+-define(EPMD_TIMEOUT, 30000).
+
+%%----------------------------------------------------------------------------
+%% Specs
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(names/1 :: (string()) -> rabbit_types:ok_or_error2(
+ [{string(), integer()}], term())).
+-spec(diagnostics/1 :: ([node()]) -> string()).
+-spec(make/1 :: ({string(), string()} | string()) -> node()).
+-spec(parts/1 :: (node() | string()) -> {string(), string()}).
+-spec(cookie_hash/0 :: () -> string()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+names(Hostname) ->
+ Self = self(),
+ process_flag(trap_exit, true),
+ Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+ timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
+ Res = receive
+ {names, Names} -> Names;
+ {'EXIT', Pid, Reason} -> {error, Reason}
+ end,
+ process_flag(trap_exit, false),
+ Res.
+
+diagnostics(Nodes) ->
+ Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]),
+ NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
+ "nodes in question: ~p~n~n"
+ "hosts, their running nodes and ports:", [Nodes]}] ++
+ [diagnostics_host(Host) || Host <- Hosts] ++
+ diagnostics0(),
+ lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags,
+ {F, A} <- [NodeDiag]]).
+
+diagnostics0() ->
+ [{"~ncurrent node details:~n- node name: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- home dir: ~s", [Home]};
+ Other -> {"- no home dir: ~p", [Other]}
+ end,
+ {"- cookie hash: ~s", [cookie_hash()]}].
+
+diagnostics_host(Host) ->
+ case names(Host) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [Host, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- ~s: ~p",
+ [Host, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end.
+
+make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
+make(NodeStr) -> make(parts(NodeStr)).
+
+parts(Node) when is_atom(Node) ->
+ parts(atom_to_list(Node));
+parts(NodeStr) ->
+ case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+ {Prefix, []} -> {_, Suffix} = parts(node()),
+ {Prefix, Suffix};
+ {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+ end.
+
+cookie_hash() ->
+ base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index de2ba8ad..7b85ab15 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 50444dc4..162d44f1 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_prelaunch).
@@ -22,7 +22,6 @@
-define(BaseApps, [rabbit]).
-define(ERROR_CODE, 1).
--define(EPMD_TIMEOUT, 30000).
%%----------------------------------------------------------------------------
%% Specs
@@ -244,16 +243,15 @@ duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
duplicate_node_check(NodeStr) ->
- Node = rabbit_misc:makenode(NodeStr),
- {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case names(NodeHost) of
+ Node = rabbit_nodes:make(NodeStr),
+ {NodeName, NodeHost} = rabbit_nodes:parts(Node),
+ case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
true -> io:format("node with name ~p "
"already running on ~p~n",
[NodeName, NodeHost]),
- [io:format(Fmt ++ "~n", Args) ||
- {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
terminate(?ERROR_CODE);
false -> ok
end;
@@ -279,15 +277,3 @@ terminate(Status) ->
after infinity -> ok
end
end.
-
-names(Hostname) ->
- Self = self(),
- process_flag(trap_exit, true),
- Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
- timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
- Res = receive
- {names, Names} -> Names;
- {'EXIT', Pid, Reason} -> {error, Reason}
- end,
- process_flag(trap_exit, false),
- Res.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 9b45e798..df957d88 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a12b289..6ab2de89 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_index).
@@ -496,7 +496,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
- lists:flatten(io_lib:format("~.36B", [Num])).
+ rabbit_misc:format("~.36B", [Num]).
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 5ebe65c9..908a279c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -265,10 +265,9 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
- closed -> if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
+ closed -> case State#v1.connection_state of
+ closed -> State;
+ _ -> throw(connection_closed_abruptly)
end;
{error, Reason} -> throw({inet_error, Reason});
{other, Other} -> handle_other(Other, Deb, State)
@@ -500,23 +499,21 @@ handle_frame(Type, Channel, Payload,
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame ->
- case get({channel, Channel}) of
- {ChPid, AState} ->
- NewAState = process_channel_frame(
- AnalyzedFrame, Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid,
- control_throttle(State));
- undefined ->
- case ?IS_RUNNING(State) of
- true -> send_to_new_channel(
- Channel, AnalyzedFrame, State);
- false -> throw({channel_frame_while_starting,
- Channel, State#v1.connection_state,
- AnalyzedFrame})
- end
- end
+ AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
+ end.
+
+process_frame(Frame, Channel, State) ->
+ case get({channel, Channel}) of
+ {ChPid, AState} ->
+ NewAState = process_channel_frame(Frame, Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ undefined when ?IS_RUNNING(State) ->
+ ok = create_channel(Channel, State),
+ process_frame(Frame, Channel, State);
+ undefined ->
+ throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state, Frame})
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -527,11 +524,11 @@ post_process_frame({method, MethodName, _}, _ChPid,
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- maybe_block(State);
- false -> State
+ maybe_block(control_throttle(State));
+ false -> control_throttle(State)
end;
post_process_frame(_Frame, _ChPid, State) ->
- State.
+ control_throttle(State).
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
@@ -896,7 +893,7 @@ cert_info(F, Sock) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
@@ -909,10 +906,9 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
- State.
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
process_channel_frame(Frame, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 9821ae7b..8c0ebcbe 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_registry).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 1a08efed..237ab78c 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_restartable_sup).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 219833b7..f4bbda0f 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 963294d9..e8beecfe 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sasl_report_file_h).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e524446e..3025d981 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_ssl).
@@ -72,9 +72,8 @@ peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
validity = {'Validity', Start, End} }}) ->
- lists:flatten(
- io_lib:format("~s - ~s", [format_asn1_value(Start),
- format_asn1_value(End)]))
+ rabbit_misc:format("~s - ~s", [format_asn1_value(Start),
+ format_asn1_value(End)])
end, Cert).
%%--------------------------------------------------------------------------
@@ -155,8 +154,7 @@ escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
%% purposes it's handy to escape all non-printable chars. All non-ASCII
%% characters get converted to UTF-8 sequences and then escaped. We've
%% already got a UTF-8 sequence here, so just escape it.
- lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
- escape_rdn_value(S, middle);
+ rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle);
escape_rdn_value([C | S], middle) ->
[C | escape_rdn_value(S, middle)].
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 802ea5e2..0965e3b3 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sup).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 343e79e5..7a96af26 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests).
@@ -59,7 +59,7 @@ all_tests() ->
passed.
maybe_run_cluster_dependent_tests() ->
- SecondaryNode = rabbit_misc:makenode("hare"),
+ SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = run_cluster_dependent_tests(SecondaryNode);
@@ -859,7 +859,7 @@ test_cluster_management() ->
"invalid2@invalid"]),
ok = assert_ram_node(),
- SecondaryNode = rabbit_misc:makenode("hare"),
+ SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
pang -> io:format("Skipping clustering tests with node ~p~n",
@@ -1786,10 +1786,10 @@ test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
- Ref2 = rabbit_guid:guid(),
+ Ref2 = rabbit_guid:gen(),
{Cap2, MSC2State} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
@@ -1941,7 +1941,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
passed.
test_msg_store_confirm_timer() ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
MSCState = rabbit_msg_store:client_init(
@@ -1970,7 +1970,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
test_msg_store_client_delete_and_terminate() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
%% test the 'dying client' fast path for writes
@@ -1986,7 +1986,7 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
@@ -2020,7 +2020,7 @@ restart_app() ->
rabbit:start().
queue_index_publish(SeqIds, Persistent, Qi) ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgStore = case Persistent of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
@@ -2029,7 +2029,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B = [{_SeqId, LastMsgIdWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
MsgId, SeqId, #message_properties{}, Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
@@ -2052,7 +2052,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
index abcbe0b6..72c07b51 100644
--- a/src/rabbit_tests_event_receiver.erl
+++ b/src/rabbit_tests_event_receiver.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests_event_receiver).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 58079ccf..3a5b96de 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_trace).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ae2b5d3f..732c29b6 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_types).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 717d94a8..80f50b38 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index f164035e..9f2535bd 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade_functions).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3e605f04..7f4fdb67 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_variable_queue).
@@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, Terms1} =
case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:guid(), []};
+ undefined -> {rabbit_guid:gen(), []};
PRef1 -> {PRef1, Terms}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
@@ -865,7 +865,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
+ msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
+ Callback).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index f6bcbb7f..7545d813 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_version).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 38bb76b0..5548ef6d 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_vhost).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f6062e06..dc74b2f5 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_writer).
@@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 26ea502c..a2f4fae9 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -41,7 +41,7 @@
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%%
%% %CopyrightBegin%
%%
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 88da74c5..43a6bc99 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index cb3dd02c..d8844441 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index e5db4c9f..fb01c792 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 74297b6d..9ee921b4 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 5feb146f..7f4b5049 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(test_sup).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 8973a4f7..fca55f02 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index fcb07a16..c9ecccd6 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool).
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index d37c3a0f..ff356366 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_sup).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index b42530e2..1ddcebb2 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_worker).