summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-02-21 13:44:59 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-02-21 13:44:59 +0000
commit97dada17a2770d5af602755a38bd6fbb812b8934 (patch)
tree8f562d881b598b74b9b55d71fbbd1ab6c7722489
parentecd7fb6e097f72060fcae8c49f5903fd8e569676 (diff)
parentd1562e9de47255303213793205f648c64aa542d1 (diff)
downloadrabbitmq-server-97dada17a2770d5af602755a38bd6fbb812b8934.tar.gz
Merged default into bug19375
-rw-r--r--LICENSE-MPL-RabbitMQ2
-rw-r--r--Makefile2
-rwxr-xr-xcheck_xref16
-rw-r--r--codegen.py4
-rw-r--r--docs/rabbitmqctl.1.xml3
-rw-r--r--ebin/rabbit_app.in4
-rw-r--r--include/gm_specs.hrl2
-rw-r--r--include/rabbit.hrl5
-rw-r--r--include/rabbit_msg_store.hrl2
-rw-r--r--packaging/common/LICENSE.tail4
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf2
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in2
-rw-r--r--scripts/rabbitmq-defaults2
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat8
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat2
-rwxr-xr-xscripts/rabbitmqctl2
-rwxr-xr-xscripts/rabbitmqctl.bat2
-rw-r--r--src/app_utils.erl2
-rw-r--r--src/background_gc.erl2
-rw-r--r--src/credit_flow.erl2
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/dtree.erl2
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/gm.erl2
-rw-r--r--src/gm_soak_test.erl2
-rw-r--r--src/gm_speed_test.erl2
-rw-r--r--src/gm_tests.erl2
-rw-r--r--src/lqueue.erl2
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/mnesia_sync.erl2
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/pmon.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl150
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_alarm.erl19
-rw-r--r--src/rabbit_amqqueue.erl26
-rw-r--r--src/rabbit_amqqueue_process.erl193
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl5
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl5
-rw-r--r--src/rabbit_auth_mechanism_plain.erl5
-rw-r--r--src/rabbit_backing_queue.erl16
-rw-r--r--src/rabbit_backing_queue_qc.erl13
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl113
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl2
-rw-r--r--src/rabbit_connection_sup.erl9
-rw-r--r--src/rabbit_control_main.erl2
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_disk_monitor.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_event.erl14
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_exchange_decorator.erl2
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl5
-rw-r--r--src/rabbit_exchange_type_fanout.erl5
-rw-r--r--src/rabbit_exchange_type_headers.erl5
-rw-r--r--src/rabbit_exchange_type_invalid.erl9
-rw-r--r--src/rabbit_exchange_type_topic.erl5
-rw-r--r--src/rabbit_file.erl2
-rw-r--r--src/rabbit_framing.erl2
-rw-r--r--src/rabbit_guid.erl2
-rw-r--r--src/rabbit_heartbeat.erl2
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_memory_monitor.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_misc.erl85
-rw-r--r--src/rabbit_mirror_queue_slave.erl82
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl57
-rw-r--r--src/rabbit_misc.erl21
-rw-r--r--src/rabbit_mnesia.erl73
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl15
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_msg_store_index.erl2
-rw-r--r--src/rabbit_net.erl2
-rw-r--r--src/rabbit_networking.erl22
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_nodes.erl2
-rw-r--r--src/rabbit_parameter_validation.erl16
-rw-r--r--src/rabbit_plugins.erl11
-rw-r--r--src/rabbit_plugins_main.erl2
-rw-r--r--src/rabbit_policy.erl20
-rw-r--r--src/rabbit_policy_validator.erl2
-rw-r--r--src/rabbit_prelaunch.erl2
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl66
-rw-r--r--src/rabbit_reader.erl233
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_restartable_sup.erl2
-rw-r--r--src/rabbit_router.erl2
-rw-r--r--src/rabbit_runtime_parameter.erl5
-rw-r--r--src/rabbit_runtime_parameters.erl24
-rw-r--r--src/rabbit_runtime_parameters_test.erl8
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_ssl.erl2
-rw-r--r--src/rabbit_sup.erl2
-rw-r--r--src/rabbit_table.erl2
-rw-r--r--src/rabbit_tests.erl230
-rw-r--r--src/rabbit_tests_event_receiver.erl2
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl312
-rw-r--r--src/rabbit_version.erl2
-rw-r--r--src/rabbit_vhost.erl6
-rw-r--r--src/rabbit_vm.erl2
-rw-r--r--src/rabbit_writer.erl26
-rw-r--r--src/supervisor2.erl2
-rw-r--r--src/supervisor2_tests.erl2
-rw-r--r--src/tcp_acceptor.erl2
-rw-r--r--src/tcp_acceptor_sup.erl2
-rw-r--r--src/tcp_listener.erl2
-rw-r--r--src/tcp_listener_sup.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl2
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl2
146 files changed, 1208 insertions, 957 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index d50e32ef..4cdf783b 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2013 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/Makefile b/Makefile
index c63e3dfd..bf33b931 100644
--- a/Makefile
+++ b/Makefile
@@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
else \
dialyzer --output_plt $@ --build_plt \
--apps erts kernel stdlib compiler sasl os_mon mnesia tools \
- public_key crypto ssl; \
+ public_key crypto ssl xmerl; \
fi
clean:
diff --git a/check_xref b/check_xref
index 8f65f3b1..df019311 100755
--- a/check_xref
+++ b/check_xref
@@ -15,7 +15,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
main(["-h"]) ->
@@ -50,6 +50,7 @@ shutdown(Rc, LibDir) ->
check(Cwd, PluginsDir, LibDir, Checks) ->
{ok, Plugins} = file:list_dir(PluginsDir),
ok = file:make_dir(LibDir),
+ put({?MODULE, third_party}, []),
[begin
Source = filename:join(PluginsDir, Plugin),
Target = filename:join(LibDir, Plugin),
@@ -162,7 +163,8 @@ filters() ->
filter_chain(FnChain) ->
fun(AnalysisResult) ->
- lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult));
+ Result = cleanup(AnalysisResult),
+ lists:foldl(fun(F, false) -> F(Result);
(_F, true) -> true
end, false, FnChain)
end.
@@ -267,14 +269,8 @@ source_file(M) ->
store_third_party(App) ->
{ok, AppConfig} = application:get_all_key(App),
- case get({?MODULE, third_party}) of
- undefined ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig));
- Modules ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig) ++ Modules)
- end.
+ AppModules = proplists:get_value(modules, AppConfig),
+ put({?MODULE, third_party}, AppModules ++ get({?MODULE, third_party})).
%% TODO: this ought not to be maintained in such a fashion
external_dependency(Path) ->
diff --git a/codegen.py b/codegen.py
index 5624658b..bf6b70d5 100644
--- a/codegen.py
+++ b/codegen.py
@@ -11,7 +11,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
from __future__ import nested_scopes
@@ -106,7 +106,7 @@ def printFileHeader():
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%"""
def genErl(spec):
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c7069aed..bbd2fe5b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -465,8 +465,7 @@
synchronise itself. The queue will block while
synchronisation takes place (all publishers to and
consumers from the queue will block). The queue must be
- mirrored, and must not have any pending unacknowledged
- messages for this command to succeed.
+ mirrored for this command to succeed.
</para>
<para>
Note that unsynchronised queues from which messages are
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 16dfd196..ad961a44 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -10,7 +10,7 @@
rabbit_sup,
rabbit_tcp_client_sup,
rabbit_direct_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
@@ -27,7 +27,7 @@
{frame_max, 131072},
{heartbeat, 600},
{msg_store_file_size_limit, 16777216},
- {queue_index_max_journal_entries, 262144},
+ {queue_index_max_journal_entries, 65536},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_user_tags, [administrator]},
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index a317e63b..b3dd6615 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 7385b4b3..eeee799e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-record(user, {username,
@@ -86,9 +86,8 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2013 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
--define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
%% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index f7c10bd8..8665dc55 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-include("rabbit.hrl").
diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail
index b9c2629b..431ddeb0 100644
--- a/packaging/common/LICENSE.tail
+++ b/packaging/common/LICENSE.tail
@@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+ Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
MOZILLA PUBLIC LICENSE
@@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2013 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index e832aed6..b9c6ffbf 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Escape spaces and quotes, because shell is revolting.
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 14557286..ba9579b6 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
##
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 1e4bf755..c197915d 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -28,7 +28,7 @@ package: clean
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright
- echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
+ echo "\n\nThe Debian packaging is (C) 2007-2013, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
rm -rf $(UNPACKED_DIR)
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index f5257040..b351430e 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
-VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2013 VMware, Inc. All rights reserved."
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults
index 4763f086..db1d4f2b 100644
--- a/scripts/rabbitmq-defaults
+++ b/scripts/rabbitmq-defaults
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2012-2013 VMware, Inc. All rights reserved.
##
### next line potentially updated in package install steps
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 23224943..3721f6c7 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Determine where this script is really located (if this script is
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 97c74791..43f450c0 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 341f871a..4b4dbe47 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
@@ -23,8 +23,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index e1686627..184ae931 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 3aea4c07..9fa304e6 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 4758c861..9c30e74e 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index a5fade72..00fffa9f 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index d8b1eaf1..a6d85552 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
index fdf6ed41..8da436c0 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(app_utils).
diff --git a/src/background_gc.erl b/src/background_gc.erl
index 3dbce330..d684d6ea 100644
--- a/src/background_gc.erl
+++ b/src/background_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(background_gc).
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 102c353f..106179fd 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(credit_flow).
diff --git a/src/delegate.erl b/src/delegate.erl
index 96b8ba31..e833b819 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(delegate).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 2a8b915b..30400b3e 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(delegate_sup).
diff --git a/src/dtree.erl b/src/dtree.erl
index ca2d30cf..45eea506 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% A dual-index tree.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3260d369..d2d4d295 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(file_handle_cache).
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 29d2d713..0c257a84 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gatherer).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index dc55948b..c82327a2 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2012 VMware, Inc.
+%% All modifications are (C) 2009-2013 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -863,7 +863,7 @@ dispatch(Info, Mod, State) ->
common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
reply(From, Reply),
[];
-common_reply(Name, {To, Tag} = From, Reply, NState, Debug) ->
+common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) ->
reply(From, Reply),
sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
diff --git a/src/gm.erl b/src/gm.erl
index 2057b1f5..76b535e6 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm).
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 5fbfc223..1034ee2f 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm_soak_test).
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index 84d4ab2f..3fe3b182 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm_speed_test).
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index a9c0ba90..efb87a4c 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(gm_tests).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index c4e046b5..e2ab2380 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(lqueue).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 24c3ebd0..33d09f7f 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index f8cbd853..ea6b82c8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor_tests).
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
index a3773d90..41a349be 100644
--- a/src/mnesia_sync.erl
+++ b/src/mnesia_sync.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(mnesia_sync).
diff --git a/src/pg_local.erl b/src/pg_local.erl
index e2e82f1f..7377fbf0 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -13,7 +13,7 @@
%% versions of Erlang/OTP. The remaining type specs have been
%% removed.
-%% All modifications are (C) 2010-2012 VMware, Inc.
+%% All modifications are (C) 2010-2013 VMware, Inc.
%% %CopyrightBegin%
%%
diff --git a/src/pmon.erl b/src/pmon.erl
index 37898119..ed32b8b2 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(pmon).
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 780fa2e9..02a0a1df 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7b8348fc..f3ba022a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit).
@@ -258,16 +258,28 @@
%%----------------------------------------------------------------------------
+%% HiPE compilation happens before we have log handlers - so we have
+%% to io:format/2, it's all we can do.
+
maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
Can = code:which(hipe) =/= non_existing,
case {Want, Can} of
- {true, true} -> hipe_compile();
- {true, false} -> io:format("Not HiPE compiling: HiPE not found in "
- "this Erlang installation.~n");
- {false, _} -> ok
+ {true, true} -> hipe_compile(),
+ true;
+ {true, false} -> false;
+ {false, _} -> true
end.
+warn_if_hipe_compilation_failed(true) ->
+ ok;
+warn_if_hipe_compilation_failed(false) ->
+ error_logger:warning_msg(
+ "Not HiPE compiling: HiPE not found in this Erlang installation.~n").
+
+%% HiPE compilation happens before we have log handlers and can take a
+%% long time, so make an exception to our no-stdout policy and display
+%% progress via stdout.
hipe_compile() ->
Count = length(?HIPE_WORTHY),
io:format("~nHiPE compiling: |~s|~n |",
@@ -311,14 +323,15 @@ start() ->
rabbit_mnesia:check_cluster_consistency(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
- ok = print_plugin_info(rabbit_plugins:active())
+ ok = log_broker_started(rabbit_plugins:active())
end).
boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
- maybe_hipe_compile(),
+ Success = maybe_hipe_compile(),
ok = ensure_working_log_handlers(),
+ warn_if_hipe_compilation_failed(Success),
rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
@@ -332,7 +345,7 @@ boot() ->
false),
ok = app_utils:start_applications(
StartupApps, fun handle_app_error/2),
- ok = print_plugin_info(Plugins)
+ ok = log_broker_started(Plugins)
end).
handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
@@ -342,6 +355,8 @@ handle_app_error(App, Reason) ->
throw({could_not_start, App, Reason}).
start_it(StartFun) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ register(rabbit_boot, Marker),
try
StartFun()
catch
@@ -350,11 +365,17 @@ start_it(StartFun) ->
_:Reason ->
boot_error(Reason, erlang:get_stacktrace())
after
+ unlink(Marker),
+ Marker ! stop,
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
+ case whereis(rabbit_boot) of
+ undefined -> ok;
+ _ -> await_startup()
+ end,
rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
@@ -422,13 +443,14 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, Vsn} = application:get_key(rabbit, vsn),
- error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n",
- [Vsn, erlang:system_info(otp_release)]),
+ error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
+ [Vsn, erlang:system_info(otp_release),
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
+ log_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
- io:format("~nbroker running~n"),
{ok, SupPid};
Error ->
Error
@@ -457,22 +479,16 @@ app_shutdown_order() ->
%%---------------------------------------------------------------------------
%% boot step logic
-run_boot_step({StepName, Attributes}) ->
- Description = case lists:keysearch(description, 1, Attributes) of
- {value, {_, D}} -> D;
- false -> StepName
- end,
+run_boot_step({_StepName, Attributes}) ->
case [MFA || {mfa, MFA} <- Attributes] of
[] ->
- io:format("-- ~s~n", [Description]);
+ ok;
MFAs ->
- io:format("starting ~-60s ...", [Description]),
[try
apply(M,F,A)
catch
_:Reason -> boot_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
- io:format("done~n"),
ok
end.
@@ -533,6 +549,9 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+-ifdef(use_specs).
+-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()).
+-endif.
boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
{Err, Nodes} =
@@ -552,13 +571,15 @@ boot_error(Reason, Stacktrace) ->
Args = [Reason, log_location(kernel), log_location(sasl)],
boot_error(Reason, Fmt, Args, Stacktrace).
+-ifdef(use_specs).
+-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()])
+ -> no_return()).
+-endif.
+boot_error(Reason, Fmt, Args, not_available) ->
+ basic_boot_error(Reason, Fmt, Args);
boot_error(Reason, Fmt, Args, Stacktrace) ->
- case Stacktrace of
- not_available -> basic_boot_error(Reason, Fmt, Args);
- _ -> basic_boot_error(Reason, Fmt ++
- "Stack trace:~n ~p~n~n",
- Args ++ [Stacktrace])
- end.
+ basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n",
+ Args ++ [Stacktrace]).
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
@@ -684,24 +705,17 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
%% misc
-print_plugin_info([]) ->
- ok;
-print_plugin_info(Plugins) ->
- %% This gets invoked by rabbitmqctl start_app, outside the context
- %% of the rabbit application
+log_broker_started(Plugins) ->
rabbit_misc:with_local_io(
fun() ->
- io:format("~n-- plugins running~n"),
- [print_plugin_info(
- AppName, element(2, application:get_key(AppName, vsn)))
- || AppName <- Plugins],
- ok
+ PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
+ || P <- Plugins]),
+ error_logger:info_msg(
+ "Server startup complete; ~b plugins started.~n~s",
+ [length(Plugins), PluginList]),
+ io:format(" completed with ~p plugins.~n", [length(Plugins)])
end).
-print_plugin_info(Plugin, Vsn) ->
- Len = 76 - length(Vsn),
- io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]).
-
erts_version_check() ->
FoundVer = erlang:system_info(version),
case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of
@@ -713,49 +727,39 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- ProductLen = string:len(Product),
- io:format("~n"
- "+---+ +---+~n"
- "| | | |~n"
- "| | | |~n"
- "| | | |~n"
- "| +---+ +-------+~n"
- "| |~n"
- "| ~s +---+ |~n"
- "| | | |~n"
- "| ~s +---+ |~n"
- "| |~n"
- "+-------------------+~n"
- "~s~n~s~n~s~n~n",
- [Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION,
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ io:format("~n ~s ~s. ~s"
+ "~n ## ## ~s"
+ "~n ## ##"
+ "~n ########## Logs: ~s"
+ "~n ###### ## ~s"
+ "~n ##########"
+ "~n Starting broker...",
+ [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
+ log_location(kernel), log_location(sasl)]).
+
+log_banner() ->
Settings = [{"node", node()},
- {"app descriptor", app_location()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
{"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()},
- {"erlang version", erlang:system_info(version)}],
+ {"database dir", rabbit_mnesia:dir()}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = fun (K, V) ->
- io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
- [K, V])
+ rabbit_misc:format(
+ "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V])
end,
- lists:foreach(fun ({"config file(s)" = K, []}) ->
- Format(K, "(none)");
- ({"config file(s)" = K, [V0 | Vs]}) ->
- Format(K, V0), [Format("", V) || V <- Vs];
- ({K, V}) ->
- Format(K, V)
- end, Settings),
- io:nl().
-
-app_location() ->
- {ok, Application} = application:get_application(),
- filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+ Banner = iolist_to_binary(
+ [case S of
+ {"config file(s)" = K, []} ->
+ Format(K, "(none)");
+ {"config file(s)" = K, [V0 | Vs]} ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ {K, V} ->
+ Format(K, V)
+ end || S <- Settings]),
+ error_logger:info_msg("~s", [Banner]).
home_dir() ->
case init:get_argument(home) of
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 75c53511..16387268 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_access_control).
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d7d4d82a..362b11aa 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_alarm).
@@ -67,9 +67,8 @@ start() ->
stop() -> ok.
-register(Pid, HighMemMFA) ->
- gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA},
- infinity).
+register(Pid, AlertMFA) ->
+ gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity).
set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}).
clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}).
@@ -94,9 +93,9 @@ init([]) ->
alarmed_nodes = dict:new(),
alarms = []}}.
-handle_call({register, Pid, HighMemMFA}, State) ->
+handle_call({register, Pid, AlertMFA}, State) ->
{ok, 0 < dict:size(State#alarms.alarmed_nodes),
- internal_register(Pid, HighMemMFA, State)};
+ internal_register(Pid, AlertMFA, State)};
handle_call(get_alarms, State = #alarms{alarms = Alarms}) ->
{ok, Alarms, State};
@@ -121,8 +120,8 @@ handle_event({node_up, Node}, State) ->
handle_event({node_down, Node}, State) ->
{ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)};
-handle_event({register, Pid, HighMemMFA}, State) ->
- {ok, internal_register(Pid, HighMemMFA, State)};
+handle_event({register, Pid, AlertMFA}, State) ->
+ {ok, internal_register(Pid, AlertMFA, State)};
handle_event(_Event, State) ->
{ok, State}.
@@ -198,14 +197,14 @@ alert(Alertees, Source, Alert, NodeComparator) ->
end
end, ok, Alertees).
-internal_register(Pid, {M, F, A} = HighMemMFA,
+internal_register(Pid, {M, F, A} = AlertMFA,
State = #alarms{alertees = Alertees}) ->
_MRef = erlang:monitor(process, Pid),
case dict:find(node(), State#alarms.alarmed_nodes) of
{ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources];
error -> ok
end,
- NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
+ NewAlertees = dict:store(Pid, AlertMFA, Alertees),
State#alarms{alertees = NewAlertees}.
handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6a31b24d..82ac74fa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -11,13 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
-export([recover/0, stop/0, start/1, declare/5,
- delete_immediately/1, delete/3, purge/1]).
+ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -135,6 +135,7 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
+-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
@@ -174,8 +175,7 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
--spec(sync_mirrors/1 :: (pid()) ->
- 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')).
-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
-endif.
@@ -600,6 +600,24 @@ internal_delete(QueueName) ->
end
end).
+forget_all_durable(Node) ->
+ %% Note rabbit is not running so we avoid e.g. the worker pool. Also why
+ %% we don't invoke the return from rabbit_binding:process_deletions/1.
+ {atomic, ok} =
+ mnesia:sync_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_'}, write),
+ [rabbit_binding:process_deletions(
+ internal_delete1(Name)) ||
+ #amqqueue{name = Name, pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node,
+ rabbit_policy:get(<<"ha-mode">>, Q)
+ =:= {error, not_found}],
+ ok
+ end),
+ ok.
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 589e8289..72d6ab43 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -95,7 +95,6 @@
messages_unacknowledged,
messages,
consumers,
- active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
@@ -261,7 +260,11 @@ init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
- stop_sync_timer(stop_rate_timer(State)),
+ lists:foldl(fun (F, S) -> F(S) end, State,
+ [fun stop_sync_timer/1,
+ fun stop_rate_timer/1,
+ fun stop_expiry_timer/1,
+ fun stop_ttl_timer/1]),
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
@@ -296,36 +299,18 @@ backing_queue_module(Q) ->
true -> rabbit_mirror_queue_master
end.
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
- State#q{sync_timer_ref = TRef};
ensure_sync_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #q.sync_timer_ref,
+ ?SYNC_INTERVAL, sync_timeout).
-stop_sync_timer(State = #q{sync_timer_ref = undefined}) ->
- State;
-stop_sync_timer(State = #q{sync_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{sync_timer_ref = undefined}.
-
-ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- TRef = erlang:send_after(
- ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
- State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State) ->
- State.
+stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #q.sync_timer_ref).
-stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
- State;
-stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{rate_timer_ref = undefined}.
+ensure_rate_timer(State) ->
+ rabbit_misc:ensure_timer(State, #q.rate_timer_ref,
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ update_ram_duration).
-stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
- State;
-stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
- erlang:cancel_timer(TRef),
- State#q{expiry_timer_ref = undefined}.
+stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref).
%% We wish to expire only when there are no consumers *and* the expiry
%% hasn't been refreshed (by queue.declare or basic.get) for the
@@ -335,17 +320,41 @@ ensure_expiry_timer(State = #q{expires = undefined}) ->
ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
true -> NewState = stop_expiry_timer(State),
- TRef = erlang:send_after(Expires, self(), maybe_expire),
- NewState#q{expiry_timer_ref = TRef};
+ rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref,
+ Expires, maybe_expire);
false -> State
end.
+stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref).
+
+ensure_ttl_timer(undefined, State) ->
+ State;
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
+ After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
+ State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
+ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
+ ttl_timer_expiry = TExpiry})
+ when Expiry + 1000 < TExpiry ->
+ case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
+ end;
+ensure_ttl_timer(_Expiry, State) ->
+ State.
+
+stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
+
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{active_consumers = AC,
- backing_queue = BQ, backing_queue_state = BQS}) ->
- true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
+assert_invariant(State = #q{active_consumers = AC}) ->
+ true = (queue:is_empty(AC) orelse is_empty(State)).
+
+is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -467,9 +476,7 @@ deliver_msg_to_consumer(DeliverFun,
deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State1),
- {Result, BQ:is_empty(BQS), State2}.
+ {Result, is_empty(State1), State1}.
confirm_messages([], State) ->
State;
@@ -517,12 +524,10 @@ discard(#delivery{sender = SenderPid,
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ is_empty(State), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -558,9 +563,21 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2} ->
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_drop_head(State2),
+ IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State3#q{backing_queue_state = BQS1})
+ State4 = State3#q{backing_queue_state = BQS1},
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that IFF the new message ends up at the head
+ %% of the queue (because the queue was empty) and has an
+ %% expiry. Only then may it need expiring straight away,
+ %% or, if expiry is not due yet, the expiry timer may need
+ %% (re)scheduling.
+ case {IsEmpty, Props#message_properties.expiry} of
+ {false, _} -> State4;
+ {true, undefined} -> State4;
+ {true, _} -> drop_expired_msgs(State4)
+ end
end.
maybe_drop_head(State = #q{max_length = undefined}) ->
@@ -582,12 +599,12 @@ maybe_drop_head(State = #q{max_length = MaxLen,
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, State#q{backing_queue_state = BQS1}}.
+ {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -678,13 +695,8 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-consumer_count() -> consumer_count(fun (_) -> false end).
-
-active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
-
-consumer_count(Exclude) ->
- lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
- not Exclude(C)]).
+consumer_count() ->
+ lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
is_unused(_State) -> consumer_count() == 0.
@@ -731,9 +743,14 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{backing_queue_state = BQS,
- backing_queue = BQ }) ->
- Now = now_micros(),
+drop_expired_msgs(State) ->
+ case is_empty(State) of
+ true -> State;
+ false -> drop_expired_msgs(now_micros(), State)
+ end.
+
+drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, State1} =
with_dlx(
@@ -742,8 +759,8 @@ drop_expired_msgs(State = #q{backing_queue_state = BQS,
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
{Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
- undefined -> undefined;
- #message_properties{expiry = Exp} -> Exp
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
end, State1).
with_dlx(undefined, _With, Without) -> Without();
@@ -767,7 +784,7 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
State1.
dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) ->
- {ok State1} =
+ {ok, State1} =
dead_letter_msgs(
fun (DLFun, Acc, BQS1) ->
{{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1),
@@ -798,25 +815,6 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
queue_monitors = QMons1,
backing_queue_state = BQS2}}.
-ensure_ttl_timer(undefined, State) ->
- State;
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
- After = (case Expiry - now_micros() of
- V when V > 0 -> V + 999; %% always fire later
- _ -> 0
- end) div 1000,
- TRef = erlang:send_after(After, self(), drop_expired),
- State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
-ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
- ttl_timer_expiry = TExpiry})
- when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
- false -> State;
- _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
- end;
-ensure_ttl_timer(_Expiry, State) ->
- State.
-
dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
@@ -963,8 +961,6 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(active_consumers, _) ->
- active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1108,7 +1104,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
@@ -1132,7 +1128,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = ch_record(ChPid),
- C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
+ update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1141,18 +1137,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
@@ -1181,8 +1169,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
+ ensure_expiry_timer(State),
+ reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1204,7 +1192,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue(AckTags, ChPid, State));
handle_call(sync_mirrors, _From,
- State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
+ State = #q{backing_queue = rabbit_mirror_queue_master,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
HandleInfo = fun (Status) ->
@@ -1220,13 +1208,9 @@ handle_call(sync_mirrors, _From,
State, #q.stats_timer,
fun() -> emit_stats(State#q{status = Status}) end)
end,
- case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(
- HandleInfo, EmitStats, BQS) of
- {ok, BQS1} -> reply(ok, S(BQS1));
- {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
- end;
- _ -> reply({error, pending_acks}, State)
+ case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
end;
handle_call(sync_mirrors, _From, State) ->
@@ -1264,8 +1248,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index a4305e5f..d7257a69 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_sup).
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index c9475efd..72f81707 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 919be3f3..2dc1cad3 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index c7d74dc3..99e4468e 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index c0d86cd1..847a38f5 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_amqplain).
@@ -33,8 +33,7 @@
%% referring generically to "SASL security mechanism", i.e. the above.
description() ->
- [{name, <<"AMQPLAIN">>},
- {description, <<"QPid AMQPLAIN mechanism">>}].
+ [{description, <<"QPid AMQPLAIN mechanism">>}].
should_offer(_Sock) ->
true.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index 5df1d5d7..4b08e4be 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_cr_demo).
@@ -37,8 +37,7 @@
%% SECURE-OK: "My password is ~s", [Password]
description() ->
- [{name, <<"RABBIT-CR-DEMO">>},
- {description, <<"RabbitMQ Demo challenge-response authentication "
+ [{description, <<"RabbitMQ Demo challenge-response authentication "
"mechanism">>}].
should_offer(_Sock) ->
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 423170e1..a35a133a 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_plain).
@@ -36,8 +36,7 @@
%% matching and will thus be much faster.
description() ->
- [{name, <<"PLAIN">>},
- {description, <<"SASL PLAIN authentication mechanism">>}].
+ [{description, <<"SASL PLAIN authentication mechanism">>}].
should_offer(_Sock) ->
true.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 99b5946e..2f247448 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -11,15 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
-ifdef(use_specs).
--export_type([async_callback/0]).
-
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
@@ -71,10 +69,14 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
-%% Remove all messages in the queue, but not messages which have been
-%% fetched and are pending acks.
+%% Remove all 'fetchable' messages from the queue, i.e. all messages
+%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
rabbit_types:message_properties(), boolean(), pid(),
@@ -164,7 +166,7 @@
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
rabbit_types:message_properties(),
- A) -> {('stop' | 'cont'), A}),
+ boolean(), A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.
%% How long is my queue?
@@ -226,7 +228,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 5b3b8aa8..052db3a5 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue_qc).
@@ -334,7 +334,7 @@ postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
{_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
{stop, Acc};
({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
- FoldFun(Msg, MsgProps, Acc)
+ FoldFun(Msg, MsgProps, false, Acc)
end, {cont, Acc0}, gb_trees:to_list(Messages)),
true = Model =:= Res;
@@ -397,10 +397,11 @@ rand_choice(List, Selection, N) ->
N - 1).
makefoldfun(Size) ->
- fun (Msg, _MsgProps, Acc) ->
- case length(Acc) > Size of
- false -> {cont, [Msg | Acc]};
- true -> {stop, Acc}
+ fun (Msg, _MsgProps, Unacked, Acc) ->
+ case {length(Acc) > Size, Unacked} of
+ {false, false} -> {cont, [Msg | Acc]};
+ {false, true} -> {cont, Acc};
+ {true, _} -> {stop, Acc}
end
end.
foldacc() -> [].
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9bd1fad9..c42289c7 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index a333c1ce..05040485 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 53878d6a..9407dd2e 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 2d486651..6096e07b 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_binding).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e3dfc5..0510afa9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -262,7 +262,7 @@ handle_cast({method, Method, Content, Flow},
end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
- ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
+ ok = send(Reply, NewState),
noreply(NewState);
{noreply, NewState} ->
noreply(NewState);
@@ -284,18 +284,20 @@ handle_cast(ready_for_close, State = #ch{state = closing,
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
{stop, normal, State};
-handle_cast(terminate, State) ->
+handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
-handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
- State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(consumer_monitor(ConsumerTag, State));
+handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) ->
+ ok = send(Msg, State),
+ noreply(consumer_monitor(CTag, State));
-handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Msg),
+handle_cast({command, Msg}, State) ->
+ ok = send(Msg, State),
noreply(State);
+handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
+ noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -394,6 +396,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+send(_Command, #ch{state = closing}) ->
+ ok;
+send(Command, #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Command).
+
handle_exception(Reason, State = #ch{protocol = Protocol,
channel = Channel,
writer_pid = WriterPid,
@@ -412,8 +419,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+-ifdef(use_specs).
+-spec(precondition_failed/1 :: (string()) -> no_return()).
+-endif.
precondition_failed(Format) -> precondition_failed(Format, []).
+-ifdef(use_specs).
+-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()).
+-endif.
precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
@@ -531,16 +544,17 @@ check_name(_Kind, NameBin) ->
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case sets:is_element(QPid, Blocking) of
false -> State;
- true -> Blocking1 = sets:del_element(QPid, Blocking),
- ok = case sets:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ true -> maybe_send_flow_ok(
+ State#ch{blocking = sets:del_element(QPid, Blocking)})
end.
+maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
+ case sets:size(Blocking) of
+ 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
+ _ -> ok
+ end,
+ State.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -565,14 +579,25 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
stop;
-handle_method(#'channel.close'{}, _, State = #ch{state = closing}) ->
- {reply, #'channel.close_ok'{}, State};
+handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid,
+ state = closing}) ->
+ ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
+ {noreply, State};
handle_method(_Method, _, State = #ch{state = closing}) ->
{noreply, State};
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
{ok, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
ReaderPid ! {channel_closing, self()},
{noreply, State1};
@@ -812,12 +837,9 @@ handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
- {noreply, State2 = #ch{writer_pid = WriterPid}} =
- handle_method(#'basic.recover_async'{requeue = Requeue},
- Content,
- State),
- ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
- {noreply, State2};
+ {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue},
+ Content, State),
+ {reply, #'basic.recover_ok'{}, State1};
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
@@ -1072,12 +1094,9 @@ handle_method(#'channel.flow'{active = false}, _,
end,
State1 = State#ch{limiter = Limiter1},
ok = rabbit_limiter:block(Limiter1),
- case consumer_queues(Consumers) of
- [] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State2}
- end;
+ QPids = consumer_queues(Consumers),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})};
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -1127,17 +1146,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons,
- writer_pid = WriterPid}) ->
+ queue_consumers = QCons}) ->
ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
ConsumerMapping1 =
gb_sets:fold(fun (CTag, CMap) ->
- Cancel = #'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
+ ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ State),
dict:erase(CTag, CMap)
end, ConsumerMapping, ConsumerTags),
State#ch{consumer_mapping = ConsumerMapping1,
@@ -1399,12 +1417,17 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
send_nacks([], State) ->
State;
+send_nacks(_MXs, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
+ State;
send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
+send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
+ State#ch{tx = failed};
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx = failed}).
@@ -1423,9 +1446,10 @@ send_confirms(State) ->
send_confirms([], State) ->
State;
-send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = MsgSeqNo}),
+send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
+ State;
+send_confirms([MsgSeqNo], State) ->
+ ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
State;
send_confirms(Cs, State) ->
coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
@@ -1433,8 +1457,7 @@ send_confirms(Cs, State) ->
multiple = Multiple}
end, State).
-coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
@@ -1443,11 +1466,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
- _ -> ok = rabbit_writer:send_command(
- WriterPid, MkMsgFun(lists:last(Ms), true))
+ _ -> ok = send(MkMsgFun(lists:last(Ms), true), State)
end,
- [ok = rabbit_writer:send_command(
- WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
+ [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
State.
ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
@@ -1464,7 +1485,7 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
end.
complete_tx(State = #ch{tx = committing}) ->
- ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ ok = send(#'tx.commit_ok'{}, State),
State#ch{tx = new_tx()};
complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 42459833..8ea44a81 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup).
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 995c41fb..16fd08be 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup_sup).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index c508f1b9..9602c512 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_client_sup).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index adf6e417..a88bec3d 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 12a532b6..31bc51b8 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_connection_sup).
@@ -42,16 +42,11 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
- {ok, ChannelSupSupPid} =
- supervisor2:start_child(
- SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
SupPid,
{reader, {rabbit_reader, start_link,
- [ChannelSupSupPid, Collector,
+ [SupPid, Collector,
rabbit_heartbeat:start_heartbeat_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}),
{ok, SupPid, ReaderPid}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index fc9c41a4..f5e70365 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_control_main).
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 689e5d83..53144f3f 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_direct).
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 6330d555..b396b289 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_disk_monitor).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index a9af2d8a..1360c82a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 042ab23c..3efc9c0c 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger_file_h).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 7d91b6fa..a91a9916 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_event).
@@ -110,18 +110,18 @@ ensure_stats_timer(C, P, Msg) ->
stop_stats_timer(C, P) ->
case element(P, C) of
- #state{level = Level, timer = TRef} = State
- when Level =/= none andalso TRef =/= undefined ->
- erlang:cancel_timer(TRef),
- setelement(P, C, State#state{timer = undefined});
+ #state{timer = TRef} = State when TRef =/= undefined ->
+ case erlang:cancel_timer(TRef) of
+ false -> C;
+ _ -> setelement(P, C, State#state{timer = undefined})
+ end;
#state{} ->
C
end.
reset_stats_timer(C, P) ->
case element(P, C) of
- #state{timer = TRef} = State
- when TRef =/= undefined ->
+ #state{timer = TRef} = State when TRef =/= undefined ->
setelement(P, C, State#state{timer = undefined});
#state{} ->
C
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9339161f..88033f77 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 08819427..befbc462 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_decorator).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index c5583ffd..1fbcb2d8 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 9a5665c0..213b24c4 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
@@ -31,8 +31,7 @@
{enables, kernel_ready}]}).
description() ->
- [{name, <<"direct">>},
- {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index d9a2f60f..5b17ed56 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_fanout).
@@ -31,8 +31,7 @@
{enables, kernel_ready}]}).
description() ->
- [{name, <<"fanout">>},
- {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 516b78e5..75899160 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_headers).
@@ -37,8 +37,7 @@
-endif.
description() ->
- [{name, <<"headers">>},
- {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 101fe434..6b07351a 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_invalid).
@@ -24,13 +24,16 @@
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
- [{name, <<"invalid">>},
- {description,
+ [{description,
<<"Dummy exchange type, to be used when the intended one is not found.">>
}].
serialise_events() -> false.
+-ifdef(use_specs).
+-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> no_return()).
+-endif.
route(#exchange{name = Name, type = Type}, _) ->
rabbit_misc:protocol_error(
precondition_failed,
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 644d9acf..bd8ad1ac 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_topic).
@@ -34,8 +34,7 @@
%%----------------------------------------------------------------------------
description() ->
- [{name, <<"topic">>},
- {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
serialise_events() -> false.
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 26f74796..3ceb4989 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_file).
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
index a79188ab..93305483 100644
--- a/src/rabbit_framing.erl
+++ b/src/rabbit_framing.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% TODO auto-generate
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 8ee9ad5b..6c45deea 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_guid).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 05aad8c9..e878f3bb 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2b15498e..8a7d14fe 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_limiter).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8dfa89d3..74cdeb23 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_log).
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index f22ad874..117ff95a 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index e1a21cf7..625e2f07 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_coordinator).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b5f72cad..bcd4861a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -11,13 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/4,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -198,6 +198,8 @@ purge(State = #state { gm = GM,
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1 }}.
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
+
publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 58f20476..4fb1fc3b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
@@ -32,6 +32,8 @@
[policy_validator, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
{ok, SPid} when is_pid(SPid) ->
+ maybe_auto_sync(Q),
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
@@ -235,13 +238,13 @@ suggested_queue_nodes(Q) ->
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
suggested_queue_nodes(Q, PossibleNodes) ->
- {MNode0, SNodes} = actual_queue_nodes(Q),
+ {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, PossibleNodes).
+ {MNode, SNodes, SSNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,15 +252,20 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
- {MNode, Possible -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
+ {MNode, Poss -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is currently not in the nodes specified,
- %% act like it is for the purposes below - otherwise we will not
- %% return it in the results...
- Nodes = lists:usort([MNode | Nodes1]),
- Unavailable = Nodes -- Possible,
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
{MNode, []};
_ -> case lists:member(MNode, Available) of
true -> {MNode, Available -- [MNode]};
- false -> promote_slave(Available)
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
end
end;
%% When we need to add nodes, we randomise our candidate list as a
%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - but
-%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
-suggested_queue_nodes(_, _, {MNode, _}, _) ->
+suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
{MNode, []}.
shuffle(L) ->
@@ -288,11 +299,14 @@ shuffle(L) ->
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
-actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+actual_queue_nodes(#amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
- end, [node(Pid) || Pid <- SPids]}.
+ end, Nodes(SPids), Nodes(SSPids)}.
is_mirrored(Q) ->
case policy(<<"ha-mode">>, Q) of
@@ -302,6 +316,14 @@ is_mirrored(Q) ->
_ -> false
end.
+maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
+ case policy(<<"ha-sync-mode">>, Q) of
+ <<"automatic">> ->
+ spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end);
+ _ ->
+ ok
+ end.
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
update_mirrors0(OldQ = #amqqueue{name = QName},
NewQ = #amqqueue{name = QName}) ->
- All = fun ({A,B}) -> [A|B] end,
- OldNodes = All(actual_queue_nodes(OldQ)),
- NewNodes = All(suggested_queue_nodes(NewQ)),
- add_mirrors(QName, NewNodes -- OldNodes),
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+ OldNodes = [OldMNode | OldSNodes],
+ NewNodes = [NewMNode | NewSNodes],
+ add_mirrors (QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
+ maybe_auto_sync(NewQ),
ok.
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)).
+ case validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)) of
+ ok -> case proplists:get_value(
+ <<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end;
+ E -> E
+ end.
validate_policy(<<"all">>, none) ->
ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index feddf45a..b435e0f3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
@@ -37,18 +37,10 @@
-include("rabbit.hrl").
-%%----------------------------------------------------------------------------
-
-include("gm_specs.hrl").
--ifdef(use_specs).
-%% Shut dialyzer up
--spec(promote_me/2 :: (_, _) -> no_return()).
--endif.
-
%%----------------------------------------------------------------------------
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -79,6 +71,8 @@
depth_delta
}).
+%%----------------------------------------------------------------------------
+
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
@@ -227,10 +221,12 @@ handle_cast({sync_start, Ref, Syncer},
backing_queue = BQ,
backing_queue_state = BQS }) ->
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
- S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
- rate_timer_ref = TRefN,
- backing_queue_state = BQSN} end,
- %% [0] We can only sync when there are no pending acks
+ S = fun({MA, TRefN, BQSN}) ->
+ State1#state{depth_delta = undefined,
+ msg_id_ack = dict:from_list(MA),
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN}
+ end,
case rabbit_mirror_queue_sync:slave(
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
@@ -240,7 +236,7 @@ handle_cast({sync_start, Ref, Syncer},
{TRefN, BQSN1}
end) of
denied -> noreply(State1);
- {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {ok, Res} -> noreply(set_delta(0, S(Res)));
{failed, Res} -> noreply(S(Res));
{stop, Reason, Res} -> {stop, Reason, S(Res)}
end;
@@ -469,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
+-ifdef(use_specs).
+-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()).
+-endif.
promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
@@ -589,30 +588,18 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) ->
backing_queue_timeout(State = #state { backing_queue = BQ }) ->
run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout),
- State #state { sync_timer_ref = TRef };
ensure_sync_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #state.sync_timer_ref,
+ ?SYNC_INTERVAL, sync_timeout).
+
+stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref).
-stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
- State;
-stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #state { sync_timer_ref = undefined }.
-
-ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
- self(), update_ram_duration),
- State #state { rate_timer_ref = TRef };
ensure_rate_timer(State) ->
- State.
+ rabbit_misc:ensure_timer(State, #state.rate_timer_ref,
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ update_ram_duration).
-stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
- State;
-stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #state { rate_timer_ref = undefined }.
+stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.
@@ -843,16 +830,21 @@ update_ram_duration(BQ, BQS) ->
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQ:set_ram_duration_target(DesiredDuration, BQS1).
+%% [1] - the arrival of this newly synced slave may cause the master to die if
+%% the admin has requested a migration-type change to policy.
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q = #amqqueue { sync_slave_pids = SSPids }] ->
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
- ok
- end
- end).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue { sync_slave_pids = SSPids }] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2),
+ {ok, Q1, Q2}
+ end
+ end) of
+ ok -> ok;
+ {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1]
+ end.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index a2034876..be3924f0 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave_sup).
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f2ab67cd..b8cfe4a9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -57,6 +57,9 @@
-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
-type(bq() :: atom()).
-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
@@ -69,8 +72,8 @@
-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
'denied' |
- {'ok' | 'failed', {timer:tref(), bqs()}} |
- {'stop', any(), {timer:tref(), bqs()}}).
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
-endif.
@@ -91,16 +94,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Acc) ->
- master_send(Msg, MsgProps, Args, Acc)
+ case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {I, Last}) ->
+master_send(Msg, MsgProps, Unacked,
+ {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
@@ -119,7 +122,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
gen_server2:reply(From, ok),
{stop, cancelled};
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
@@ -164,11 +167,11 @@ syncer(Ref, Log, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
- {msg, Ref, Msg, MsgProps} ->
+ {msg, Ref, Msg, MsgProps, Unacked} ->
SPids1 = wait_for_credit(SPids),
[begin
credit_flow:send(SPid),
- SPid ! {sync_msg, Ref, Msg, MsgProps}
+ SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
@@ -204,12 +207,12 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, Syncer),
Syncer ! {sync_ready, Ref, self()},
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
- rabbit_misc:get_parent()}, TRef, BQS1).
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
- TRef, BQS) ->
+ State = {MA, TRef, BQS}) ->
receive
{'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -218,34 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
%% sync with a newly promoted master, or even just receive
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
credit_flow:peer_down(Syncer),
- {failed, {TRef, BQS1}};
+ {failed, {[], TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef, [flush]),
credit_flow:peer_down(Syncer),
- {ok, {TRef, BQS}};
+ {ok, State};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ slave_sync_loop(Args, {MA, TRef, BQS1});
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
- slave_sync_loop(Args, TRef1, BQS1);
- {sync_msg, Ref, Msg, Props} ->
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
+ {sync_msg, Ref, Msg, Props, Unacked} ->
credit_flow:ack(Syncer),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
- {stop, Reason, {TRef, BQS}};
+ {stop, Reason, State};
%% If the master throws an exception
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, Reason, {TRef, undefined}}
+ {stop, Reason, {[], TRef, undefined}}
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ce3e3802..c36fb147 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
@@ -67,6 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
+-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
%% Horrible macro to use in guards
@@ -242,6 +243,8 @@
-spec(interval_operation/4 ::
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
+-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
+-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
-endif.
@@ -1047,6 +1050,22 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
round(LastInterval / 1.5)])
end}.
+ensure_timer(State, Idx, After, Msg) ->
+ case element(Idx, State) of
+ undefined -> TRef = erlang:send_after(After, self(), Msg),
+ setelement(Idx, State, TRef);
+ _ -> State
+ end.
+
+stop_timer(State, Idx) ->
+ case element(Idx, State) of
+ undefined -> State;
+ TRef -> case erlang:cancel_timer(TRef) of
+ false -> State;
+ _ -> setelement(Idx, State, undefined)
+ end
+ end.
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6a442fec..c39e898c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_mnesia).
@@ -41,10 +41,7 @@
]).
%% Used internally in rpc calls
--export([node_info/0,
- remove_node_if_mnesia_running/1,
- is_running_remote/0
- ]).
+-export([node_info/0, remove_node_if_mnesia_running/1]).
-include("rabbit.hrl").
@@ -278,16 +275,16 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
true -> ok;
false -> e(not_a_cluster_node)
end,
- case {RemoveWhenOffline, mnesia:system_info(is_running)} of
- {true, no} -> remove_node_offline_node(Node);
- {true, yes} -> e(online_node_offline_flag);
- {false, no} -> e(offline_node_no_offline_flag);
- {false, yes} -> rabbit_misc:local_info_msg(
- "Removing node ~p from cluster~n", [Node]),
- case remove_node_if_mnesia_running(Node) of
- ok -> ok;
- {error, _} = Err -> throw(Err)
- end
+ case {RemoveWhenOffline, is_running()} of
+ {true, false} -> remove_node_offline_node(Node);
+ {true, true} -> e(online_node_offline_flag);
+ {false, false} -> e(offline_node_no_offline_flag);
+ {false, true} -> rabbit_misc:local_info_msg(
+ "Removing node ~p from cluster~n", [Node]),
+ case remove_node_if_mnesia_running(Node) of
+ ok -> ok;
+ {error, _} = Err -> throw(Err)
+ end
end.
remove_node_offline_node(Node) ->
@@ -334,11 +331,11 @@ status() ->
end,
[{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
IfNonEmpty(ram, cluster_nodes(ram)))}] ++
- case mnesia:system_info(is_running) of
- yes -> RunningNodes = cluster_nodes(running),
- [{running_nodes, cluster_nodes(running)},
- {partitions, mnesia_partitions(RunningNodes)}];
- no -> []
+ case is_running() of
+ true -> RunningNodes = cluster_nodes(running),
+ [{running_nodes, RunningNodes},
+ {partitions, mnesia_partitions(RunningNodes)}];
+ false -> []
end.
mnesia_partitions(Nodes) ->
@@ -346,6 +343,8 @@ mnesia_partitions(Nodes) ->
Nodes, rabbit_node_monitor, partitions, []),
[Reply || Reply = {_, R} <- Replies, R =/= []].
+is_running() -> mnesia:system_info(is_running) =:= yes.
+
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
@@ -355,10 +354,10 @@ cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% the data from mnesia. Obviously it'll work only when mnesia is
%% running.
cluster_status_from_mnesia() ->
- case mnesia:system_info(is_running) of
- no ->
+ case is_running() of
+ false ->
{error, mnesia_not_running};
- yes ->
+ true ->
%% If the tables are not present, it means that
%% `init_db/3' hasn't been run yet. In other words, either
%% we are a virgin node or a restarted RAM node. In both
@@ -601,19 +600,16 @@ discover_cluster(Nodes) when is_list(Nodes) ->
lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
(Node, {error, _}) -> discover_cluster(Node)
end, {error, no_nodes_provided}, Nodes);
+discover_cluster(Node) when Node == node() ->
+ {error, {cannot_discover_cluster, "Cannot cluster node with itself"}};
discover_cluster(Node) ->
OfflineError =
{error, {cannot_discover_cluster,
"The nodes provided are either offline or not running"}},
- case node() of
- Node -> {error, {cannot_discover_cluster,
- "Cannot cluster node with itself"}};
- _ -> case rpc:call(Node,
- rabbit_mnesia, cluster_status_from_mnesia, []) of
- {badrpc, _Reason} -> OfflineError;
- {error, mnesia_not_running} -> OfflineError;
- {ok, Res} -> {ok, Res}
- end
+ case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
end.
schema_ok_or_move() ->
@@ -672,20 +668,21 @@ move_db() ->
ok.
remove_node_if_mnesia_running(Node) ->
- case mnesia:system_info(is_running) of
- yes ->
+ case is_running() of
+ false ->
+ {error, mnesia_not_running};
+ true ->
%% Deleting the the schema copy of the node will result in
%% the node being removed from the cluster, with that
%% change being propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
+ rabbit_amqqueue:forget_all_durable(Node),
rabbit_node_monitor:notify_left_cluster(Node),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
- end;
- no ->
- {error, mnesia_not_running}
+ end
end.
leave_cluster() ->
@@ -735,8 +732,6 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}.
-
check_consistency(OTP, Rabbit) ->
rabbit_misc:sequence_error(
[check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index f685b109..81111061 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c2e55022..13b40a48 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store).
@@ -943,15 +943,12 @@ next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
_ -> {State, 0}
end.
-start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
- TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync),
- State #msstate { sync_timer_ref = TRef }.
+start_sync_timer(State) ->
+ rabbit_misc:ensure_timer(State, #msstate.sync_timer_ref,
+ ?SYNC_INTERVAL, sync).
-stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
- State;
-stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
- erlang:cancel_timer(TRef),
- State #msstate { sync_timer_ref = undefined }.
+stop_sync_timer(State) ->
+ rabbit_misc:stop_timer(State, #msstate.sync_timer_ref).
internal_sync(State = #msstate { current_file_handle = CurHdl,
cref_to_msg_ids = CTM }) ->
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index 3defeaaf..bbc7db68 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 3b61ed0b..3881de23 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_gc).
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index 6cc0b2a7..f0096446 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_index).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 562fc197..b8b03f56 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_net).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 31eeef73..4b6c7538 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -11,14 +11,15 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_networking).
-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
stop_tcp_listener/1, on_node_down/1, active_listeners/0,
- node_listeners/1, connections/0, connection_info_keys/0,
+ node_listeners/1, register_connection/1, unregister_connection/1,
+ connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
@@ -65,6 +66,8 @@
-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(register_connection/1 :: (pid()) -> ok).
+-spec(unregister_connection/1 :: (pid()) -> ok).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
-spec(connections_local/0 :: () -> [rabbit_types:connection()]).
-spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -294,20 +297,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
+register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
+
+unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
+
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() ->
- [Reader ||
- {_, ConnSup, supervisor, _}
- <- supervisor:which_children(rabbit_tcp_client_sup),
- Reader <- [try
- rabbit_connection_sup:reader(ConnSup)
- catch exit:{noproc, _} ->
- noproc
- end],
- Reader =/= noproc].
+connections_local() -> pg_local:get_members(rabbit_connections).
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 258ac0ce..71c2c80a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_node_monitor).
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index c8d77b0f..c92e5963 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_nodes).
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index 24762a73..a4bd5042 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -11,12 +11,12 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_parameter_validation).
--export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]).
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -73,3 +73,15 @@ proplist(Name, Constraints, Term) when is_list(Term) ->
proplist(Name, _Constraints, Term) ->
{error, "~s not a list ~p", [Name, Term]}.
+
+enum(OptionsA) ->
+ Options = [list_to_binary(atom_to_list(O)) || O <- OptionsA],
+ fun (Name, Term) when is_binary(Term) ->
+ case lists:member(Term, Options) of
+ true -> ok;
+ false -> {error, "~s should be one of ~p, actually was ~p",
+ [Name, Options, Term]}
+ end;
+ (Name, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]}
+ end.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 9f94af7d..58c906eb 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins).
@@ -64,8 +64,8 @@ list(PluginsDir) ->
[plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]),
case Problems of
[] -> ok;
- _ -> io:format("Warning: Problem reading some plugins: ~p~n",
- [Problems])
+ _ -> error_logger:warning_msg(
+ "Problem reading some plugins: ~p~n", [Problems])
end,
Plugins.
@@ -112,8 +112,9 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
case Enabled -- plugin_names(ToUnpackPlugins) of
[] -> ok;
- Missing -> io:format("Warning: the following enabled plugins were "
- "not found: ~p~n", [Missing])
+ Missing -> error_logger:warning_msg(
+ "The following enabled plugins were not found: ~p~n",
+ [Missing])
end,
%% Eliminate the contents of the destination directory
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 2158d1da..308b80cd 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins_main).
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 2c997f16..7398cd2d 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_policy).
@@ -26,7 +26,7 @@
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
@@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(_VHost, <<"policy">>, _Name) ->
- ok.
-
notify(VHost, <<"policy">>, _Name, _Term) ->
update_policies(VHost).
@@ -218,10 +215,13 @@ validation(_Name, Terms) when is_list(Terms) ->
rabbit_registry:lookup_all(policy_validator)),
[] = dups(Keys), %% ASSERTION
Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
- {TermKeys, _} = lists:unzip(Terms),
- case dups(TermKeys) of
- [] -> validation0(Validators, Terms);
- Dup -> {error, "~p duplicate keys not allowed", [Dup]}
+ case is_proplist(Terms) of
+ true -> {TermKeys, _} = lists:unzip(Terms),
+ case dups(TermKeys) of
+ [] -> validation0(Validators, Terms);
+ Dup -> {error, "~p duplicate keys not allowed", [Dup]}
+ end;
+ false -> {error, "definition must be a dictionary: ~p", [Terms]}
end;
validation(_Name, Term) ->
{error, "parse error while reading policy: ~p", [Term]}.
@@ -249,3 +249,5 @@ validation0(Validators, Terms) ->
a2b(A) -> list_to_binary(atom_to_list(A)).
dups(L) -> L -- lists:usort(L).
+
+is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]).
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
index b59dec2b..75b88c39 100644
--- a/src/rabbit_policy_validator.erl
+++ b/src/rabbit_policy_validator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_policy_validator).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 404afe3c..3ce516d0 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_prelaunch).
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6dad01cc..521cd78b 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 21f58154..ea70208f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_index).
@@ -162,7 +162,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_msg_ids }).
+ max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -190,7 +190,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_msg_ids :: gb_set()
+ unconfirmed :: gb_set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -210,7 +210,7 @@
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
--spec(needs_sync/1 :: (qistate()) -> boolean()).
+-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false').
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -269,13 +269,16 @@ delete_and_terminate(State) ->
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ State = #qistate { unconfirmed = Unconfirmed })
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- State #qistate {
- unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }),
+ case MsgProps#message_properties.needs_confirming of
+ true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
+ State #qistate { unconfirmed = Unconfirmed1 };
+ false -> State
+ end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -302,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
needs_sync(#qistate { journal_handle = undefined }) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_sync(JournalHdl).
+needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
+ case gb_sets:is_empty(UC) of
+ true -> case file_handle_cache:needs_sync(JournalHdl) of
+ true -> other;
+ false -> false
+ end;
+ false -> confirms
+ end.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -398,7 +407,7 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_msg_ids = gb_sets:new() }.
+ unconfirmed = gb_sets:new() }.
clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -607,19 +616,21 @@ add_to_journal(RelSeq, Action,
end};
add_to_journal(RelSeq, Action, JEntries) ->
- Val = case array:get(RelSeq, JEntries) of
- undefined ->
- case Action of
- ?PUB -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end;
- ({Pub, no_del, no_ack}) when Action == del ->
- {Pub, del, no_ack};
- ({Pub, Del, no_ack}) when Action == ack ->
- {Pub, Del, ack}
- end,
- array:set(RelSeq, Val, JEntries).
+ case array:get(RelSeq, JEntries) of
+ undefined ->
+ array:set(RelSeq,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end, JEntries);
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ ({no_pub, del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ ({?PUB, del, no_ack}) when Action == ack ->
+ array:reset(RelSeq, JEntries)
+ end.
maybe_flush_journal(State = #qistate { dirty_count = DCount,
max_journal_entries = MaxJournal })
@@ -732,9 +743,12 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
- State #qistate { unsynced_msg_ids = gb_sets:new() }.
+notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
+ case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State #qistate { unconfirmed = gb_sets:new() }
+ end.
%%----------------------------------------------------------------------------
%% segment manipulation
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 13e8feff..b8ff9c9f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/4, mainloop/2]).
+-export([init/4, mainloop/2, recvloop/2]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,7 +37,8 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+ conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun,
+ buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -64,6 +65,10 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
+-define(IS_STOPPING(State),
+ (State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed)).
+
%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -105,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) ->
shutdown(Pid, Explanation) ->
gen_server:call(Pid, {shutdown, Explanation}, infinity).
-init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) ->
+init(Parent, ConnSupPid, Collector, StartHeartbeatFun) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(
- Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock,
+ Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock,
SockTransform)
end.
@@ -199,7 +204,7 @@ name(Sock) ->
socket_ends(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
+start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = name(Sock),
@@ -230,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
- channel_sup_sup_pid = ChannelSupSupPid,
+ conn_sup_pid = ConnSupPid,
+ channel_sup_sup_pid = none,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
@@ -240,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
- recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)),
+ run({?MODULE, recvloop,
+ [Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -259,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
%% accounting as accurate as possible we ought to close the
%% socket w/o delay before termination.
rabbit_net:fast_close(ClientSock),
+ rabbit_networking:unregister_connection(self()),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
+run({M, F, A}) ->
+ try apply(M, F, A)
+ catch {become, MFA} -> run(MFA)
+ end.
+
recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
@@ -282,26 +295,35 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
- {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
- closed -> case State#v1.connection_state of
- closed -> State;
- _ -> throw(connection_closed_abruptly)
- end;
- {error, Reason} -> throw({inet_error, Reason});
- {other, Other} -> handle_other(Other, Deb, State)
+ {data, Data} ->
+ recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ ok;
+ closed ->
+ throw(connection_closed_abruptly);
+ {error, Reason} ->
+ throw({inet_error, Reason});
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, State);
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> ok;
+ NewState -> recvloop(Deb, NewState)
+ end
end.
-handle_other({conserve_resources, Conserve}, Deb,
+handle_other({conserve_resources, Conserve},
State = #v1{throttle = Throttle}) ->
Throttle1 = Throttle#throttle{conserve_resources = Conserve},
- recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
-handle_other({channel_closing, ChPid}, Deb, State) ->
+ control_throttle(State#v1{throttle = Throttle1});
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(control_throttle(State)));
-handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ maybe_close(control_throttle(State));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
@@ -313,59 +335,54 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% initiated by our parent it is probably more important to exit
%% quickly.
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
throw(E);
-handle_other({channel_exit, Channel, Reason}, Deb, State) ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
-handle_other(terminate_connection, _Deb, State) ->
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, _State) ->
+ stop;
+handle_other(handshake_timeout, State)
+ when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
State;
-handle_other(handshake_timeout, Deb, State)
- when ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
-handle_other(handshake_timeout, _Deb, State) ->
+handle_other(handshake_timeout, State) ->
throw({handshake_timeout, State#v1.callback});
-handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
- mainloop(Deb, State);
-handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
throw({heartbeat_timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
+ force -> stop;
+ normal -> NewState
end;
-handle_other({'$gen_call', From, info}, Deb, State) ->
+handle_other({'$gen_call', From, info}, State) ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
-handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
- mainloop(Deb, State);
-handle_other(ensure_stats, Deb, State) ->
- mainloop(Deb, ensure_stats_timer(State));
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
-handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
-handle_other({bump_credit, Msg}, Deb, State) ->
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, _State) ->
+ control_throttle(State);
+handle_other(Other, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -426,13 +443,13 @@ close_connection(State = #v1{queue_collector = Collector,
handle_dependent_exit(ChPid, Reason, State) ->
case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, uncontrolled} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- maybe_close(control_throttle(State));
- {Channel, uncontrolled} ->
- maybe_close(handle_exception(control_throttle(State),
- Channel, Reason))
+ {undefined, controlled} -> State;
+ {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_Channel, controlled} -> maybe_close(control_throttle(State));
+ {Channel, uncontrolled} -> State1 = handle_exception(
+ State, Channel, Reason),
+ maybe_close(control_throttle(State1))
end.
terminate_channels() ->
@@ -572,17 +589,13 @@ all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
%%--------------------------------------------------------------------------
handle_frame(Type, 0, Payload,
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol}})
- when CS =:= closing; CS =:= closed ->
+ State = #v1{connection = #connection{protocol = Protocol}})
+ when ?IS_STOPPING(State) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
end;
-handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
- State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
@@ -600,6 +613,8 @@ handle_frame(Type, Channel, Payload,
heartbeat -> unexpected_frame(Type, Channel, Payload, State);
Frame -> process_frame(Frame, Channel, State)
end;
+handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) ->
+ State;
handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
@@ -627,7 +642,10 @@ process_frame(Frame, Channel, State) ->
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
+ control_throttle(State);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
@@ -689,8 +707,12 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+ become_1_0(Id, State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, A, B, C, D});
+ refuse_connection(Sock, {bad_version, {A, B, C, D}});
handle_input(handshake, Other, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
@@ -704,6 +726,7 @@ handle_input(Callback, Data, _State) ->
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Protocol,
State = #v1{sock = Sock, connection = Connection}) ->
+ rabbit_networking:register_connection(self()),
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
@@ -717,10 +740,13 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
connection_state = starting},
frame_header, 7).
-refuse_connection(Sock, Exception) ->
- ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+refuse_connection(Sock, Exception, {A, B, C, D}) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end),
throw(Exception).
+refuse_connection(Sock, Exception) ->
+ refuse_connection(Sock, Exception, {0, 0, 9, 1}).
+
ensure_stats_timer(State = #v1{connection_state = running}) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
@@ -799,17 +825,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
+ conn_sup_pid = ConnSupPid,
sock = Sock,
throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ {ok, ChannelSupSupPid} =
+ supervisor2:start_child(
+ ConnSupPid,
+ {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
State1 = control_throttle(
- State#v1{connection_state = running,
- connection = NewConnection,
- throttle = Throttle#throttle{
- conserve_resources = Conserve}}),
+ State#v1{connection_state = running,
+ connection = NewConnection,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ throttle = Throttle1}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -820,10 +853,9 @@ handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
- State = #v1{connection_state = CS,
- connection = #connection{protocol = Protocol},
+ State = #v1{connection = #connection{protocol = Protocol},
sock = Sock})
- when CS =:= closing; CS =:= closed ->
+ when ?IS_STOPPING(State) ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
@@ -832,8 +864,7 @@ handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
State;
-handle_method0(_Method, State = #v1{connection_state = CS})
- when CS =:= closing; CS =:= closed ->
+handle_method0(_Method, State) when ?IS_STOPPING(State) ->
State;
handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
@@ -981,3 +1012,33 @@ cert_info(F, #v1{sock = Sock}) ->
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+-ifdef(use_specs).
+-spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()).
+-endif.
+become_1_0(Id, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled);
+ _ -> Mode = case Id of
+ 0 -> amqp;
+ 3 -> sasl;
+ _ -> refuse_connection(
+ Sock, {unsupported_amqp1_0_protocol_id, Id},
+ {3, 1, 0, 0})
+ end,
+ throw({become, {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(State)]}})
+ end.
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ conn_sup_pid = ConnSupPid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF,
+ Buf, BufLen}.
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 32709d24..60419856 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_registry).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 237ab78c..4c4ab2cf 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_restartable_sup).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index f4bbda0f..2eaef9a7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index 18668049..6b62c974 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_runtime_parameter).
@@ -23,8 +23,6 @@
-callback validate(rabbit_types:vhost(), binary(), binary(),
term()) -> validate_results().
--callback validate_clear(rabbit_types:vhost(), binary(),
- binary()) -> validate_results().
-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
@@ -35,7 +33,6 @@
behaviour_info(callbacks) ->
[
{validate, 4},
- {validate_clear, 3},
{notify, 4},
{notify_clear, 3}
];
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 49060409..b1100b65 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_runtime_parameters).
@@ -120,21 +120,13 @@ clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
clear_any(VHost, Component, Name) ->
- case clear_any0(VHost, Component, Name) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
-
-clear_any0(VHost, Component, Name) ->
- case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Name)) of
- ok -> mnesia_clear(VHost, Component, Name),
- Mod:notify_clear(VHost, Component, Name),
- ok;
- E -> E
- end;
- E -> E
+ case lookup(VHost, Component, Name) of
+ not_found -> {error_string, "Parameter does not exist"};
+ _ -> mnesia_clear(VHost, Component, Name),
+ case lookup_component(Component) of
+ {ok, Mod} -> Mod:notify_clear(VHost, Component, Name);
+ _ -> ok
+ end
end.
mnesia_clear(VHost, Component, Name) ->
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index d4d7271e..05c1dbc1 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -11,14 +11,14 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
-behaviour(rabbit_policy_validator).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
@@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok;
validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(_, <<"test">>, <<"good">>) -> ok;
-validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
-validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index e8beecfe..566db9a9 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_sasl_report_file_h).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 22ff555f..b1238623 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_ssl).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index f142d233..6a6b2feb 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_sup).
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index fa1c5bbd..d1c0bb1e 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_table).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 09ed3d08..27807b62 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests).
@@ -61,6 +61,7 @@ all_tests() ->
passed = test_runtime_parameters(),
passed = test_policy_validation(),
passed = test_server_status(),
+ passed = test_amqp_connection_refusal(),
passed = test_confirms(),
passed =
do_if_secondary_node(
@@ -911,10 +912,10 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
- Policy, Params, {OldM, OldSs}, All),
+ Policy, Params, CurrentState, All),
NewSs1 = lists:sort(NewSs0),
case dm_list_match(NewSs, NewSs1, ExtraSs) of
ok -> ok;
@@ -922,28 +923,36 @@ test_dynamic_mirroring() ->
end
end,
- Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
+
+ N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
%% Add a node
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- %% And once that's happened, still keep the master even when not listed
- Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
-
- Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed,
+ %% if nothing is synced
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
+ %% But if something is synced we can lose the master - but make
+ %% sure we pick the new master from the nodes which are synced!
+ Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
+ Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
passed.
@@ -1061,7 +1070,11 @@ test_runtime_parameters() ->
ok = control_action(clear_parameter, ["test", "maybe"]),
{error_string, _} =
control_action(clear_parameter, ["test", "neverexisted"]),
+
+ %% We can delete for a component that no longer exists
+ Good(["test", "good", "\"ignore\""]),
rabbit_runtime_parameters_test:unregister(),
+ ok = control_action(clear_parameter, ["test", "good"]),
passed.
test_policy_validation() ->
@@ -1086,7 +1099,7 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Writer = spawn(fun test_writer/0),
{ok, Ch} = rabbit_channel:start_link(
1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
user(<<"user">>), <<"/">>, [], self(),
@@ -1119,11 +1132,9 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
%% list connections
- [#listener{host = H, port = P} | _] =
- [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
- N =:= node()],
-
- {ok, _C} = gen_tcp:connect(H, P, []),
+ {H, P} = find_listener(),
+ {ok, C} = gen_tcp:connect(H, P, []),
+ gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>),
timer:sleep(100),
ok = info_action(list_connections,
rabbit_networking:connection_info_keys(), false),
@@ -1160,10 +1171,34 @@ test_server_status() ->
passed.
+test_amqp_connection_refusal() ->
+ [passed = test_amqp_connection_refusal(V) ||
+ V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]],
+ passed.
+
+test_amqp_connection_refusal(Header) ->
+ {H, P} = find_listener(),
+ {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]),
+ ok = gen_tcp:send(C, Header),
+ {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100),
+ ok = gen_tcp:close(C),
+ passed.
+
+find_listener() ->
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+ {H, P}.
+
+test_writer() -> test_writer(none).
+
test_writer(Pid) ->
receive
- shutdown -> ok;
- {send_command, Method} -> Pid ! Method, test_writer(Pid)
+ {'$gen_call', From, flush} -> gen_server:reply(From, ok),
+ test_writer(Pid);
+ {send_command, Method} -> Pid ! Method,
+ test_writer(Pid);
+ shutdown -> ok
end.
test_spawn() ->
@@ -2227,10 +2262,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
- variable_queue_publish(IsPersistent, Count, PropFun,
+ variable_queue_publish(IsPersistent, 1, Count, PropFun,
fun (_N) -> <<>> end, VQ).
-variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
+variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2242,7 +2277,7 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
end},
PayloadFun(N)),
PropFun(N, #message_properties{}), false, self(), VQN)
- end, VQ, lists:seq(1, Count)).
+ end, VQ, lists:seq(Start, Start + Count - 1)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@@ -2322,59 +2357,120 @@ test_variable_queue() ->
fun test_dropwhile_varying_ram_duration/1,
fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
passed.
test_variable_queue_fold(VQ0) ->
- Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(
- true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
- lists:foldl(
- fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end,
- VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
-
-test_variable_queue_fold(Cut, Count, VQ0) ->
+ {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Count = rabbit_variable_queue:depth(VQ1),
+ Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
+ lists:foldl(fun (Cut, VQ2) ->
+ test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
+ end, VQ1, [0, 1, 2, Count div 2,
+ Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
- fun (M, _, A) ->
- case msg2int(M) =< Cut of
- true -> {cont, [M | A]};
+ fun (M, _, Pending, A) ->
+ MInt = msg2int(M),
+ Pending = lists:member(MInt, PendingMsgs), %% assert
+ case MInt =< Cut of
+ true -> {cont, [MInt | A]};
false -> {stop, A}
end
end, [], VQ0),
- true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] ==
- [msg2int(M) || M <- Acc],
+ Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs),
+ Expected = lists:reverse(Acc), %% assertion
VQ1.
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
binary_to_term(list_to_binary(lists:reverse(P))).
-test_variable_queue_requeue(VQ0) ->
- Interval = 50,
- Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
+ack_subset(AckSeqs, Interval, Rem) ->
+ lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs).
+
+requeue_one_by_one(Acks, VQ) ->
+ lists:foldl(fun (AckTag, VQN) ->
+ {_MsgId, VQM} = rabbit_variable_queue:requeue(
+ [AckTag], VQN),
+ VQM
+ end, VQ, Acks).
+
+%% Create a vq with messages in q1, delta, and q3, and holes (in the
+%% form of pending acks) in the latter two.
+variable_queue_with_holes(VQ0) ->
+ Interval = 64,
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval,
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
- Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
- [Ack | Acc];
- (_, Acc) ->
- Acc
- end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
- VQ5 = lists:foldl(fun (AckTag, VQN) ->
- {_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], VQN),
- VQM
- end, VQ4, Subset),
- VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag}, VQb} =
+ VQ2 = variable_queue_publish(
+ false, 1, Count,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
+ Acks = lists:reverse(AcksR),
+ AckSeqs = lists:zip(Acks, Seq),
+ [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] =
+ [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]],
+ %% we requeue in three phases in order to exercise requeuing logic
+ %% in various vq states
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(
+ Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
+ VQ5 = requeue_one_by_one(Subset1, VQ4),
+ %% by now we have some messages (and holes) in delt
+ VQ6 = requeue_one_by_one(Subset2, VQ5),
+ VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6),
+ %% add the q1 tail
+ VQ8 = variable_queue_publish(
+ true, Count + 1, 64,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
+ %% assertions
+ [false = case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ lists:member(K, [q1, delta, q3])],
+ Depth = Count + 64,
+ Depth = rabbit_variable_queue:depth(VQ8),
+ Len = Depth - length(Subset3),
+ Len = rabbit_variable_queue:len(VQ8),
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
+
+test_variable_queue_requeue(VQ0) ->
+ {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Msgs =
+ lists:zip(RequeuedMsgs,
+ lists:duplicate(length(RequeuedMsgs), true)) ++
+ lists:zip(FreshMsgs,
+ lists:duplicate(length(FreshMsgs), false)),
+ VQ2 = lists:foldl(fun ({I, Requeued}, VQa) ->
+ {{M, MRequeued, _}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
+ Requeued = MRequeued, %% assertion
+ I = msg2int(M), %% assertion
VQb
- end, VQ5, lists:reverse(Acks)),
- {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6),
- VQ7.
+ end, VQ1, Msgs),
+ {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
+ VQ3.
+
+test_variable_queue_purge(VQ0) ->
+ LenDepth = fun (VQ) ->
+ {rabbit_variable_queue:len(VQ),
+ rabbit_variable_queue:depth(VQ)}
+ end,
+ VQ1 = variable_queue_publish(false, 10, VQ0),
+ {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
+ {4, VQ3} = rabbit_variable_queue:purge(VQ2),
+ {0, 6} = LenDepth(VQ3),
+ {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
+ {2, 6} = LenDepth(VQ4),
+ VQ5 = rabbit_variable_queue:purge_acks(VQ4),
+ {2, 2} = LenDepth(VQ5),
+ VQ5.
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
@@ -2426,7 +2522,7 @@ test_dropfetchwhile(VQ0) ->
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
- false, Count,
+ false, 1, Count,
fun (N, Props) -> Props#message_properties{expiry = N} end,
fun erlang:term_to_binary/1, VQ0),
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
index 72c07b51..c52394c7 100644
--- a/src/rabbit_tests_event_receiver.erl
+++ b/src/rabbit_tests_event_receiver.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests_event_receiver).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 601656da..432055d4 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_trace).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 5bc3d9f5..c6007061 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_types).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 455134da..fde0dbe1 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 21fdcd66..457b1567 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade_functions).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 90ee3439..f7c6c729 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -11,12 +11,12 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
@@ -262,8 +262,6 @@
durable,
transient_threshold,
- async_callback,
-
len,
persistent_count,
@@ -356,8 +354,6 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
-
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -519,6 +515,8 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
@@ -527,7 +525,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- ram_msg_count = RamMsgCount,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
@@ -538,12 +535,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
end,
PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 })).
+ a(reduce_memory_use(
+ inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ unconfirmed = UC1 }))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
@@ -596,7 +593,7 @@ fetchwhile(Pred, Fun, Acc, State) ->
{undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
- true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ true -> {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(true, MsgStatus, State2),
fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
@@ -610,7 +607,7 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(AckRequired, MsgStatus, State2),
{{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
end.
@@ -672,30 +669,17 @@ ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
lists:foldl(fun(SeqId, {Acc0, State0}) ->
MsgStatus = lookup_pending_ack(SeqId, State0),
- {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
{MsgFun(Msg, SeqId, Acc0), State1}
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, #vqstate { q1 = Q1,
- q2 = Q2,
- delta = #delta { start_seq_id = DeltaSeqId,
- end_seq_id = DeltaSeqIdEnd },
- q3 = Q3,
- q4 = Q4 } = State) ->
- QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, false, State0),
- {StopGo, AccNext} =
- Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
- {StopGo, {AccNext, State1}}
- end,
- {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
- {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
- {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
- DeltaSeqId, DeltaSeqIdEnd, State2),
- {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
- {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
- {Acc5, State5}.
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
@@ -784,24 +768,18 @@ ram_duration(State = #vqstate {
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case TargetRamCount of
- infinity -> false;
- _ -> case
- reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
- end
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false when TargetRamCount == infinity -> false;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -842,7 +820,8 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
-invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
+invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
+invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
@@ -894,15 +873,28 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a msg_id to the unconfirmed set
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
- Msg = #basic_message { id = MsgId }, MsgProps) ->
- #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = false, index_on_disk = false,
- msg_props = MsgProps }.
+ Msg = #basic_message {id = MsgId}, MsgProps) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = false,
+ index_on_disk = false,
+ msg_props = MsgProps}.
+
+beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps}.
trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
@@ -969,7 +961,7 @@ maybe_write_delivered(true, SeqId, IndexState) ->
betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
- fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
+ fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
{Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
@@ -977,21 +969,10 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
[SeqId | Acks1]};
false -> case (gb_trees:is_defined(SeqId, RPA) orelse
gb_trees:is_defined(SeqId, DPA)) of
- false ->
- {?QUEUE:in_r(
- m(#msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }), Filtered1),
- Delivers1, Acks1};
- true ->
- Acc
+ false -> {?QUEUE:in_r(m(beta_msg_status(M)),
+ Filtered1),
+ Delivers1, Acks1};
+ true -> Acc
end
end
end, {?QUEUE:new(), [], []}, List),
@@ -1019,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1045,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
-
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1078,9 +1057,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, true, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }
+ read_msg(MsgStatus, State),
+ inc_ram_msg_count(
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1096,18 +1076,20 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(#msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
- CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+read_msg(#msg_status{msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
+read_msg(#msg_status{msg = Msg}, State) ->
+ {Msg, State}.
+
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
- {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
- msg_store_clients = MSCState1 }};
-read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
- {Msg, State}.
+ {Msg, State #vqstate {msg_store_clients = MSCState1}}.
+
+inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
+ State#vqstate{ram_msg_count = RamMsgCount + 1}.
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
@@ -1122,7 +1104,7 @@ remove(AckRequired, MsgStatus = #msg_status {
index_state = IndexState,
msg_store_clients = MSCState,
len = Len,
- persistent_count = PCount }) ->
+ persistent_count = PCount}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1151,11 +1133,11 @@ remove(AckRequired, MsgStatus = #msg_status {
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1}}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1343,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
-
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
@@ -1386,14 +1353,14 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue and fold
+%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- {Msg, State1} = read_msg(MsgStatus, true, State),
- {MsgStatus#msg_status { msg = Msg }, State1};
-publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
- {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
+ {Msg, State1} = read_msg(MsgStatus, State),
+ {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)};
+publish_alpha(MsgStatus, State) ->
+ {MsgStatus, inc_ram_msg_count(State)}.
publish_beta(MsgStatus, State) ->
{#msg_status { msg = Msg} = MsgStatus1,
@@ -1456,40 +1423,81 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
-qfoldl( Fun, {cont, Acc} = A, Q) ->
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
+
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
case ?QUEUE:out(Q) of
- {empty, _Q} -> A;
- {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
end.
-lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
-lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
-lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
-
-delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
- {stop, {Acc, State}};
-delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {cont, {Acc, State}};
-delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { index_state = IndexState,
- msg_store_clients = MSCState } = State) ->
- DeltaSeqId1 = lists:min(
- [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
- {Acc0, MSCState0}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent, MsgId),
- {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}}
- end, {cont, {Acc, MSCState}}, List),
- delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
- State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }).
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+ end.
%%----------------------------------------------------------------------------
%% Phase changes
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 1cc7d6c8..f81a4021 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_version).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 0bb18f4c..d0f39221 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_vhost).
@@ -95,9 +95,9 @@ internal_delete(VHostPath) ->
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
[ok = rabbit_runtime_parameters:clear(VHostPath,
proplists:get_value(component, Info),
- proplists:get_value(key, Info))
+ proplists:get_value(name, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info))
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index db674f91..b3e9ec66 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_vm).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index a7ea3d99..2d15e6a2 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(rabbit_writer).
@@ -21,7 +21,8 @@
-export([start/5, start_link/5, start/6, start_link/6]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
- send_command_and_notify/4, send_command_and_notify/5]).
+ send_command_and_notify/4, send_command_and_notify/5,
+ flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
%% internal
@@ -69,6 +70,7 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:protocol())
@@ -130,7 +132,7 @@ mainloop1(State) ->
receive
Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(flush(State))
+ ?MODULE:mainloop1(internal_flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) ->
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
- State1 = flush(internal_send_command_async(MethodRecord, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
- State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, Content, State)),
+ gen_server:reply(From, ok),
+ State1;
+handle_message({'$gen_call', From, flush}, State) ->
+ State1 = internal_flush(State),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
@@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
+flush(W) -> call(W, flush).
+
%%---------------------------------------------------------------------------
call(Pid, Msg) ->
@@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content,
maybe_flush(State = #wstate{pending = Pending}) ->
case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
- true -> flush(State);
+ true -> internal_flush(State);
false -> State
end.
-flush(State = #wstate{pending = []}) ->
+internal_flush(State = #wstate{pending = []}) ->
State;
-flush(State = #wstate{sock = Sock, pending = Pending}) ->
+internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 5af38573..c98b528d 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -51,7 +51,7 @@
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
-%% All modifications are (C) 2010-2012 VMware, Inc.
+%% All modifications are (C) 2010-2013 VMware, Inc.
%%
%% %CopyrightBegin%
%%
diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl
index e42ded7b..f19a53e6 100644
--- a/src/supervisor2_tests.erl
+++ b/src/supervisor2_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%
-module(supervisor2_tests).
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 344196d7..0248f878 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index d8844441..61c747c9 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index fb01c792..90e84f94 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 9ee921b4..7f850dbc 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 7f4b5049..3342adb5 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(test_sup).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 5ce894a9..f70156b6 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index c9ecccd6..3bdeb377 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(worker_pool).
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index ff356366..b9835f1e 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(worker_pool_sup).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 1ddcebb2..56e4b7b3 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
-module(worker_pool_worker).