summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-02-25 12:41:31 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-02-25 12:41:31 +0000
commit3a9022f793632b7ab779cc0dedfe1083f6b85cd9 (patch)
tree5f4448cb5e4fc9d160d7c6c7e3642fe6ae711420
parent53f39af6608ee9787c67a2db6e07813e22472b1f (diff)
parentd1562e9de47255303213793205f648c64aa542d1 (diff)
downloadrabbitmq-server-3a9022f793632b7ab779cc0dedfe1083f6b85cd9.tar.gz
Merge in default.
-rw-r--r--LICENSE-MPL-RabbitMQ2
-rwxr-xr-xcheck_xref2
-rw-r--r--codegen.py4
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--include/gm_specs.hrl2
-rw-r--r--include/rabbit.hrl4
-rw-r--r--include/rabbit_msg_store.hrl2
-rw-r--r--packaging/common/LICENSE.tail4
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf2
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in2
-rw-r--r--scripts/rabbitmq-defaults2
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat8
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat2
-rwxr-xr-xscripts/rabbitmqctl2
-rwxr-xr-xscripts/rabbitmqctl.bat2
-rw-r--r--src/app_utils.erl2
-rw-r--r--src/background_gc.erl2
-rw-r--r--src/credit_flow.erl2
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/dtree.erl2
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/gm.erl2
-rw-r--r--src/gm_soak_test.erl2
-rw-r--r--src/gm_speed_test.erl2
-rw-r--r--src/gm_tests.erl2
-rw-r--r--src/lqueue.erl2
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/mnesia_sync.erl2
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/pmon.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl61
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl93
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl5
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl5
-rw-r--r--src/rabbit_auth_mechanism_plain.erl5
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_backing_queue_qc.erl13
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl104
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl2
-rw-r--r--src/rabbit_connection_sup.erl2
-rw-r--r--src/rabbit_control_main.erl2
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_disk_monitor.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_exchange_decorator.erl2
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_fanout.erl5
-rw-r--r--src/rabbit_exchange_type_headers.erl5
-rw-r--r--src/rabbit_exchange_type_invalid.erl5
-rw-r--r--src/rabbit_exchange_type_topic.erl5
-rw-r--r--src/rabbit_file.erl2
-rw-r--r--src/rabbit_framing.erl2
-rw-r--r--src/rabbit_guid.erl2
-rw-r--r--src/rabbit_heartbeat.erl2
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_memory_monitor.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl85
-rw-r--r--src/rabbit_mirror_queue_slave.erl29
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_misc.erl2
-rw-r--r--src/rabbit_mnesia.erl58
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_msg_store_index.erl2
-rw-r--r--src/rabbit_net.erl2
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_nodes.erl2
-rw-r--r--src/rabbit_parameter_validation.erl16
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_plugins_main.erl2
-rw-r--r--src/rabbit_policy.erl7
-rw-r--r--src/rabbit_policy_validator.erl2
-rw-r--r--src/rabbit_prelaunch.erl2
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl66
-rw-r--r--src/rabbit_reader.erl163
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_restartable_sup.erl2
-rw-r--r--src/rabbit_router.erl2
-rw-r--r--src/rabbit_runtime_parameter.erl5
-rw-r--r--src/rabbit_runtime_parameters.erl24
-rw-r--r--src/rabbit_runtime_parameters_test.erl8
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_ssl.erl2
-rw-r--r--src/rabbit_sup.erl2
-rw-r--r--src/rabbit_table.erl2
-rw-r--r--src/rabbit_tests.erl77
-rw-r--r--src/rabbit_tests_event_receiver.erl2
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl65
-rw-r--r--src/rabbit_version.erl2
-rw-r--r--src/rabbit_vhost.erl6
-rw-r--r--src/rabbit_vm.erl2
-rw-r--r--src/rabbit_writer.erl2
-rw-r--r--src/supervisor2.erl2
-rw-r--r--src/supervisor2_tests.erl2
-rw-r--r--src/tcp_acceptor.erl2
-rw-r--r--src/tcp_acceptor_sup.erl2
-rw-r--r--src/tcp_listener.erl2
-rw-r--r--src/tcp_listener_sup.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl2
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl2
143 files changed, 643 insertions, 550 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index d50e32ef..4cdf783b 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2013 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/check_xref b/check_xref
index ea0102ee..df019311 100755
--- a/check_xref
+++ b/check_xref
@@ -15,7 +15,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
main(["-h"]) ->
diff --git a/codegen.py b/codegen.py
index 5624658b..bf6b70d5 100644
--- a/codegen.py
+++ b/codegen.py
@@ -11,7 +11,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
from __future__ import nested_scopes
@@ -106,7 +106,7 @@ def printFileHeader():
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%"""
def genErl(spec):
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 16dfd196..ad961a44 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -10,7 +10,7 @@
rabbit_sup,
rabbit_tcp_client_sup,
rabbit_direct_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
@@ -27,7 +27,7 @@
{frame_max, 131072},
{heartbeat, 600},
{msg_store_file_size_limit, 16777216},
- {queue_index_max_journal_entries, 262144},
+ {queue_index_max_journal_entries, 65536},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_user_tags, [administrator]},
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index a317e63b..b3dd6615 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 78763045..eeee799e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-record(user, {username,
@@ -86,7 +86,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2013 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(ERTS_MINIMUM, "5.6.3").
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index f7c10bd8..8665dc55 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-include("rabbit.hrl").
diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail
index b9c2629b..431ddeb0 100644
--- a/packaging/common/LICENSE.tail
+++ b/packaging/common/LICENSE.tail
@@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+ Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
MOZILLA PUBLIC LICENSE
@@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2013 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index e832aed6..b9c6ffbf 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Escape spaces and quotes, because shell is revolting.
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 14557286..ba9579b6 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
##
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 1e4bf755..c197915d 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -28,7 +28,7 @@ package: clean
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright
- echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
+ echo "\n\nThe Debian packaging is (C) 2007-2013, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
rm -rf $(UNPACKED_DIR)
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index f5257040..b351430e 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
-VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2013 VMware, Inc. All rights reserved."
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults
index 4763f086..db1d4f2b 100644
--- a/scripts/rabbitmq-defaults
+++ b/scripts/rabbitmq-defaults
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2012-2013 VMware, Inc. All rights reserved.
##
### next line potentially updated in package install steps
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 23224943..3721f6c7 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Determine where this script is really located (if this script is
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 97c74791..43f450c0 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 341f871a..4b4dbe47 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
@@ -23,8 +23,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index e1686627..184ae931 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 3aea4c07..9fa304e6 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 4758c861..9c30e74e 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index a5fade72..00fffa9f 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index d8b1eaf1..a6d85552 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
index fdf6ed41..8da436c0 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(app_utils).
diff --git a/src/background_gc.erl b/src/background_gc.erl
index 3dbce330..d684d6ea 100644
--- a/src/background_gc.erl
+++ b/src/background_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(background_gc).
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 102c353f..106179fd 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(credit_flow).
diff --git a/src/delegate.erl b/src/delegate.erl
index 96b8ba31..e833b819 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(delegate).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 2a8b915b..30400b3e 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(delegate_sup).
diff --git a/src/dtree.erl b/src/dtree.erl
index ca2d30cf..45eea506 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% A dual-index tree.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3260d369..d2d4d295 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(file_handle_cache).
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 29d2d713..0c257a84 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gatherer).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 20de79c2..c82327a2 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2012 VMware, Inc.
+%% All modifications are (C) 2009-2013 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
diff --git a/src/gm.erl b/src/gm.erl
index 2057b1f5..76b535e6 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm).
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 5fbfc223..1034ee2f 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm_soak_test).
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index 84d4ab2f..3fe3b182 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm_speed_test).
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index a9c0ba90..efb87a4c 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm_tests).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index c4e046b5..e2ab2380 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(lqueue).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 24c3ebd0..33d09f7f 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index f8cbd853..ea6b82c8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor_tests).
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
index a3773d90..41a349be 100644
--- a/src/mnesia_sync.erl
+++ b/src/mnesia_sync.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(mnesia_sync).
diff --git a/src/pg_local.erl b/src/pg_local.erl
index e2e82f1f..7377fbf0 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -13,7 +13,7 @@
%% versions of Erlang/OTP. The remaining type specs have been
%% removed.
-%% All modifications are (C) 2010-2012 VMware, Inc.
+%% All modifications are (C) 2010-2013 VMware, Inc.
%% %CopyrightBegin%
%%
diff --git a/src/pmon.erl b/src/pmon.erl
index 37898119..ed32b8b2 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(pmon).
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 780fa2e9..02a0a1df 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0e6c970f..f3ba022a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit).
@@ -355,6 +355,8 @@ handle_app_error(App, Reason) ->
throw({could_not_start, App, Reason}).
start_it(StartFun) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ register(rabbit_boot, Marker),
try
StartFun()
catch
@@ -363,11 +365,17 @@ start_it(StartFun) ->
_:Reason ->
boot_error(Reason, erlang:get_stacktrace())
after
+ unlink(Marker),
+ Marker ! stop,
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
+ case whereis(rabbit_boot) of
+ undefined -> ok;
+ _ -> await_startup()
+ end,
rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
@@ -435,8 +443,9 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, Vsn} = application:get_key(rabbit, vsn),
- error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n",
- [Vsn, erlang:system_info(otp_release)]),
+ error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
+ [Vsn, erlang:system_info(otp_release),
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
@@ -699,10 +708,12 @@ force_event_refresh() ->
log_broker_started(Plugins) ->
rabbit_misc:with_local_io(
fun() ->
+ PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
+ || P <- Plugins]),
error_logger:info_msg(
- "Server startup complete; plugins are: ~p~n", [Plugins]),
- io:format("~n Broker running with ~p plugins.~n",
- [length(Plugins)])
+ "Server startup complete; ~b plugins started.~n~s",
+ [length(Plugins), PluginList]),
+ io:format(" completed with ~p plugins.~n", [length(Plugins)])
end).
erts_version_check() ->
@@ -716,40 +727,38 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~n## ## ~s ~s. ~s~n## ## ~s~n########## ~n",
- [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- io:format("###### ## Logs: ~s~n########## ~s~n",
- [log_location(kernel), log_location(sasl)]).
+ io:format("~n ~s ~s. ~s"
+ "~n ## ## ~s"
+ "~n ## ##"
+ "~n ########## Logs: ~s"
+ "~n ###### ## ~s"
+ "~n ##########"
+ "~n Starting broker...",
+ [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
+ log_location(kernel), log_location(sasl)]).
log_banner() ->
- {ok, Product} = application:get_key(id),
- {ok, Version} = application:get_key(vsn),
Settings = [{"node", node()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
{"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()},
- {"erlang version", erlang:system_info(otp_release)}],
+ {"database dir", rabbit_mnesia:dir()}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = fun (K, V) ->
rabbit_misc:format(
"~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V])
end,
Banner = iolist_to_binary(
- rabbit_misc:format(
- "~s ~s~n~s~n~s~n",
- [Product, Version, ?COPYRIGHT_MESSAGE,
- ?INFORMATION_MESSAGE]) ++
- [case S of
- {"config file(s)" = K, []} ->
- Format(K, "(none)");
- {"config file(s)" = K, [V0 | Vs]} ->
- Format(K, V0), [Format("", V) || V <- Vs];
- {K, V} ->
- Format(K, V)
- end || S <- Settings]),
+ [case S of
+ {"config file(s)" = K, []} ->
+ Format(K, "(none)");
+ {"config file(s)" = K, [V0 | Vs]} ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ {K, V} ->
+ Format(K, V)
+ end || S <- Settings]),
error_logger:info_msg("~s", [Banner]).
home_dir() ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 75c53511..16387268 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_access_control).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index f813eab8..362b11aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_alarm).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 21b6bb92..ae7fe5c5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -11,13 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
-export([recover/0, stop/0, start/1, declare/5,
- delete_immediately/1, delete/3, purge/1]).
+ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -135,6 +135,7 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
+-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
@@ -591,6 +592,24 @@ internal_delete(QueueName) ->
end
end).
+forget_all_durable(Node) ->
+ %% Note rabbit is not running so we avoid e.g. the worker pool. Also why
+ %% we don't invoke the return from rabbit_binding:process_deletions/1.
+ {atomic, ok} =
+ mnesia:sync_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_'}, write),
+ [rabbit_binding:process_deletions(
+ internal_delete1(Name)) ||
+ #amqqueue{name = Name, pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node,
+ rabbit_policy:get(<<"ha-mode">>, Q)
+ =:= {error, not_found}],
+ ok
+ end),
+ ok.
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4517aade..1d332f67 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -94,7 +94,6 @@
messages_unacknowledged,
messages,
consumers,
- active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
@@ -350,9 +349,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{active_consumers = AC,
- backing_queue = BQ, backing_queue_state = BQS}) ->
- true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
+assert_invariant(State = #q{active_consumers = AC}) ->
+ true = (queue:is_empty(AC) orelse is_empty(State)).
+
+is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -474,9 +474,7 @@ deliver_msg_to_consumer(DeliverFun,
deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State1),
- {Result, BQ:is_empty(BQS), State2}.
+ {Result, is_empty(State1), State1}.
confirm_messages([], State) ->
State;
@@ -524,12 +522,10 @@ discard(#delivery{sender = SenderPid,
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ is_empty(State), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -563,20 +559,32 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3 = State2#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State3;
+ {true, undefined} -> State3;
+ {true, _} -> drop_expired_msgs(State3)
+ end
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, State#q{backing_queue_state = BQS1}}.
+ {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -667,13 +675,8 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() -> consumer_count(fun (_) -> false end).
-
-active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
-
-consumer_count(Exclude) ->
- lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
- not Exclude(C)]).
+consumer_count() ->
+ lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
is_unused(_State) -> consumer_count() == 0.
@@ -720,9 +723,14 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{backing_queue_state = BQS,
- backing_queue = BQ }) ->
- Now = now_micros(),
+drop_expired_msgs(State) ->
+ case is_empty(State) of
+ true -> State;
+ false -> drop_expired_msgs(now_micros(), State)
+ end.
+
+drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, State1} =
with_dlx(
@@ -731,8 +739,8 @@ drop_expired_msgs(State = #q{backing_queue_state = BQS,
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
{Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
- undefined -> undefined;
- #message_properties{expiry = Exp} -> Exp
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
end, State1).
with_dlx(undefined, _With, Without) -> Without();
@@ -924,8 +932,6 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(active_consumers, _) ->
- active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1069,7 +1075,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1093,7 +1099,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = ch_record(ChPid),
- C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
+ update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1102,18 +1108,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
@@ -1142,8 +1140,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
+ ensure_expiry_timer(State),
+ reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1221,8 +1219,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index a4305e5f..d7257a69 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_sup).
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index c9475efd..72f81707 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 919be3f3..2dc1cad3 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index c7d74dc3..99e4468e 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index c0d86cd1..847a38f5 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_amqplain).
@@ -33,8 +33,7 @@
%% referring generically to "SASL security mechanism", i.e. the above.
description() ->
- [{name, <<"AMQPLAIN">>},
- {description, <<"QPid AMQPLAIN mechanism">>}].
+ [{description, <<"QPid AMQPLAIN mechanism">>}].
should_offer(_Sock) ->
true.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index 5df1d5d7..4b08e4be 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_cr_demo).
@@ -37,8 +37,7 @@
%% SECURE-OK: "My password is ~s", [Password]
description() ->
- [{name, <<"RABBIT-CR-DEMO">>},
- {description, <<"RabbitMQ Demo challenge-response authentication "
+ [{description, <<"RabbitMQ Demo challenge-response authentication "
"mechanism">>}].
should_offer(_Sock) ->
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 423170e1..a35a133a 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_plain).
@@ -36,8 +36,7 @@
%% matching and will thus be much faster.
description() ->
- [{name, <<"PLAIN">>},
- {description, <<"SASL PLAIN authentication mechanism">>}].
+ [{description, <<"SASL PLAIN authentication mechanism">>}].
should_offer(_Sock) ->
true.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 4245f7e2..2f247448 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -11,15 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
-ifdef(use_specs).
--export_type([async_callback/0]).
-
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 5b3b8aa8..052db3a5 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue_qc).
@@ -334,7 +334,7 @@ postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
{_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
{stop, Acc};
({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
- FoldFun(Msg, MsgProps, Acc)
+ FoldFun(Msg, MsgProps, false, Acc)
end, {cont, Acc0}, gb_trees:to_list(Messages)),
true = Model =:= Res;
@@ -397,10 +397,11 @@ rand_choice(List, Selection, N) ->
N - 1).
makefoldfun(Size) ->
- fun (Msg, _MsgProps, Acc) ->
- case length(Acc) > Size of
- false -> {cont, [Msg | Acc]};
- true -> {stop, Acc}
+ fun (Msg, _MsgProps, Unacked, Acc) ->
+ case {length(Acc) > Size, Unacked} of
+ {false, false} -> {cont, [Msg | Acc]};
+ {false, true} -> {cont, Acc};
+ {true, _} -> {stop, Acc}
end
end.
foldacc() -> [].
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9bd1fad9..c42289c7 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index a333c1ce..05040485 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 53878d6a..9407dd2e 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 2d486651..6096e07b 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_binding).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1c63e96b..7b185568 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -262,7 +262,7 @@ handle_cast({method, Method, Content, Flow},
end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ ok = send(Reply, NewState),
noreply(NewState);
{noreply, NewState} ->
noreply(NewState);
@@ -288,15 +288,16 @@ handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
-handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
- State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(consumer_monitor(ConsumerTag, State));
+handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) ->
+ ok = send(Msg, State),
+ noreply(consumer_monitor(CTag, State));
-handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Msg),
+handle_cast({command, Msg}, State) ->
+ ok = send(Msg, State),
noreply(State);
+handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
+ noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -397,6 +398,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+send(_Command, #ch{state = closing}) ->
+ ok;
+send(Command, #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Command).
+
handle_exception(Reason, State = #ch{protocol = Protocol,
channel = Channel,
writer_pid = WriterPid,
@@ -540,16 +546,17 @@ check_name(_Kind, NameBin) ->
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case sets:is_element(QPid, Blocking) of
false -> State;
- true -> Blocking1 = sets:del_element(QPid, Blocking),
- ok = case sets:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ true -> maybe_send_flow_ok(
+ State#ch{blocking = sets:del_element(QPid, Blocking)})
end.
+maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
+ case sets:size(Blocking) of
+ 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
+ _ -> ok
+ end,
+ State.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -574,14 +581,25 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
stop;
-handle_method(#'channel.close'{}, _, State = #ch{state = closing}) ->
- {reply, #'channel.close_ok'{}, State};
+handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid,
+ state = closing}) ->
+ ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
+ {noreply, State};
handle_method(_Method, _, State = #ch{state = closing}) ->
{noreply, State};
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
{ok, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
ReaderPid ! {channel_closing, self()},
{noreply, State1};
@@ -821,12 +839,9 @@ handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
- {noreply, State2 = #ch{writer_pid = WriterPid}} =
- handle_method(#'basic.recover_async'{requeue = Requeue},
- Content,
- State),
- ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
- {noreply, State2};
+ {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content, State),
+ {reply, #'basic.recover_ok'{}, State1};
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
@@ -1081,12 +1096,9 @@ handle_method(#'channel.flow'{active = false}, _,
end,
State1 = State#ch{limiter = Limiter1},
ok = rabbit_limiter:block(Limiter1),
- case consumer_queues(Consumers) of
- [] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State2}
- end;
+ QPids = consumer_queues(Consumers),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})};
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -1136,17 +1148,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons,
- writer_pid = WriterPid}) ->
+ queue_consumers = QCons}) ->
ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
ConsumerMapping1 =
gb_sets:fold(fun (CTag, CMap) ->
- Cancel = #'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
+ ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ State),
dict:erase(CTag, CMap)
end, ConsumerMapping, ConsumerTags),
State#ch{consumer_mapping = ConsumerMapping1,
@@ -1408,12 +1419,17 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
send_nacks([], State) ->
State;
+send_nacks(_MXs, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
+ State;
send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
+send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
+ State#ch{tx = failed};
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx = failed}).
@@ -1432,9 +1448,10 @@ send_confirms(State) ->
send_confirms([], State) ->
State;
-send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = MsgSeqNo}),
+send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
+ State;
+send_confirms([MsgSeqNo], State) ->
+ ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
State;
send_confirms(Cs, State) ->
coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
@@ -1442,8 +1459,7 @@ send_confirms(Cs, State) ->
multiple = Multiple}
end, State).
-coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
@@ -1452,11 +1468,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
- _ -> ok = rabbit_writer:send_command(
- WriterPid, MkMsgFun(lists:last(Ms), true))
+ _ -> ok = send(MkMsgFun(lists:last(Ms), true), State)
end,
- [ok = rabbit_writer:send_command(
- WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
+ [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
State.
ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
@@ -1473,7 +1487,7 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
end.
complete_tx(State = #ch{tx = committing}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ ok = send(#'tx.commit_ok'{}, State),
State#ch{tx = new_tx()};
complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 42459833..8ea44a81 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup).
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 995c41fb..16fd08be 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup_sup).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index c508f1b9..9602c512 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_client_sup).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index adf6e417..a88bec3d 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index d9a4735c..31bc51b8 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_connection_sup).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index fc9c41a4..f5e70365 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_control_main).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 689e5d83..53144f3f 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_direct).
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 6330d555..b396b289 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_disk_monitor).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index a9af2d8a..1360c82a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 042ab23c..3efc9c0c 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger_file_h).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 38d8cd54..a91a9916 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_event).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9339161f..88033f77 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 08819427..befbc462 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_decorator).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index c5583ffd..1fbcb2d8 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 9a5665c0..213b24c4 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
@@ -31,8 +31,7 @@
{enables, kernel_ready}]}).
description() ->
- [{name, <<"direct">>},
- {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index d9a2f60f..5b17ed56 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_fanout).
@@ -31,8 +31,7 @@
{enables, kernel_ready}]}).
description() ->
- [{name, <<"fanout">>},
- {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 516b78e5..75899160 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_headers).
@@ -37,8 +37,7 @@
-endif.
description() ->
- [{name, <<"headers">>},
- {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index c5d781c2..6b07351a 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_invalid).
@@ -24,8 +24,7 @@
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
- [{name, <<"invalid">>},
- {description,
+ [{description,
<<"Dummy exchange type, to be used when the intended one is not found.">>
}].
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 644d9acf..bd8ad1ac 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_topic).
@@ -34,8 +34,7 @@
%%----------------------------------------------------------------------------
description() ->
- [{name, <<"topic">>},
- {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 26f74796..3ceb4989 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_file).
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
index a79188ab..93305483 100644
--- a/src/rabbit_framing.erl
+++ b/src/rabbit_framing.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% TODO auto-generate
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 8ee9ad5b..6c45deea 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_guid).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 05aad8c9..e878f3bb 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2b15498e..8a7d14fe 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_limiter).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8dfa89d3..74cdeb23 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_log).
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index f22ad874..117ff95a 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index e1a21cf7..625e2f07 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_coordinator).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c704804e..bcd4861a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_master).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 58f20476..4fb1fc3b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
@@ -32,6 +32,8 @@
[policy_validator, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
{ok, SPid} when is_pid(SPid) ->
+ maybe_auto_sync(Q),
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
@@ -235,13 +238,13 @@ suggested_queue_nodes(Q) ->
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
suggested_queue_nodes(Q, PossibleNodes) ->
- {MNode0, SNodes} = actual_queue_nodes(Q),
+ {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, PossibleNodes).
+ {MNode, SNodes, SSNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,15 +252,20 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
- {MNode, Possible -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
+ {MNode, Poss -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is currently not in the nodes specified,
- %% act like it is for the purposes below - otherwise we will not
- %% return it in the results...
- Nodes = lists:usort([MNode | Nodes1]),
- Unavailable = Nodes -- Possible,
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
{MNode, []};
_ -> case lists:member(MNode, Available) of
true -> {MNode, Available -- [MNode]};
- false -> promote_slave(Available)
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
end
end;
%% When we need to add nodes, we randomise our candidate list as a
%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - but
-%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
-suggested_queue_nodes(_, _, {MNode, _}, _) ->
+suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
{MNode, []}.
shuffle(L) ->
@@ -288,11 +299,14 @@ shuffle(L) ->
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
-actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+actual_queue_nodes(#amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
- end, [node(Pid) || Pid <- SPids]}.
+ end, Nodes(SPids), Nodes(SSPids)}.
is_mirrored(Q) ->
case policy(<<"ha-mode">>, Q) of
@@ -302,6 +316,14 @@ is_mirrored(Q) ->
_ -> false
end.
+maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
+ case policy(<<"ha-sync-mode">>, Q) of
+ <<"automatic">> ->
+ spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end);
+ _ ->
+ ok
+ end.
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
update_mirrors0(OldQ = #amqqueue{name = QName},
NewQ = #amqqueue{name = QName}) ->
- All = fun ({A,B}) -> [A|B] end,
- OldNodes = All(actual_queue_nodes(OldQ)),
- NewNodes = All(suggested_queue_nodes(NewQ)),
- add_mirrors(QName, NewNodes -- OldNodes),
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+ OldNodes = [OldMNode | OldSNodes],
+ NewNodes = [NewMNode | NewSNodes],
+ add_mirrors (QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
+ maybe_auto_sync(NewQ),
ok.
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)).
+ case validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)) of
+ ok -> case proplists:get_value(
+ <<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end;
+ E -> E
+ end.
validate_policy(<<"all">>, none) ->
ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 27b0326d..b435e0f3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
@@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) ->
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQ:set_ram_duration_target(DesiredDuration, BQS1).
+%% [1] - the arrival of this newly synced slave may cause the master to die if
+%% the admin has requested a migration-type change to policy.
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q = #amqqueue { sync_slave_pids = SSPids }] ->
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
- ok
- end
- end).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue { sync_slave_pids = SSPids }] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2),
+ {ok, Q1, Q2}
+ end
+ end) of
+ ok -> ok;
+ {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1]
+ end.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index a2034876..be3924f0 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave_sup).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 73a4c922..c36fb147 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d5efffa5..c39e898c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mnesia).
@@ -41,10 +41,7 @@
]).
%% Used internally in rpc calls
--export([node_info/0,
- remove_node_if_mnesia_running/1,
- is_running_remote/0
- ]).
+-export([node_info/0, remove_node_if_mnesia_running/1]).
-include("rabbit.hrl").
@@ -278,16 +275,16 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
true -> ok;
false -> e(not_a_cluster_node)
end,
- case {RemoveWhenOffline, mnesia:system_info(is_running)} of
- {true, no} -> remove_node_offline_node(Node);
- {true, yes} -> e(online_node_offline_flag);
- {false, no} -> e(offline_node_no_offline_flag);
- {false, yes} -> rabbit_misc:local_info_msg(
- "Removing node ~p from cluster~n", [Node]),
- case remove_node_if_mnesia_running(Node) of
- ok -> ok;
- {error, _} = Err -> throw(Err)
- end
+ case {RemoveWhenOffline, is_running()} of
+ {true, false} -> remove_node_offline_node(Node);
+ {true, true} -> e(online_node_offline_flag);
+ {false, false} -> e(offline_node_no_offline_flag);
+ {false, true} -> rabbit_misc:local_info_msg(
+ "Removing node ~p from cluster~n", [Node]),
+ case remove_node_if_mnesia_running(Node) of
+ ok -> ok;
+ {error, _} = Err -> throw(Err)
+ end
end.
remove_node_offline_node(Node) ->
@@ -334,11 +331,11 @@ status() ->
end,
[{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
IfNonEmpty(ram, cluster_nodes(ram)))}] ++
- case mnesia:system_info(is_running) of
- yes -> RunningNodes = cluster_nodes(running),
- [{running_nodes, cluster_nodes(running)},
- {partitions, mnesia_partitions(RunningNodes)}];
- no -> []
+ case is_running() of
+ true -> RunningNodes = cluster_nodes(running),
+ [{running_nodes, RunningNodes},
+ {partitions, mnesia_partitions(RunningNodes)}];
+ false -> []
end.
mnesia_partitions(Nodes) ->
@@ -346,6 +343,8 @@ mnesia_partitions(Nodes) ->
Nodes, rabbit_node_monitor, partitions, []),
[Reply || Reply = {_, R} <- Replies, R =/= []].
+is_running() -> mnesia:system_info(is_running) =:= yes.
+
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
@@ -355,10 +354,10 @@ cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% the data from mnesia. Obviously it'll work only when mnesia is
%% running.
cluster_status_from_mnesia() ->
- case mnesia:system_info(is_running) of
- no ->
+ case is_running() of
+ false ->
{error, mnesia_not_running};
- yes ->
+ true ->
%% If the tables are not present, it means that
%% `init_db/3' hasn't been run yet. In other words, either
%% we are a virgin node or a restarted RAM node. In both
@@ -669,20 +668,21 @@ move_db() ->
ok.
remove_node_if_mnesia_running(Node) ->
- case mnesia:system_info(is_running) of
- yes ->
+ case is_running() of
+ false ->
+ {error, mnesia_not_running};
+ true ->
%% Deleting the the schema copy of the node will result in
%% the node being removed from the cluster, with that
%% change being propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
+ rabbit_amqqueue:forget_all_durable(Node),
rabbit_node_monitor:notify_left_cluster(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
- end;
- no ->
- {error, mnesia_not_running}
+ end
end.
leave_cluster() ->
@@ -732,8 +732,6 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}.
-
check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index f685b109..81111061 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 485a3256..13b40a48 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store).
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index 3defeaaf..bbc7db68 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 3b61ed0b..3881de23 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_gc).
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index 6cc0b2a7..f0096446 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_index).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 562fc197..b8b03f56 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_net).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 080e0987..4b6c7538 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_networking).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 258ac0ce..71c2c80a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_node_monitor).
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index c8d77b0f..c92e5963 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_nodes).
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index 24762a73..a4bd5042 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -11,12 +11,12 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_parameter_validation).
--export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]).
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -73,3 +73,15 @@ proplist(Name, Constraints, Term) when is_list(Term) ->
proplist(Name, _Constraints, Term) ->
{error, "~s not a list ~p", [Name, Term]}.
+
+enum(OptionsA) ->
+ Options = [list_to_binary(atom_to_list(O)) || O <- OptionsA],
+ fun (Name, Term) when is_binary(Term) ->
+ case lists:member(Term, Options) of
+ true -> ok;
+ false -> {error, "~s should be one of ~p, actually was ~p",
+ [Name, Options, Term]}
+ end;
+ (Name, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]}
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index d2f36590..58c906eb 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins).
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 2158d1da..308b80cd 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins_main).
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index fa13c5dd..7398cd2d 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_policy).
@@ -26,7 +26,7 @@
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
@@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(_VHost, <<"policy">>, _Name) ->
- ok.
-
notify(VHost, <<"policy">>, _Name, _Term) ->
update_policies(VHost).
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
index b59dec2b..75b88c39 100644
--- a/src/rabbit_policy_validator.erl
+++ b/src/rabbit_policy_validator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_policy_validator).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 404afe3c..3ce516d0 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_prelaunch).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6dad01cc..521cd78b 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 21f58154..ea70208f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_index).
@@ -162,7 +162,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_msg_ids }).
+ max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -190,7 +190,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_msg_ids :: gb_set()
+ unconfirmed :: gb_set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -210,7 +210,7 @@
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
--spec(needs_sync/1 :: (qistate()) -> boolean()).
+-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false').
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -269,13 +269,16 @@ delete_and_terminate(State) ->
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ State = #qistate { unconfirmed = Unconfirmed })
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- State #qistate {
- unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }),
+ case MsgProps#message_properties.needs_confirming of
+ true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
+ State #qistate { unconfirmed = Unconfirmed1 };
+ false -> State
+ end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -302,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
needs_sync(#qistate { journal_handle = undefined }) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_sync(JournalHdl).
+needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
+ case gb_sets:is_empty(UC) of
+ true -> case file_handle_cache:needs_sync(JournalHdl) of
+ true -> other;
+ false -> false
+ end;
+ false -> confirms
+ end.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -398,7 +407,7 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_msg_ids = gb_sets:new() }.
+ unconfirmed = gb_sets:new() }.
clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -607,19 +616,21 @@ add_to_journal(RelSeq, Action,
end};
add_to_journal(RelSeq, Action, JEntries) ->
- Val = case array:get(RelSeq, JEntries) of
- undefined ->
- case Action of
- ?PUB -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end;
- ({Pub, no_del, no_ack}) when Action == del ->
- {Pub, del, no_ack};
- ({Pub, Del, no_ack}) when Action == ack ->
- {Pub, Del, ack}
- end,
- array:set(RelSeq, Val, JEntries).
+ case array:get(RelSeq, JEntries) of
+ undefined ->
+ array:set(RelSeq,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end, JEntries);
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ ({no_pub, del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ ({?PUB, del, no_ack}) when Action == ack ->
+ array:reset(RelSeq, JEntries)
+ end.
maybe_flush_journal(State = #qistate { dirty_count = DCount,
max_journal_entries = MaxJournal })
@@ -732,9 +743,12 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
- State #qistate { unsynced_msg_ids = gb_sets:new() }.
+notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
+ case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State #qistate { unconfirmed = gb_sets:new() }
+ end.
%%----------------------------------------------------------------------------
%% segment manipulation
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b98818a6..ab952cd8 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -295,28 +295,37 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
- {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
- closed -> maybe_emit_stats(State),
- case State#v1.connection_state of
- closed -> State;
- _ -> throw(connection_closed_abruptly)
- end;
- {error, Reason} -> maybe_emit_stats(State),
- throw({inet_error, Reason});
- {other, Other} -> handle_other(Other, Deb, State)
+ {data, Data} ->
+ recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ ok;
+ closed ->
+ maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+ {error, Reason} ->
+ maybe_emit_stats(State),
+ throw({inet_error, Reason});
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, State);
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> ok;
+ NewState -> recvloop(Deb, NewState)
+ end
end.
-handle_other({conserve_resources, Conserve}, Deb,
+handle_other({conserve_resources, Conserve},
State = #v1{throttle = Throttle}) ->
Throttle1 = Throttle#throttle{conserve_resources = Conserve},
- recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
-handle_other({channel_closing, ChPid}, Deb, State) ->
+ control_throttle(State#v1{throttle = Throttle1});
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(control_throttle(State)));
-handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ maybe_close(control_throttle(State));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
@@ -329,59 +338,58 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% quickly.
maybe_emit_stats(State),
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
maybe_emit_stats(State),
throw(E);
-handle_other({channel_exit, Channel, Reason}, Deb, State) ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
-handle_other(terminate_connection, _Deb, State) ->
- State;
-handle_other(handshake_timeout, Deb, State)
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
+ stop;
+handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
- mainloop(Deb, State);
-handle_other(handshake_timeout, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
throw({handshake_timeout, State#v1.callback});
-handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
- mainloop(Deb, State);
-handle_other(heartbeat_timeout, _Deb, State = #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
maybe_emit_stats(State),
throw({heartbeat_timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
+ force -> stop;
+ normal -> NewState
end;
-handle_other({'$gen_call', From, info}, Deb, State) ->
+handle_other({'$gen_call', From, info}, State) ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
-handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
- mainloop(Deb, State);
-handle_other(ensure_stats, Deb, State) ->
- mainloop(Deb, ensure_stats_timer(State));
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
-handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
-handle_other({bump_credit, Msg}, Deb, State) ->
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, State) ->
+ control_throttle(State);
+handle_other(Other, State) ->
%% internal error -> something worth dying for
maybe_emit_stats(State),
exit({unexpected_message, Other}).
@@ -443,13 +451,13 @@ close_connection(State = #v1{queue_collector = Collector,
handle_dependent_exit(ChPid, Reason, State) ->
case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, uncontrolled} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- maybe_close(control_throttle(State));
- {Channel, uncontrolled} ->
- maybe_close(handle_exception(control_throttle(State),
- Channel, Reason))
+ {undefined, controlled} -> State;
+ {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_Channel, controlled} -> maybe_close(control_throttle(State));
+ {Channel, uncontrolled} -> State1 = handle_exception(
+ State, Channel, Reason),
+ maybe_close(control_throttle(State1))
end.
terminate_channels() ->
@@ -642,7 +650,10 @@ process_frame(Frame, Channel, State) ->
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
+ control_throttle(State);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
@@ -705,13 +716,8 @@ handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
%% ... and finally, the 1.0 spec is crystal clear! Note that the
-%% TLS uses a different protocol number, and would go here.
-handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) ->
- become_1_0(amqp, {0, 1, 0, 0}, State);
-
-%% 3 stands for "SASL"
-handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) ->
- become_1_0(sasl, {3, 1, 0, 0}, State);
+handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+ become_1_0(Id, State);
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_version, {A, B, C, D}});
@@ -742,10 +748,13 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
connection_state = starting},
frame_header, 7).
-refuse_connection(Sock, Exception) ->
- ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+refuse_connection(Sock, Exception, {A, B, C, D}) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
throw(Exception).
+refuse_connection(Sock, Exception) ->
+ refuse_connection(Sock, Exception, {0, 0, 9, 1}).
+
ensure_stats_timer(State = #v1{connection_state = running}) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
@@ -1017,15 +1026,19 @@ emit_stats(State) ->
%% 1.0 stub
-ifdef(use_specs).
--spec(become_1_0/3 :: ('amqp' | 'sasl',
- {non_neg_integer(), non_neg_integer(),
- non_neg_integer(), non_neg_integer()},
- #v1{}) -> no_return()).
+-spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()).
-endif.
-become_1_0(Mode, Version, State = #v1{sock = Sock}) ->
+become_1_0(Id, State = #v1{sock = Sock}) ->
case code:is_loaded(rabbit_amqp1_0_reader) of
- false -> refuse_connection(Sock, {bad_version, Version});
- _ -> throw({become, {rabbit_amqp1_0_reader, become,
+ false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
+ _ -> Mode = case Id of
+ 0 -> amqp;
+ 3 -> sasl;
+ _ -> refuse_connection(
+ Sock, {unsupported_amqp1_0_protocol_id, Id},
+ {3, 1, 0, 0})
+ end,
+ throw({become, {rabbit_amqp1_0_reader, init,
[Mode, pack_for_1_0(State)]}})
end.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 32709d24..60419856 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_registry).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 237ab78c..4c4ab2cf 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_restartable_sup).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index f4bbda0f..2eaef9a7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index 18668049..6b62c974 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_runtime_parameter).
@@ -23,8 +23,6 @@
-callback validate(rabbit_types:vhost(), binary(), binary(),
term()) -> validate_results().
--callback validate_clear(rabbit_types:vhost(), binary(),
- binary()) -> validate_results().
-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
@@ -35,7 +33,6 @@
behaviour_info(callbacks) ->
[
{validate, 4},
- {validate_clear, 3},
{notify, 4},
{notify_clear, 3}
];
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 49060409..b1100b65 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_runtime_parameters).
@@ -120,21 +120,13 @@ clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
clear_any(VHost, Component, Name) ->
- case clear_any0(VHost, Component, Name) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
-
-clear_any0(VHost, Component, Name) ->
- case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Name)) of
- ok -> mnesia_clear(VHost, Component, Name),
- Mod:notify_clear(VHost, Component, Name),
- ok;
- E -> E
- end;
- E -> E
+ case lookup(VHost, Component, Name) of
+ not_found -> {error_string, "Parameter does not exist"};
+ _ -> mnesia_clear(VHost, Component, Name),
+ case lookup_component(Component) of
+ {ok, Mod} -> Mod:notify_clear(VHost, Component, Name);
+ _ -> ok
+ end
end.
mnesia_clear(VHost, Component, Name) ->
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index d4d7271e..05c1dbc1 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -11,14 +11,14 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
-behaviour(rabbit_policy_validator).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
@@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok;
validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(_, <<"test">>, <<"good">>) -> ok;
-validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
-validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index e8beecfe..566db9a9 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_sasl_report_file_h).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 22ff555f..b1238623 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_ssl).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index f142d233..6a6b2feb 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_sup).
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index fa1c5bbd..d1c0bb1e 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_table).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b0ff5af9..27807b62 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests).
@@ -61,6 +61,7 @@ all_tests() ->
passed = test_runtime_parameters(),
passed = test_policy_validation(),
passed = test_server_status(),
+ passed = test_amqp_connection_refusal(),
passed = test_confirms(),
passed =
do_if_secondary_node(
@@ -911,10 +912,10 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
- Policy, Params, {OldM, OldSs}, All),
+ Policy, Params, CurrentState, All),
NewSs1 = lists:sort(NewSs0),
case dm_list_match(NewSs, NewSs1, ExtraSs) of
ok -> ok;
@@ -922,28 +923,36 @@ test_dynamic_mirroring() ->
end
end,
- Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
+
+ N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
%% Add a node
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- %% And once that's happened, still keep the master even when not listed
- Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
-
- Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed,
+ %% if nothing is synced
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
+ %% But if something is synced we can lose the master - but make
+ %% sure we pick the new master from the nodes which are synced!
+ Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
+ Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
passed.
@@ -1061,7 +1070,11 @@ test_runtime_parameters() ->
ok = control_action(clear_parameter, ["test", "maybe"]),
{error_string, _} =
control_action(clear_parameter, ["test", "neverexisted"]),
+
+ %% We can delete for a component that no longer exists
+ Good(["test", "good", "\"ignore\""]),
rabbit_runtime_parameters_test:unregister(),
+ ok = control_action(clear_parameter, ["test", "good"]),
passed.
test_policy_validation() ->
@@ -1119,10 +1132,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
- [#listener{host = H, port = P} | _] =
- [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
- N =:= node()],
-
+ {H, P} = find_listener(),
{ok, C} = gen_tcp:connect(H, P, []),
gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>),
timer:sleep(100),
@@ -1161,6 +1171,25 @@ test_server_status() ->
passed.
+test_amqp_connection_refusal() ->
+ [passed = test_amqp_connection_refusal(V) ||
+ V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]],
+ passed.
+
+test_amqp_connection_refusal(Header) ->
+ {H, P} = find_listener(),
+ {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]),
+ ok = gen_tcp:send(C, Header),
+ {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100),
+ ok = gen_tcp:close(C),
+ passed.
+
+find_listener() ->
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+ {H, P}.
+
test_writer() -> test_writer(none).
test_writer(Pid) ->
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
index 72c07b51..c52394c7 100644
--- a/src/rabbit_tests_event_receiver.erl
+++ b/src/rabbit_tests_event_receiver.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests_event_receiver).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 601656da..432055d4 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_trace).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 5bc3d9f5..c6007061 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_types).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 455134da..fde0dbe1 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 21fdcd66..457b1567 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade_functions).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 34a4b52f..f7c6c729 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_variable_queue).
@@ -262,8 +262,6 @@
durable,
transient_threshold,
- async_callback,
-
len,
persistent_count,
@@ -356,8 +354,6 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
-
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -772,24 +768,18 @@ ram_duration(State = #vqstate {
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case TargetRamCount of
- infinity -> false;
- _ -> case
- reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
- end
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false when TargetRamCount == infinity -> false;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -830,7 +820,8 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
-invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
+invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
+invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
@@ -882,8 +873,7 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a msg_id to the unconfirmed set
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
Msg = #basic_message {id = MsgId}, MsgProps) ->
@@ -1010,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1036,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
-
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1337,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
-
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 1cc7d6c8..f81a4021 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_version).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 58e77efb..2858cf58 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_vhost).
@@ -97,9 +97,9 @@ internal_delete(VHostPath) ->
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
[ok = rabbit_runtime_parameters:clear(VHostPath,
proplists:get_value(component, Info),
- proplists:get_value(key, Info))
+ proplists:get_value(name, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info))
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index db674f91..b3e9ec66 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_vm).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 059d3839..2d15e6a2 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_writer).
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 5af38573..c98b528d 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -51,7 +51,7 @@
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
-%% All modifications are (C) 2010-2012 VMware, Inc.
+%% All modifications are (C) 2010-2013 VMware, Inc.
%%
%% %CopyrightBegin%
%%
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
index e42ded7b..f19a53e6 100644
--- a/src/supervisor2_tests.erl
+++ b/src/supervisor2_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(supervisor2_tests).
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 344196d7..0248f878 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index d8844441..61c747c9 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index fb01c792..90e84f94 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 9ee921b4..7f850dbc 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 7f4b5049..3342adb5 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(test_sup).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 5ce894a9..f70156b6 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index c9ecccd6..3bdeb377 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(worker_pool).
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index ff356366..b9835f1e 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(worker_pool_sup).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 1ddcebb2..56e4b7b3 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(worker_pool_worker).