diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-02-21 13:44:59 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-02-21 13:44:59 +0000 |
commit | 97dada17a2770d5af602755a38bd6fbb812b8934 (patch) | |
tree | 8f562d881b598b74b9b55d71fbbd1ab6c7722489 | |
parent | ecd7fb6e097f72060fcae8c49f5903fd8e569676 (diff) | |
parent | d1562e9de47255303213793205f648c64aa542d1 (diff) | |
download | rabbitmq-server-97dada17a2770d5af602755a38bd6fbb812b8934.tar.gz |
Merged default into bug19375
146 files changed, 1208 insertions, 957 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ index d50e32ef..4cdf783b 100644 --- a/LICENSE-MPL-RabbitMQ +++ b/LICENSE-MPL-RabbitMQ @@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2012 VMware, Inc. All rights reserved.'' + Copyright (c) 2007-2013 VMware, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of the notices in the Source Code files of the Original Code. You should @@ -162,7 +162,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) else \ dialyzer --output_plt $@ --build_plt \ --apps erts kernel stdlib compiler sasl os_mon mnesia tools \ - public_key crypto ssl; \ + public_key crypto ssl xmerl; \ fi clean: @@ -15,7 +15,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. %% main(["-h"]) -> @@ -50,6 +50,7 @@ shutdown(Rc, LibDir) -> check(Cwd, PluginsDir, LibDir, Checks) -> {ok, Plugins} = file:list_dir(PluginsDir), ok = file:make_dir(LibDir), + put({?MODULE, third_party}, []), [begin Source = filename:join(PluginsDir, Plugin), Target = filename:join(LibDir, Plugin), @@ -162,7 +163,8 @@ filters() -> filter_chain(FnChain) -> fun(AnalysisResult) -> - lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult)); + Result = cleanup(AnalysisResult), + lists:foldl(fun(F, false) -> F(Result); (_F, true) -> true end, false, FnChain) end. @@ -267,14 +269,8 @@ source_file(M) -> store_third_party(App) -> {ok, AppConfig} = application:get_all_key(App), - case get({?MODULE, third_party}) of - undefined -> - put({?MODULE, third_party}, - proplists:get_value(modules, AppConfig)); - Modules -> - put({?MODULE, third_party}, - proplists:get_value(modules, AppConfig) ++ Modules) - end. + AppModules = proplists:get_value(modules, AppConfig), + put({?MODULE, third_party}, AppModules ++ get({?MODULE, third_party})). %% TODO: this ought not to be maintained in such a fashion external_dependency(Path) -> @@ -11,7 +11,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## from __future__ import nested_scopes @@ -106,7 +106,7 @@ def printFileHeader(): %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %%""" def genErl(spec): diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c7069aed..bbd2fe5b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -465,8 +465,7 @@ synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be - mirrored, and must not have any pending unacknowledged - messages for this command to succeed. + mirrored for this command to succeed. </para> <para> Note that unsynchronised queues from which messages are diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 16dfd196..ad961a44 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -10,7 +10,7 @@ rabbit_sup, rabbit_tcp_client_sup, rabbit_direct_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it {mod, {rabbit, []}}, @@ -27,7 +27,7 @@ {frame_max, 131072}, {heartbeat, 600}, {msg_store_file_size_limit, 16777216}, - {queue_index_max_journal_entries, 262144}, + {queue_index_max_journal_entries, 65536}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index a317e63b..b3dd6615 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -ifdef(use_specs). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 7385b4b3..eeee799e 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -record(user, {username, @@ -86,9 +86,8 @@ %%---------------------------------------------------------------------------- --define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc."). +-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2013 VMware, Inc."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). --define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). %% EMPTY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1 diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index f7c10bd8..8665dc55 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -include("rabbit.hrl"). diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail index b9c2629b..431ddeb0 100644 --- a/packaging/common/LICENSE.tail +++ b/packaging/common/LICENSE.tail @@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1 Authors and Copyright are as described below: The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2012 VMware, Inc. All rights reserved. + Copyright (c) 2007-2013 VMware, Inc. All rights reserved. MOZILLA PUBLIC LICENSE @@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License. The Original Code is RabbitMQ. The Initial Developer of the Original Code is VMware, Inc. - Copyright (c) 2007-2012 VMware, Inc. All rights reserved.'' + Copyright (c) 2007-2013 VMware, Inc. All rights reserved.'' [NOTE: The text of this Exhibit A may differ slightly from the text of the notices in the Source Code files of the Original Code. You should diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index e832aed6..b9c6ffbf 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## # Escape spaces and quotes, because shell is revolting. diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index 14557286..ba9579b6 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## ## diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 1e4bf755..c197915d 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -28,7 +28,7 @@ package: clean chmod a+x $(UNPACKED_DIR)/debian/rules echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright - echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright + echo "\n\nThe Debian packaging is (C) 2007-2013, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) rm -rf $(UNPACKED_DIR) diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in index f5257040..b351430e 100644 --- a/packaging/windows-exe/rabbitmq_nsi.in +++ b/packaging/windows-exe/rabbitmq_nsi.in @@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" "" VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc" ;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ? -VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved." +VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2013 VMware, Inc. All rights reserved." VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server" VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%" diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults index 4763f086..db1d4f2b 100644 --- a/scripts/rabbitmq-defaults +++ b/scripts/rabbitmq-defaults @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2012 VMware, Inc. All rights reserved. +## Copyright (c) 2012-2013 VMware, Inc. All rights reserved. ## ### next line potentially updated in package install steps diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 23224943..3721f6c7 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## # Determine where this script is really located (if this script is diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 97c74791..43f450c0 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## # Get default settings with user overrides for (RABBITMQ_)<var_name> diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 341f871a..4b4dbe47 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
@@ -23,8 +23,12 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index e1686627..184ae931 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## # Get default settings with user overrides for (RABBITMQ_)<var_name> diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 3aea4c07..9fa304e6 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 4758c861..9c30e74e 100755 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index a5fade72..00fffa9f 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -12,7 +12,7 @@ ## The Original Code is RabbitMQ. ## ## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +## Copyright (c) 2007-2013 VMware, Inc. All rights reserved. ## # Get default settings with user overrides for (RABBITMQ_)<var_name> diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index d8b1eaf1..a6d85552 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -12,7 +12,7 @@ REM REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl index fdf6ed41..8da436c0 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(app_utils). diff --git a/src/background_gc.erl b/src/background_gc.erl index 3dbce330..d684d6ea 100644 --- a/src/background_gc.erl +++ b/src/background_gc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(background_gc). diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 102c353f..106179fd 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(credit_flow). diff --git a/src/delegate.erl b/src/delegate.erl index 96b8ba31..e833b819 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(delegate). diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 2a8b915b..30400b3e 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(delegate_sup). diff --git a/src/dtree.erl b/src/dtree.erl index ca2d30cf..45eea506 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% %% A dual-index tree. diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 3260d369..d2d4d295 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(file_handle_cache). diff --git a/src/gatherer.erl b/src/gatherer.erl index 29d2d713..0c257a84 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(gatherer). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index dc55948b..c82327a2 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -73,7 +73,7 @@ %% but where the second argument is specifically the priority_queue %% which contains the prioritised message_queue. -%% All modifications are (C) 2009-2012 VMware, Inc. +%% All modifications are (C) 2009-2013 VMware, Inc. %% ``The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -863,7 +863,7 @@ dispatch(Info, Mod, State) -> common_reply(_Name, From, Reply, _NState, [] = _Debug) -> reply(From, Reply), []; -common_reply(Name, {To, Tag} = From, Reply, NState, Debug) -> +common_reply(Name, {To, _Tag} = From, Reply, NState, Debug) -> reply(From, Reply), sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}). @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(gm). diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 5fbfc223..1034ee2f 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(gm_soak_test). diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index 84d4ab2f..3fe3b182 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(gm_speed_test). diff --git a/src/gm_tests.erl b/src/gm_tests.erl index a9c0ba90..efb87a4c 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(gm_tests). diff --git a/src/lqueue.erl b/src/lqueue.erl index c4e046b5..e2ab2380 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(lqueue). diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 24c3ebd0..33d09f7f 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor). diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index f8cbd853..ea6b82c8 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(mirrored_supervisor_tests). diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl index a3773d90..41a349be 100644 --- a/src/mnesia_sync.erl +++ b/src/mnesia_sync.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(mnesia_sync). diff --git a/src/pg_local.erl b/src/pg_local.erl index e2e82f1f..7377fbf0 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -13,7 +13,7 @@ %% versions of Erlang/OTP. The remaining type specs have been %% removed. -%% All modifications are (C) 2010-2012 VMware, Inc. +%% All modifications are (C) 2010-2013 VMware, Inc. %% %CopyrightBegin% %% diff --git a/src/pmon.erl b/src/pmon.erl index 37898119..ed32b8b2 100644 --- a/src/pmon.erl +++ b/src/pmon.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(pmon). diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 780fa2e9..02a0a1df 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% %% Priority queues have essentially the same interface as ordinary diff --git a/src/rabbit.erl b/src/rabbit.erl index 7b8348fc..f3ba022a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit). @@ -258,16 +258,28 @@ %%---------------------------------------------------------------------------- +%% HiPE compilation happens before we have log handlers - so we have +%% to io:format/2, it's all we can do. + maybe_hipe_compile() -> {ok, Want} = application:get_env(rabbit, hipe_compile), Can = code:which(hipe) =/= non_existing, case {Want, Can} of - {true, true} -> hipe_compile(); - {true, false} -> io:format("Not HiPE compiling: HiPE not found in " - "this Erlang installation.~n"); - {false, _} -> ok + {true, true} -> hipe_compile(), + true; + {true, false} -> false; + {false, _} -> true end. +warn_if_hipe_compilation_failed(true) -> + ok; +warn_if_hipe_compilation_failed(false) -> + error_logger:warning_msg( + "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). + +%% HiPE compilation happens before we have log handlers and can take a +%% long time, so make an exception to our no-stdout policy and display +%% progress via stdout. hipe_compile() -> Count = length(?HIPE_WORTHY), io:format("~nHiPE compiling: |~s|~n |", @@ -311,14 +323,15 @@ start() -> rabbit_mnesia:check_cluster_consistency(), ok = app_utils:start_applications( app_startup_order(), fun handle_app_error/2), - ok = print_plugin_info(rabbit_plugins:active()) + ok = log_broker_started(rabbit_plugins:active()) end). boot() -> start_it(fun() -> ok = ensure_application_loaded(), - maybe_hipe_compile(), + Success = maybe_hipe_compile(), ok = ensure_working_log_handlers(), + warn_if_hipe_compilation_failed(Success), rabbit_node_monitor:prepare_cluster_status_files(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), %% It's important that the consistency check happens after @@ -332,7 +345,7 @@ boot() -> false), ok = app_utils:start_applications( StartupApps, fun handle_app_error/2), - ok = print_plugin_info(Plugins) + ok = log_broker_started(Plugins) end). handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> @@ -342,6 +355,8 @@ handle_app_error(App, Reason) -> throw({could_not_start, App, Reason}). start_it(StartFun) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_boot, Marker), try StartFun() catch @@ -350,11 +365,17 @@ start_it(StartFun) -> _:Reason -> boot_error(Reason, erlang:get_stacktrace()) after + unlink(Marker), + Marker ! stop, %% give the error loggers some time to catch up timer:sleep(100) end. stop() -> + case whereis(rabbit_boot) of + undefined -> ok; + _ -> await_startup() + end, rabbit_log:info("Stopping RabbitMQ~n"), ok = app_utils:stop_applications(app_shutdown_order()). @@ -422,13 +443,14 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, Vsn} = application:get_key(rabbit, vsn), - error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n", - [Vsn, erlang:system_info(otp_release)]), + error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n", + [Vsn, erlang:system_info(otp_release), + ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), print_banner(), + log_banner(), [ok = run_boot_step(Step) || Step <- boot_steps()], - io:format("~nbroker running~n"), {ok, SupPid}; Error -> Error @@ -457,22 +479,16 @@ app_shutdown_order() -> %%--------------------------------------------------------------------------- %% boot step logic -run_boot_step({StepName, Attributes}) -> - Description = case lists:keysearch(description, 1, Attributes) of - {value, {_, D}} -> D; - false -> StepName - end, +run_boot_step({_StepName, Attributes}) -> case [MFA || {mfa, MFA} <- Attributes] of [] -> - io:format("-- ~s~n", [Description]); + ok; MFAs -> - io:format("starting ~-60s ...", [Description]), [try apply(M,F,A) catch _:Reason -> boot_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], - io:format("done~n"), ok end. @@ -533,6 +549,9 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +-ifdef(use_specs). +-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()). +-endif. boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> AllNodes = rabbit_mnesia:cluster_nodes(all), {Err, Nodes} = @@ -552,13 +571,15 @@ boot_error(Reason, Stacktrace) -> Args = [Reason, log_location(kernel), log_location(sasl)], boot_error(Reason, Fmt, Args, Stacktrace). +-ifdef(use_specs). +-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()]) + -> no_return()). +-endif. +boot_error(Reason, Fmt, Args, not_available) -> + basic_boot_error(Reason, Fmt, Args); boot_error(Reason, Fmt, Args, Stacktrace) -> - case Stacktrace of - not_available -> basic_boot_error(Reason, Fmt, Args); - _ -> basic_boot_error(Reason, Fmt ++ - "Stack trace:~n ~p~n~n", - Args ++ [Stacktrace]) - end. + basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n", + Args ++ [Stacktrace]). basic_boot_error(Reason, Format, Args) -> io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), @@ -684,24 +705,17 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- %% misc -print_plugin_info([]) -> - ok; -print_plugin_info(Plugins) -> - %% This gets invoked by rabbitmqctl start_app, outside the context - %% of the rabbit application +log_broker_started(Plugins) -> rabbit_misc:with_local_io( fun() -> - io:format("~n-- plugins running~n"), - [print_plugin_info( - AppName, element(2, application:get_key(AppName, vsn))) - || AppName <- Plugins], - ok + PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P]) + || P <- Plugins]), + error_logger:info_msg( + "Server startup complete; ~b plugins started.~n~s", + [length(Plugins), PluginList]), + io:format(" completed with ~p plugins.~n", [length(Plugins)]) end). -print_plugin_info(Plugin, Vsn) -> - Len = 76 - length(Vsn), - io:format("~-" ++ integer_to_list(Len) ++ "s ~s~n", [Plugin, Vsn]). - erts_version_check() -> FoundVer = erlang:system_info(version), case rabbit_misc:version_compare(?ERTS_MINIMUM, FoundVer, lte) of @@ -713,49 +727,39 @@ erts_version_check() -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - ProductLen = string:len(Product), - io:format("~n" - "+---+ +---+~n" - "| | | |~n" - "| | | |~n" - "| | | |~n" - "| +---+ +-------+~n" - "| |~n" - "| ~s +---+ |~n" - "| | | |~n" - "| ~s +---+ |~n" - "| |~n" - "+-------------------+~n" - "~s~n~s~n~s~n~n", - [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION, - ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + io:format("~n ~s ~s. ~s" + "~n ## ## ~s" + "~n ## ##" + "~n ########## Logs: ~s" + "~n ###### ## ~s" + "~n ##########" + "~n Starting broker...", + [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE, + log_location(kernel), log_location(sasl)]). + +log_banner() -> Settings = [{"node", node()}, - {"app descriptor", app_location()}, {"home dir", home_dir()}, {"config file(s)", config_files()}, {"cookie hash", rabbit_nodes:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}, - {"erlang version", erlang:system_info(version)}], + {"database dir", rabbit_mnesia:dir()}], DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), Format = fun (K, V) -> - io:format("~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", - [K, V]) + rabbit_misc:format( + "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V]) end, - lists:foreach(fun ({"config file(s)" = K, []}) -> - Format(K, "(none)"); - ({"config file(s)" = K, [V0 | Vs]}) -> - Format(K, V0), [Format("", V) || V <- Vs]; - ({K, V}) -> - Format(K, V) - end, Settings), - io:nl(). - -app_location() -> - {ok, Application} = application:get_application(), - filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). + Banner = iolist_to_binary( + [case S of + {"config file(s)" = K, []} -> + Format(K, "(none)"); + {"config file(s)" = K, [V0 | Vs]} -> + Format(K, V0), [Format("", V) || V <- Vs]; + {K, V} -> + Format(K, V) + end || S <- Settings]), + error_logger:info_msg("~s", [Banner]). home_dir() -> case init:get_argument(home) of diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 75c53511..16387268 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_access_control). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index d7d4d82a..362b11aa 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_alarm). @@ -67,9 +67,8 @@ start() -> stop() -> ok. -register(Pid, HighMemMFA) -> - gen_event:call(?SERVER, ?MODULE, {register, Pid, HighMemMFA}, - infinity). +register(Pid, AlertMFA) -> + gen_event:call(?SERVER, ?MODULE, {register, Pid, AlertMFA}, infinity). set_alarm(Alarm) -> gen_event:notify(?SERVER, {set_alarm, Alarm}). clear_alarm(Alarm) -> gen_event:notify(?SERVER, {clear_alarm, Alarm}). @@ -94,9 +93,9 @@ init([]) -> alarmed_nodes = dict:new(), alarms = []}}. -handle_call({register, Pid, HighMemMFA}, State) -> +handle_call({register, Pid, AlertMFA}, State) -> {ok, 0 < dict:size(State#alarms.alarmed_nodes), - internal_register(Pid, HighMemMFA, State)}; + internal_register(Pid, AlertMFA, State)}; handle_call(get_alarms, State = #alarms{alarms = Alarms}) -> {ok, Alarms, State}; @@ -121,8 +120,8 @@ handle_event({node_up, Node}, State) -> handle_event({node_down, Node}, State) -> {ok, maybe_alert(fun dict_unappend_all/3, Node, [], State)}; -handle_event({register, Pid, HighMemMFA}, State) -> - {ok, internal_register(Pid, HighMemMFA, State)}; +handle_event({register, Pid, AlertMFA}, State) -> + {ok, internal_register(Pid, AlertMFA, State)}; handle_event(_Event, State) -> {ok, State}. @@ -198,14 +197,14 @@ alert(Alertees, Source, Alert, NodeComparator) -> end end, ok, Alertees). -internal_register(Pid, {M, F, A} = HighMemMFA, +internal_register(Pid, {M, F, A} = AlertMFA, State = #alarms{alertees = Alertees}) -> _MRef = erlang:monitor(process, Pid), case dict:find(node(), State#alarms.alarmed_nodes) of {ok, Sources} -> [apply(M, F, A ++ [Pid, R, true]) || R <- Sources]; error -> ok end, - NewAlertees = dict:store(Pid, HighMemMFA, Alertees), + NewAlertees = dict:store(Pid, AlertMFA, Alertees), State#alarms{alertees = NewAlertees}. handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6a31b24d..82ac74fa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -11,13 +11,13 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, - delete_immediately/1, delete/3, purge/1]). + delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -135,6 +135,7 @@ rabbit_types:error('in_use') | rabbit_types:error('not_empty')). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). +-spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> @@ -174,8 +175,7 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). --spec(sync_mirrors/1 :: (pid()) -> - 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')). -spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). -endif. @@ -600,6 +600,24 @@ internal_delete(QueueName) -> end end). +forget_all_durable(Node) -> + %% Note rabbit is not running so we avoid e.g. the worker pool. Also why + %% we don't invoke the return from rabbit_binding:process_deletions/1. + {atomic, ok} = + mnesia:sync_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_'}, write), + [rabbit_binding:process_deletions( + internal_delete1(Name)) || + #amqqueue{name = Name, pid = Pid} = Q <- Qs, + node(Pid) =:= Node, + rabbit_policy:get(<<"ha-mode">>, Q) + =:= {error, not_found}], + ok + end), + ok. + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 589e8289..72d6ab43 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_process). @@ -95,7 +95,6 @@ messages_unacknowledged, messages, consumers, - active_consumers, memory, slave_pids, synchronised_slave_pids, @@ -261,7 +260,11 @@ init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = - stop_sync_timer(stop_rate_timer(State)), + lists:foldl(fun (F, S) -> F(S) end, State, + [fun stop_sync_timer/1, + fun stop_rate_timer/1, + fun stop_expiry_timer/1, + fun stop_ttl_timer/1]), case BQS of undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), @@ -296,36 +299,18 @@ backing_queue_module(Q) -> true -> rabbit_mirror_queue_master end. -ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), - State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> - State. + rabbit_misc:ensure_timer(State, #q.sync_timer_ref, + ?SYNC_INTERVAL, sync_timeout). -stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> - State; -stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - erlang:cancel_timer(TRef), - State#q{sync_timer_ref = undefined}. - -ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - TRef = erlang:send_after( - ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), - State#q{rate_timer_ref = TRef}; -ensure_rate_timer(State) -> - State. +stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #q.sync_timer_ref). -stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> - State; -stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - erlang:cancel_timer(TRef), - State#q{rate_timer_ref = undefined}. +ensure_rate_timer(State) -> + rabbit_misc:ensure_timer(State, #q.rate_timer_ref, + ?RAM_DURATION_UPDATE_INTERVAL, + update_ram_duration). -stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> - State; -stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> - erlang:cancel_timer(TRef), - State#q{expiry_timer_ref = undefined}. +stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref). %% We wish to expire only when there are no consumers *and* the expiry %% hasn't been refreshed (by queue.declare or basic.get) for the @@ -335,17 +320,41 @@ ensure_expiry_timer(State = #q{expires = undefined}) -> ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of true -> NewState = stop_expiry_timer(State), - TRef = erlang:send_after(Expires, self(), maybe_expire), - NewState#q{expiry_timer_ref = TRef}; + rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref, + Expires, maybe_expire); false -> State end. +stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref). + +ensure_ttl_timer(undefined, State) -> + State; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> + After = (case Expiry - now_micros() of + V when V > 0 -> V + 999; %% always fire later + _ -> 0 + end) div 1000, + TRef = erlang:send_after(After, self(), drop_expired), + State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; +ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, + ttl_timer_expiry = TExpiry}) + when Expiry + 1000 < TExpiry -> + case erlang:cancel_timer(TRef) of + false -> State; + _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) + end; +ensure_ttl_timer(_Expiry, State) -> + State. + +stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). + ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(#q{active_consumers = AC, - backing_queue = BQ, backing_queue_state = BQS}) -> - true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). +assert_invariant(State = #q{active_consumers = AC}) -> + true = (queue:is_empty(AC) orelse is_empty(State)). + +is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -467,9 +476,7 @@ deliver_msg_to_consumer(DeliverFun, deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State1), - {Result, BQ:is_empty(BQS), State2}. + {Result, is_empty(State1), State1}. confirm_messages([], State) -> State; @@ -517,12 +524,10 @@ discard(#delivery{sender = SenderPid, State1#q{backing_queue_state = BQS1}. run_message_queue(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State), - {_IsEmpty1, State2} = deliver_msgs_to_consumers( + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State1), - State2. + is_empty(State), State), + State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -558,9 +563,21 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2} -> State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_drop_head(State2), + IsEmpty = BQ:is_empty(BQS), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State3#q{backing_queue_state = BQS1}) + State4 = State3#q{backing_queue_state = BQS1}, + %% optimisation: it would be perfectly safe to always + %% invoke drop_expired_msgs here, but that is expensive so + %% we only do that IFF the new message ends up at the head + %% of the queue (because the queue was empty) and has an + %% expiry. Only then may it need expiring straight away, + %% or, if expiry is not due yet, the expiry timer may need + %% (re)scheduling. + case {IsEmpty, Props#message_properties.expiry} of + {false, _} -> State4; + {true, undefined} -> State4; + {true, _} -> drop_expired_msgs(State4) + end end. maybe_drop_head(State = #q{max_length = undefined}) -> @@ -582,12 +599,12 @@ maybe_drop_head(State = #q{max_length = MaxLen, requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(State#q{backing_queue_state = BQS1}). + run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -678,13 +695,8 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -consumer_count() -> consumer_count(fun (_) -> false end). - -active_consumer_count() -> consumer_count(fun is_ch_blocked/1). - -consumer_count(Exclude) -> - lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(), - not Exclude(C)]). +consumer_count() -> + lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). is_unused(_State) -> consumer_count() == 0. @@ -731,9 +743,14 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. -drop_expired_msgs(State = #q{backing_queue_state = BQS, - backing_queue = BQ }) -> - Now = now_micros(), +drop_expired_msgs(State) -> + case is_empty(State) of + true -> State; + false -> drop_expired_msgs(now_micros(), State) + end. + +drop_expired_msgs(Now, State = #q{backing_queue_state = BQS, + backing_queue = BQ }) -> ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, State1} = with_dlx( @@ -742,8 +759,8 @@ drop_expired_msgs(State = #q{backing_queue_state = BQS, fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), {Next, State#q{backing_queue_state = BQS1}} end), ensure_ttl_timer(case Props of - undefined -> undefined; - #message_properties{expiry = Exp} -> Exp + undefined -> undefined; + #message_properties{expiry = Exp} -> Exp end, State1). with_dlx(undefined, _With, Without) -> Without(); @@ -767,7 +784,7 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> State1. dead_letter_maxlen_msgs(X, State = #q{backing_queue = BQ}) -> - {ok State1} = + {ok, State1} = dead_letter_msgs( fun (DLFun, Acc, BQS1) -> {{Msg, _, AckTag}, BQS2} = BQ:fetch(true, BQS1), @@ -798,25 +815,6 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, queue_monitors = QMons1, backing_queue_state = BQS2}}. -ensure_ttl_timer(undefined, State) -> - State; -ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> - After = (case Expiry - now_micros() of - V when V > 0 -> V + 999; %% always fire later - _ -> 0 - end) div 1000, - TRef = erlang:send_after(After, self(), drop_expired), - State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; -ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, - ttl_timer_expiry = TExpiry}) - when Expiry + 1000 < TExpiry -> - case erlang:cancel_timer(TRef) of - false -> State; - _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) - end; -ensure_ttl_timer(_Expiry, State) -> - State. - dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), @@ -963,8 +961,6 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); -i(active_consumers, _) -> - active_consumer_count(); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1108,7 +1104,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, drop_expired_msgs(State1)) of + case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> @@ -1132,7 +1128,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, reply({error, exclusive_consume_unavailable}, State); ok -> C = ch_record(ChPid), - C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), + update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1141,18 +1137,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - E = {ChPid, Consumer}, - State2 = - case is_ch_blocked(C1) of - true -> block_consumer(C1, E), - State1; - false -> update_ch_record(C1), - AC1 = queue:in(E, State1#q.active_consumers), - run_message_queue(State1#q{active_consumers = AC1}) - end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State2)), - reply(ok, State2) + not NoAck, qname(State1)), + AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), + reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, @@ -1181,8 +1169,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(ensure_expiry_timer(State)), - reply({ok, BQ:len(BQS), active_consumer_count()}, State1); + ensure_expiry_timer(State), + reply({ok, BQ:len(BQS), consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1204,7 +1192,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue(AckTags, ChPid, State)); handle_call(sync_mirrors, _From, - State = #q{backing_queue = rabbit_mirror_queue_master = BQ, + State = #q{backing_queue = rabbit_mirror_queue_master, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, HandleInfo = fun (Status) -> @@ -1220,13 +1208,9 @@ handle_call(sync_mirrors, _From, State, #q.stats_timer, fun() -> emit_stats(State#q{status = Status}) end) end, - case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors( - HandleInfo, EmitStats, BQS) of - {ok, BQS1} -> reply(ok, S(BQS1)); - {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} - end; - _ -> reply({error, pending_acks}, State) + case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of + {ok, BQS1} -> reply(ok, S(BQS1)); + {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} end; handle_call(sync_mirrors, _From, State) -> @@ -1264,8 +1248,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - noreply(run_message_queue( - State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); + noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl index a4305e5f..d7257a69 100644 --- a/src/rabbit_amqqueue_sup.erl +++ b/src/rabbit_amqqueue_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_amqqueue_sup). diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index c9475efd..72f81707 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend). diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 919be3f3..2dc1cad3 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_auth_backend_internal). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index c7d74dc3..99e4468e 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism). diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index c0d86cd1..847a38f5 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_amqplain). @@ -33,8 +33,7 @@ %% referring generically to "SASL security mechanism", i.e. the above. description() -> - [{name, <<"AMQPLAIN">>}, - {description, <<"QPid AMQPLAIN mechanism">>}]. + [{description, <<"QPid AMQPLAIN mechanism">>}]. should_offer(_Sock) -> true. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index 5df1d5d7..4b08e4be 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_cr_demo). @@ -37,8 +37,7 @@ %% SECURE-OK: "My password is ~s", [Password] description() -> - [{name, <<"RABBIT-CR-DEMO">>}, - {description, <<"RabbitMQ Demo challenge-response authentication " + [{description, <<"RabbitMQ Demo challenge-response authentication " "mechanism">>}]. should_offer(_Sock) -> diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 423170e1..a35a133a 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_auth_mechanism_plain). @@ -36,8 +36,7 @@ %% matching and will thus be much faster. description() -> - [{name, <<"PLAIN">>}, - {description, <<"SASL PLAIN authentication mechanism">>}]. + [{description, <<"SASL PLAIN authentication mechanism">>}]. should_offer(_Sock) -> true. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 99b5946e..2f247448 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -11,15 +11,13 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue). -ifdef(use_specs). --export_type([async_callback/0]). - %% We can't specify a per-queue ack/state with callback signatures -type(ack() :: any()). -type(state() :: any()). @@ -71,10 +69,14 @@ %% content. -callback delete_and_terminate(any(), state()) -> state(). -%% Remove all messages in the queue, but not messages which have been -%% fetched and are pending acks. +%% Remove all 'fetchable' messages from the queue, i.e. all messages +%% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + %% Publish a message. -callback publish(rabbit_types:basic_message(), rabbit_types:message_properties(), boolean(), pid(), @@ -164,7 +166,7 @@ %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), rabbit_types:message_properties(), - A) -> {('stop' | 'cont'), A}), + boolean(), A) -> {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? @@ -226,7 +228,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 5b3b8aa8..052db3a5 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(rabbit_backing_queue_qc). @@ -334,7 +334,7 @@ postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) -> {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) -> {stop, Acc}; ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) -> - FoldFun(Msg, MsgProps, Acc) + FoldFun(Msg, MsgProps, false, Acc) end, {cont, Acc0}, gb_trees:to_list(Messages)), true = Model =:= Res; @@ -397,10 +397,11 @@ rand_choice(List, Selection, N) -> N - 1). makefoldfun(Size) -> - fun (Msg, _MsgProps, Acc) -> - case length(Acc) > Size of - false -> {cont, [Msg | Acc]}; - true -> {stop, Acc} + fun (Msg, _MsgProps, Unacked, Acc) -> + case {length(Acc) > Size, Unacked} of + {false, false} -> {cont, [Msg | Acc]}; + {false, true} -> {cont, Acc}; + {true, _} -> {stop, Acc} end end. foldacc() -> []. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9bd1fad9..c42289c7 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_basic). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index a333c1ce..05040485 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_binary_generator). diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 53878d6a..9407dd2e 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_binary_parser). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 2d486651..6096e07b 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_binding). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e3dfc5..0510afa9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_channel). @@ -262,7 +262,7 @@ handle_cast({method, Method, Content, Flow}, end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> - ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + ok = send(Reply, NewState), noreply(NewState); {noreply, NewState} -> noreply(NewState); @@ -284,18 +284,20 @@ handle_cast(ready_for_close, State = #ch{state = closing, ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; -handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, - State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, Msg), - noreply(consumer_monitor(ConsumerTag, State)); +handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) -> + ok = send(Msg, State), + noreply(consumer_monitor(CTag, State)); -handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, Msg), +handle_cast({command, Msg}, State) -> + ok = send(Msg, State), noreply(State); +handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> + noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -394,6 +396,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +send(_Command, #ch{state = closing}) -> + ok; +send(Command, #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Command). + handle_exception(Reason, State = #ch{protocol = Protocol, channel = Channel, writer_pid = WriterPid, @@ -412,8 +419,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol, {stop, normal, State1} end. +-ifdef(use_specs). +-spec(precondition_failed/1 :: (string()) -> no_return()). +-endif. precondition_failed(Format) -> precondition_failed(Format, []). +-ifdef(use_specs). +-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()). +-endif. precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). @@ -531,16 +544,17 @@ check_name(_Kind, NameBin) -> queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case sets:is_element(QPid, Blocking) of false -> State; - true -> Blocking1 = sets:del_element(QPid, Blocking), - ok = case sets:size(Blocking1) of - 0 -> rabbit_writer:send_command( - State#ch.writer_pid, - #'channel.flow_ok'{active = false}); - _ -> ok - end, - State#ch{blocking = Blocking1} + true -> maybe_send_flow_ok( + State#ch{blocking = sets:del_element(QPid, Blocking)}) end. +maybe_send_flow_ok(State = #ch{blocking = Blocking}) -> + case sets:size(Blocking) of + 0 -> ok = send(#'channel.flow_ok'{active = false}, State); + _ -> ok + end, + State. + record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -565,14 +579,25 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> stop; -handle_method(#'channel.close'{}, _, State = #ch{state = closing}) -> - {reply, #'channel.close_ok'{}, State}; +handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid, + state = closing}) -> + ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + {noreply, State}; handle_method(_Method, _, State = #ch{state = closing}) -> {noreply, State}; handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> {ok, State1} = notify_queues(State), + %% We issue the channel.close_ok response after a handshake with + %% the reader, the other half of which is ready_for_close. That + %% way the reader forgets about the channel before we send the + %% response (and this channel process terminates). If we didn't do + %% that, a channel.open for the same channel number, which a + %% client is entitled to send as soon as it has received the + %% close_ok, might be received by the reader before it has seen + %% the termination and hence be sent to the old, now dead/dying + %% channel process, instead of a new process, and thus lost. ReaderPid ! {channel_closing, self()}, {noreply, State1}; @@ -812,12 +837,9 @@ handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> - {noreply, State2 = #ch{writer_pid = WriterPid}} = - handle_method(#'basic.recover_async'{requeue = Requeue}, - Content, - State), - ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), - {noreply, State2}; + {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue}, + Content, State), + {reply, #'basic.recover_ok'{}, State1}; handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, @@ -1072,12 +1094,9 @@ handle_method(#'channel.flow'{active = false}, _, end, State1 = State#ch{limiter = Limiter1}, ok = rabbit_limiter:block(Limiter1), - case consumer_queues(Consumers) of - [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State2} - end; + QPids = consumer_queues(Consumers), + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})}; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -1127,17 +1146,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QCons, - writer_pid = WriterPid}) -> + queue_consumers = QCons}) -> ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> - Cancel = #'basic.cancel'{consumer_tag = CTag, - nowait = true}, - ok = rabbit_writer:send_command(WriterPid, Cancel), + ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, + State), dict:erase(CTag, CMap) end, ConsumerMapping, ConsumerTags), State#ch{consumer_mapping = ConsumerMapping1, @@ -1399,12 +1417,17 @@ process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> send_nacks([], State) -> State; +send_nacks(_MXs, State = #ch{state = closing, + tx = none}) -> %% optimisation + State; send_nacks(MXs, State = #ch{tx = none}) -> coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, multiple = Multiple} end, State); +send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation + State#ch{tx = failed}; send_nacks(_, State) -> maybe_complete_tx(State#ch{tx = failed}). @@ -1423,9 +1446,10 @@ send_confirms(State) -> send_confirms([], State) -> State; -send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = MsgSeqNo}), +send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation + State; +send_confirms([MsgSeqNo], State) -> + ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), State; send_confirms(Cs, State) -> coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> @@ -1433,8 +1457,7 @@ send_confirms(Cs, State) -> multiple = Multiple} end, State). -coalesce_and_send(MsgSeqNos, MkMsgFun, - State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> +coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) -> SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case dtree:is_empty(UC) of true -> lists:last(SMsgSeqNos) + 1; @@ -1443,11 +1466,9 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; - _ -> ok = rabbit_writer:send_command( - WriterPid, MkMsgFun(lists:last(Ms), true)) + _ -> ok = send(MkMsgFun(lists:last(Ms), true), State) end, - [ok = rabbit_writer:send_command( - WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], + [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss], State. ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L]; @@ -1464,7 +1485,7 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) -> end. complete_tx(State = #ch{tx = committing}) -> - ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}), + ok = send(#'tx.commit_ok'{}, State), State#ch{tx = new_tx()}; complete_tx(State = #ch{tx = failed}) -> {noreply, State1} = handle_exception( diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 42459833..8ea44a81 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup). diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 995c41fb..16fd08be 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_channel_sup_sup). diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index c508f1b9..9602c512 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_client_sup). diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl index adf6e417..a88bec3d 100644 --- a/src/rabbit_command_assembler.erl +++ b/src/rabbit_command_assembler.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_command_assembler). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 12a532b6..31bc51b8 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_connection_sup). @@ -42,16 +42,11 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), - {ok, ChannelSupSupPid} = - supervisor2:start_child( - SupPid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, ReaderPid} = supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, + [SupPid, Collector, rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index fc9c41a4..f5e70365 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_control_main). diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 689e5d83..53144f3f 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_direct). diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 6330d555..b396b289 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_disk_monitor). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index a9af2d8a..1360c82a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 042ab23c..3efc9c0c 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_error_logger_file_h). diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 7d91b6fa..a91a9916 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_event). @@ -110,18 +110,18 @@ ensure_stats_timer(C, P, Msg) -> stop_stats_timer(C, P) -> case element(P, C) of - #state{level = Level, timer = TRef} = State - when Level =/= none andalso TRef =/= undefined -> - erlang:cancel_timer(TRef), - setelement(P, C, State#state{timer = undefined}); + #state{timer = TRef} = State when TRef =/= undefined -> + case erlang:cancel_timer(TRef) of + false -> C; + _ -> setelement(P, C, State#state{timer = undefined}) + end; #state{} -> C end. reset_stats_timer(C, P) -> case element(P, C) of - #state{timer = TRef} = State - when TRef =/= undefined -> + #state{timer = TRef} = State when TRef =/= undefined -> setelement(P, C, State#state{timer = undefined}); #state{} -> C diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 9339161f..88033f77 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange). diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index 08819427..befbc462 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_decorator). diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index c5583ffd..1fbcb2d8 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 9a5665c0..213b24c4 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_direct). @@ -31,8 +31,7 @@ {enables, kernel_ready}]}). description() -> - [{name, <<"direct">>}, - {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP direct exchange, as per the AMQP specification">>}]. serialise_events() -> false. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index d9a2f60f..5b17ed56 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_fanout). @@ -31,8 +31,7 @@ {enables, kernel_ready}]}). description() -> - [{name, <<"fanout">>}, - {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. serialise_events() -> false. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 516b78e5..75899160 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_headers). @@ -37,8 +37,7 @@ -endif. description() -> - [{name, <<"headers">>}, - {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP headers exchange, as per the AMQP specification">>}]. serialise_events() -> false. diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 101fe434..6b07351a 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_invalid). @@ -24,13 +24,16 @@ add_binding/3, remove_bindings/3, assert_args_equivalence/2]). description() -> - [{name, <<"invalid">>}, - {description, + [{description, <<"Dummy exchange type, to be used when the intended one is not found.">> }]. serialise_events() -> false. +-ifdef(use_specs). +-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) + -> no_return()). +-endif. route(#exchange{name = Name, type = Type}, _) -> rabbit_misc:protocol_error( precondition_failed, diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 644d9acf..bd8ad1ac 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_exchange_type_topic). @@ -34,8 +34,7 @@ %%---------------------------------------------------------------------------- description() -> - [{name, <<"topic">>}, - {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP topic exchange, as per the AMQP specification">>}]. serialise_events() -> false. diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 26f74796..3ceb4989 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(rabbit_file). diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl index a79188ab..93305483 100644 --- a/src/rabbit_framing.erl +++ b/src/rabbit_framing.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% %% TODO auto-generate diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 8ee9ad5b..6c45deea 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_guid). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 05aad8c9..e878f3bb 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_heartbeat). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2b15498e..8a7d14fe 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_limiter). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 8dfa89d3..74cdeb23 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_log). diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index f22ad874..117ff95a 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index e1a21cf7..625e2f07 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_coordinator). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b5f72cad..bcd4861a 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -11,13 +11,13 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/5, publish_delivered/4, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -198,6 +198,8 @@ purge(State = #state { gm = GM, {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1 }}. +purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). + publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, State = #state { gm = GM, seen_status = SS, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 58f20476..4fb1fc3b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_misc). @@ -32,6 +32,8 @@ [policy_validator, <<"ha-mode">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). @@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of {ok, SPid} when is_pid(SPid) -> + maybe_auto_sync(Q), rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), {ok, started}; @@ -235,13 +238,13 @@ suggested_queue_nodes(Q) -> %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. suggested_queue_nodes(Q, PossibleNodes) -> - {MNode0, SNodes} = actual_queue_nodes(Q), + {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, PossibleNodes). + {MNode, SNodes, SSNodes}, PossibleNodes). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -249,15 +252,20 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> - {MNode, Possible -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) -> + {MNode, Poss -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) -> Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - %% If the current master is currently not in the nodes specified, - %% act like it is for the purposes below - otherwise we will not - %% return it in the results... - Nodes = lists:usort([MNode | Nodes1]), - Unavailable = Nodes -- Possible, + %% If the current master is not in the nodes specified, then what we want + %% to do depends on whether there are any synchronised slaves. If there + %% are then we can just kill the current master - the admin has asked for + %% a migration and we should give it to them. If there are not however + %% then we must keep the master around so as not to lose messages. + Nodes = case SSNodes of + [] -> lists:usort([MNode | Nodes1]); + _ -> Nodes1 + end, + Unavailable = Nodes -- Poss, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> {MNode, []}; _ -> case lists:member(MNode, Available) of true -> {MNode, Available -- [MNode]}; - false -> promote_slave(Available) + false -> %% Make sure the new master is synced! In order to + %% get here SSNodes must not be empty. + [NewMNode | _] = SSNodes, + {NewMNode, Available -- [NewMNode]} end end; %% When we need to add nodes, we randomise our candidate list as a %% crude form of load-balancing. TODO it would also be nice to -%% randomise the list of ones to remove when we have too many - but -%% that would fail to take account of synchronisation... -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> +%% randomise the list of ones to remove when we have too many - we +%% would have to take account of synchronisation though. +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = shuffle((Possible -- [MNode]) -- SNodes), + true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; -suggested_queue_nodes(_, _, {MNode, _}, _) -> +suggested_queue_nodes(_, _, {MNode, _, _}, _) -> {MNode, []}. shuffle(L) -> @@ -288,11 +299,14 @@ shuffle(L) -> {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), L1. -actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> +actual_queue_nodes(#amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids}) -> + Nodes = fun (L) -> [node(Pid) || Pid <- L] end, {case MPid of none -> none; _ -> node(MPid) - end, [node(Pid) || Pid <- SPids]}. + end, Nodes(SPids), Nodes(SSPids)}. is_mirrored(Q) -> case policy(<<"ha-mode">>, Q) of @@ -302,6 +316,14 @@ is_mirrored(Q) -> _ -> false end. +maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> + case policy(<<"ha-sync-mode">>, Q) of + <<"automatic">> -> + spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end); + _ -> + ok + end. + update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of @@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, update_mirrors0(OldQ = #amqqueue{name = QName}, NewQ = #amqqueue{name = QName}) -> - All = fun ({A,B}) -> [A|B] end, - OldNodes = All(actual_queue_nodes(OldQ)), - NewNodes = All(suggested_queue_nodes(NewQ)), - add_mirrors(QName, NewNodes -- OldNodes), + {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), + {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), + OldNodes = [OldMNode | OldSNodes], + NewNodes = [NewMNode | NewSNodes], + add_mirrors (QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), + maybe_auto_sync(NewQ), ok. %%---------------------------------------------------------------------------- validate_policy(KeyList) -> - validate_policy( - proplists:get_value(<<"ha-mode">>, KeyList), - proplists:get_value(<<"ha-params">>, KeyList, none)). + case validate_policy( + proplists:get_value(<<"ha-mode">>, KeyList), + proplists:get_value(<<"ha-params">>, KeyList, none)) of + ok -> case proplists:get_value( + <<"ha-sync-mode">>, KeyList, <<"manual">>) of + <<"automatic">> -> ok; + <<"manual">> -> ok; + Mode -> {error, "ha-sync-mode must be \"manual\" " + "or \"automatic\", got ~p", [Mode]} + end; + E -> E + end. validate_policy(<<"all">>, none) -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index feddf45a..b435e0f3 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave). @@ -37,18 +37,10 @@ -include("rabbit.hrl"). -%%---------------------------------------------------------------------------- - -include("gm_specs.hrl"). --ifdef(use_specs). -%% Shut dialyzer up --spec(promote_me/2 :: (_, _) -> no_return()). --endif. - %%---------------------------------------------------------------------------- - -define(CREATION_EVENT_KEYS, [pid, name, @@ -79,6 +71,8 @@ depth_delta }). +%%---------------------------------------------------------------------------- + start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> @@ -227,10 +221,12 @@ handle_cast({sync_start, Ref, Syncer}, backing_queue = BQ, backing_queue_state = BQS }) -> State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State), - S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined, - rate_timer_ref = TRefN, - backing_queue_state = BQSN} end, - %% [0] We can only sync when there are no pending acks + S = fun({MA, TRefN, BQSN}) -> + State1#state{depth_delta = undefined, + msg_id_ack = dict:from_list(MA), + rate_timer_ref = TRefN, + backing_queue_state = BQSN} + end, case rabbit_mirror_queue_sync:slave( DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> @@ -240,7 +236,7 @@ handle_cast({sync_start, Ref, Syncer}, {TRefN, BQSN1} end) of denied -> noreply(State1); - {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {ok, Res} -> noreply(set_delta(0, S(Res))); {failed, Res} -> noreply(S(Res)); {stop, Reason, Res} -> {stop, Reason, S(Res)} end; @@ -469,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. +-ifdef(use_specs). +-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()). +-endif. promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, @@ -589,30 +588,18 @@ next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> backing_queue_timeout(State = #state { backing_queue = BQ }) -> run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). -ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> - TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync_timeout), - State #state { sync_timer_ref = TRef }; ensure_sync_timer(State) -> - State. + rabbit_misc:ensure_timer(State, #state.sync_timer_ref, + ?SYNC_INTERVAL, sync_timeout). + +stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #state.sync_timer_ref). -stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> - State; -stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> - erlang:cancel_timer(TRef), - State #state { sync_timer_ref = undefined }. - -ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> - TRef = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL, - self(), update_ram_duration), - State #state { rate_timer_ref = TRef }; ensure_rate_timer(State) -> - State. + rabbit_misc:ensure_timer(State, #state.rate_timer_ref, + ?RAM_DURATION_UPDATE_INTERVAL, + update_ram_duration). -stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> - State; -stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> - erlang:cancel_timer(TRef), - State #state { rate_timer_ref = undefined }. +stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref). ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> State #state { known_senders = pmon:monitor(ChPid, KS) }. @@ -843,16 +830,21 @@ update_ram_duration(BQ, BQS) -> rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQ:set_ram_duration_target(DesiredDuration, BQS1). +%% [1] - the arrival of this newly synced slave may cause the master to die if +%% the admin has requested a migration-type change to policy. record_synchronised(#amqqueue { name = QName }) -> Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q = #amqqueue { sync_slave_pids = SSPids }] -> - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { sync_slave_pids = [Self | SSPids] }), - ok - end - end). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue { sync_slave_pids = SSPids }] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2), + {ok, Q1, Q2} + end + end) of + ok -> ok; + {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1] + end. diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl index a2034876..be3924f0 100644 --- a/src/rabbit_mirror_queue_slave_sup.erl +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2010-2013 VMware, Inc. All rights reserved. %% -module(rabbit_mirror_queue_slave_sup). diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f2ab67cd..b8cfe4a9 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -57,6 +57,9 @@ -type(log_fun() :: fun ((string(), [any()]) -> 'ok')). -type(bq() :: atom()). -type(bqs() :: any()). +-type(ack() :: any()). +-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(), + bqs()}). -spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()). -spec(master_go/7 :: (pid(), reference(), log_fun(), @@ -69,8 +72,8 @@ -spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(), bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) -> 'denied' | - {'ok' | 'failed', {timer:tref(), bqs()}} | - {'stop', any(), {timer:tref(), bqs()}}). + {'ok' | 'failed', slave_sync_state()} | + {'stop', any(), slave_sync_state()}). -endif. @@ -91,16 +94,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Acc) -> - master_send(Msg, MsgProps, Args, Acc) + case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> + master_send(Msg, MsgProps, Unacked, Args, Acc) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {I, Last}) -> +master_send(Msg, MsgProps, Unacked, + {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> EmitStats({syncing, I}), Log("~p messages", [I]), @@ -119,7 +122,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), gen_server2:reply(From, ok), {stop, cancelled}; - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} @@ -164,11 +167,11 @@ syncer(Ref, Log, MPid, SPids) -> syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive - {msg, Ref, Msg, MsgProps} -> + {msg, Ref, Msg, MsgProps, Unacked} -> SPids1 = wait_for_credit(SPids), [begin credit_flow:send(SPid), - SPid ! {sync_msg, Ref, Msg, MsgProps} + SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> @@ -204,12 +207,12 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, Syncer), Syncer ! {sync_ready, Ref, self()}, - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration, - rabbit_misc:get_parent()}, TRef, BQS1). + rabbit_misc:get_parent()}, {[], TRef, BQS1}). slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, - TRef, BQS) -> + State = {MA, TRef, BQS}) -> receive {'DOWN', MRef, process, Syncer, _Reason} -> %% If the master dies half way we are not in the usual @@ -218,34 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, %% sync with a newly promoted master, or even just receive %% messages from it, we have a hole in the middle. So the %% only thing to do here is purge. - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), credit_flow:peer_down(Syncer), - {failed, {TRef, BQS1}}; + {failed, {[], TRef, BQS1}}; {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg), - slave_sync_loop(Args, TRef, BQS); + slave_sync_loop(Args, State); {sync_complete, Ref} -> erlang:demonitor(MRef, [flush]), credit_flow:peer_down(Syncer), - {ok, {TRef, BQS}}; + {ok, State}; {'$gen_cast', {set_maximum_since_use, Age}} -> ok = file_handle_cache:set_maximum_since_use(Age), - slave_sync_loop(Args, TRef, BQS); + slave_sync_loop(Args, State); {'$gen_cast', {set_ram_duration_target, Duration}} -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - slave_sync_loop(Args, TRef, BQS1); + slave_sync_loop(Args, {MA, TRef, BQS1}); update_ram_duration -> {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), - slave_sync_loop(Args, TRef1, BQS1); - {sync_msg, Ref, Msg, Props} -> + slave_sync_loop(Args, {MA, TRef1, BQS1}); + {sync_msg, Ref, Msg, Props, Unacked} -> credit_flow:ack(Syncer), Props1 = Props#message_properties{needs_confirming = false}, - BQS1 = BQ:publish(Msg, Props1, true, none, BQS), - slave_sync_loop(Args, TRef, BQS1); + {MA1, BQS1} = + case Unacked of + false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)}; + true -> {AckTag, BQS2} = BQ:publish_delivered( + Msg, Props1, none, BQS), + {[{Msg#basic_message.id, AckTag} | MA], BQS2} + end, + slave_sync_loop(Args, {MA1, TRef, BQS1}); {'EXIT', Parent, Reason} -> - {stop, Reason, {TRef, BQS}}; + {stop, Reason, State}; %% If the master throws an exception {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> BQ:delete_and_terminate(Reason, BQS), - {stop, Reason, {TRef, undefined}} + {stop, Reason, {[], TRef, undefined}} end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ce3e3802..c36fb147 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_misc). @@ -67,6 +67,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). +-export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). %% Horrible macro to use in guards @@ -242,6 +243,8 @@ -spec(interval_operation/4 :: ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer()) -> {any(), non_neg_integer()}). +-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). +-spec(stop_timer/2 :: (A, non_neg_integer()) -> A). -spec(get_parent/0 :: () -> pid()). -endif. @@ -1047,6 +1050,22 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> round(LastInterval / 1.5)]) end}. +ensure_timer(State, Idx, After, Msg) -> + case element(Idx, State) of + undefined -> TRef = erlang:send_after(After, self(), Msg), + setelement(Idx, State, TRef); + _ -> State + end. + +stop_timer(State, Idx) -> + case element(Idx, State) of + undefined -> State; + TRef -> case erlang:cancel_timer(TRef) of + false -> State; + _ -> setelement(Idx, State, undefined) + end + end. + %% ------------------------------------------------------------------------- %% Begin copypasta from gen_server2.erl diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6a442fec..c39e898c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_mnesia). @@ -41,10 +41,7 @@ ]). %% Used internally in rpc calls --export([node_info/0, - remove_node_if_mnesia_running/1, - is_running_remote/0 - ]). +-export([node_info/0, remove_node_if_mnesia_running/1]). -include("rabbit.hrl"). @@ -278,16 +275,16 @@ forget_cluster_node(Node, RemoveWhenOffline) -> true -> ok; false -> e(not_a_cluster_node) end, - case {RemoveWhenOffline, mnesia:system_info(is_running)} of - {true, no} -> remove_node_offline_node(Node); - {true, yes} -> e(online_node_offline_flag); - {false, no} -> e(offline_node_no_offline_flag); - {false, yes} -> rabbit_misc:local_info_msg( - "Removing node ~p from cluster~n", [Node]), - case remove_node_if_mnesia_running(Node) of - ok -> ok; - {error, _} = Err -> throw(Err) - end + case {RemoveWhenOffline, is_running()} of + {true, false} -> remove_node_offline_node(Node); + {true, true} -> e(online_node_offline_flag); + {false, false} -> e(offline_node_no_offline_flag); + {false, true} -> rabbit_misc:local_info_msg( + "Removing node ~p from cluster~n", [Node]), + case remove_node_if_mnesia_running(Node) of + ok -> ok; + {error, _} = Err -> throw(Err) + end end. remove_node_offline_node(Node) -> @@ -334,11 +331,11 @@ status() -> end, [{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++ IfNonEmpty(ram, cluster_nodes(ram)))}] ++ - case mnesia:system_info(is_running) of - yes -> RunningNodes = cluster_nodes(running), - [{running_nodes, cluster_nodes(running)}, - {partitions, mnesia_partitions(RunningNodes)}]; - no -> [] + case is_running() of + true -> RunningNodes = cluster_nodes(running), + [{running_nodes, RunningNodes}, + {partitions, mnesia_partitions(RunningNodes)}]; + false -> [] end. mnesia_partitions(Nodes) -> @@ -346,6 +343,8 @@ mnesia_partitions(Nodes) -> Nodes, rabbit_node_monitor, partitions, []), [Reply || Reply = {_, R} <- Replies, R =/= []]. +is_running() -> mnesia:system_info(is_running) =:= yes. + is_clustered() -> AllNodes = cluster_nodes(all), AllNodes =/= [] andalso AllNodes =/= [node()]. @@ -355,10 +354,10 @@ cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). %% the data from mnesia. Obviously it'll work only when mnesia is %% running. cluster_status_from_mnesia() -> - case mnesia:system_info(is_running) of - no -> + case is_running() of + false -> {error, mnesia_not_running}; - yes -> + true -> %% If the tables are not present, it means that %% `init_db/3' hasn't been run yet. In other words, either %% we are a virgin node or a restarted RAM node. In both @@ -601,19 +600,16 @@ discover_cluster(Nodes) when is_list(Nodes) -> lists:foldl(fun (_, {ok, Res}) -> {ok, Res}; (Node, {error, _}) -> discover_cluster(Node) end, {error, no_nodes_provided}, Nodes); +discover_cluster(Node) when Node == node() -> + {error, {cannot_discover_cluster, "Cannot cluster node with itself"}}; discover_cluster(Node) -> OfflineError = {error, {cannot_discover_cluster, "The nodes provided are either offline or not running"}}, - case node() of - Node -> {error, {cannot_discover_cluster, - "Cannot cluster node with itself"}}; - _ -> case rpc:call(Node, - rabbit_mnesia, cluster_status_from_mnesia, []) of - {badrpc, _Reason} -> OfflineError; - {error, mnesia_not_running} -> OfflineError; - {ok, Res} -> {ok, Res} - end + case rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []) of + {badrpc, _Reason} -> OfflineError; + {error, mnesia_not_running} -> OfflineError; + {ok, Res} -> {ok, Res} end. schema_ok_or_move() -> @@ -672,20 +668,21 @@ move_db() -> ok. remove_node_if_mnesia_running(Node) -> - case mnesia:system_info(is_running) of - yes -> + case is_running() of + false -> + {error, mnesia_not_running}; + true -> %% Deleting the the schema copy of the node will result in %% the node being removed from the cluster, with that %% change being propagated to all nodes case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> + rabbit_amqqueue:forget_all_durable(Node), rabbit_node_monitor:notify_left_cluster(Node), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} - end; - no -> - {error, mnesia_not_running} + end end. leave_cluster() -> @@ -735,8 +732,6 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}. - check_consistency(OTP, Rabbit) -> rabbit_misc:sequence_error( [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]). diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index f685b109..81111061 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_msg_file). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c2e55022..13b40a48 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store). @@ -943,15 +943,12 @@ next_state(State = #msstate { cref_to_msg_ids = CTM }) -> _ -> {State, 0} end. -start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> - TRef = erlang:send_after(?SYNC_INTERVAL, self(), sync), - State #msstate { sync_timer_ref = TRef }. +start_sync_timer(State) -> + rabbit_misc:ensure_timer(State, #msstate.sync_timer_ref, + ?SYNC_INTERVAL, sync). -stop_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> - State; -stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> - erlang:cancel_timer(TRef), - State #msstate { sync_timer_ref = undefined }. +stop_sync_timer(State) -> + rabbit_misc:stop_timer(State, #msstate.sync_timer_ref). internal_sync(State = #msstate { current_file_handle = CurHdl, cref_to_msg_ids = CTM }) -> diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index 3defeaaf..bbc7db68 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_ets_index). diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 3b61ed0b..3881de23 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_gc). diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl index 6cc0b2a7..f0096446 100644 --- a/src/rabbit_msg_store_index.erl +++ b/src/rabbit_msg_store_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_msg_store_index). diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 562fc197..b8b03f56 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_net). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 31eeef73..4b6c7538 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -11,14 +11,15 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_networking). -export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, stop_tcp_listener/1, on_node_down/1, active_listeners/0, - node_listeners/1, connections/0, connection_info_keys/0, + node_listeners/1, register_connection/1, unregister_connection/1, + connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, close_connection/2, force_connection_event_refresh/0, tcp_host/1]). @@ -65,6 +66,8 @@ -spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). +-spec(register_connection/1 :: (pid()) -> ok). +-spec(unregister_connection/1 :: (pid()) -> ok). -spec(connections/0 :: () -> [rabbit_types:connection()]). -spec(connections_local/0 :: () -> [rabbit_types:connection()]). -spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()). @@ -294,20 +297,15 @@ start_client(Sock) -> start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). +register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). + +unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). + connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). -connections_local() -> - [Reader || - {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup), - Reader <- [try - rabbit_connection_sup:reader(ConnSup) - catch exit:{noproc, _} -> - noproc - end], - Reader =/= noproc]. +connections_local() -> pg_local:get_members(rabbit_connections). connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 258ac0ce..71c2c80a 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_node_monitor). diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index c8d77b0f..c92e5963 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_nodes). diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl index 24762a73..a4bd5042 100644 --- a/src/rabbit_parameter_validation.erl +++ b/src/rabbit_parameter_validation.erl @@ -11,12 +11,12 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_parameter_validation). --export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]). +-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]). number(_Name, Term) when is_number(Term) -> ok; @@ -73,3 +73,15 @@ proplist(Name, Constraints, Term) when is_list(Term) -> proplist(Name, _Constraints, Term) -> {error, "~s not a list ~p", [Name, Term]}. + +enum(OptionsA) -> + Options = [list_to_binary(atom_to_list(O)) || O <- OptionsA], + fun (Name, Term) when is_binary(Term) -> + case lists:member(Term, Options) of + true -> ok; + false -> {error, "~s should be one of ~p, actually was ~p", + [Name, Options, Term]} + end; + (Name, Term) -> + {error, "~s should be binary, actually was ~p", [Name, Term]} + end. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 9f94af7d..58c906eb 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(rabbit_plugins). @@ -64,8 +64,8 @@ list(PluginsDir) -> [plugin_info(PluginsDir, Plug) || Plug <- EZs ++ FreeApps]), case Problems of [] -> ok; - _ -> io:format("Warning: Problem reading some plugins: ~p~n", - [Problems]) + _ -> error_logger:warning_msg( + "Problem reading some plugins: ~p~n", [Problems]) end, Plugins. @@ -112,8 +112,9 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> case Enabled -- plugin_names(ToUnpackPlugins) of [] -> ok; - Missing -> io:format("Warning: the following enabled plugins were " - "not found: ~p~n", [Missing]) + Missing -> error_logger:warning_msg( + "The following enabled plugins were not found: ~p~n", + [Missing]) end, %% Eliminate the contents of the destination directory diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 2158d1da..308b80cd 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(rabbit_plugins_main). diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 2c997f16..7398cd2d 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_policy). @@ -26,7 +26,7 @@ -export([register/0]). -export([name/1, get/2, set/1]). --export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([validate/4, notify/4, notify_clear/3]). -export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, list_formatted/1, info_keys/0]). @@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). -validate_clear(_VHost, <<"policy">>, _Name) -> - ok. - notify(VHost, <<"policy">>, _Name, _Term) -> update_policies(VHost). @@ -218,10 +215,13 @@ validation(_Name, Terms) when is_list(Terms) -> rabbit_registry:lookup_all(policy_validator)), [] = dups(Keys), %% ASSERTION Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys), - {TermKeys, _} = lists:unzip(Terms), - case dups(TermKeys) of - [] -> validation0(Validators, Terms); - Dup -> {error, "~p duplicate keys not allowed", [Dup]} + case is_proplist(Terms) of + true -> {TermKeys, _} = lists:unzip(Terms), + case dups(TermKeys) of + [] -> validation0(Validators, Terms); + Dup -> {error, "~p duplicate keys not allowed", [Dup]} + end; + false -> {error, "definition must be a dictionary: ~p", [Terms]} end; validation(_Name, Term) -> {error, "parse error while reading policy: ~p", [Term]}. @@ -249,3 +249,5 @@ validation0(Validators, Terms) -> a2b(A) -> list_to_binary(atom_to_list(A)). dups(L) -> L -- lists:usort(L). + +is_proplist(L) -> length(L) =:= length([I || I = {_, _} <- L]). diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl index b59dec2b..75b88c39 100644 --- a/src/rabbit_policy_validator.erl +++ b/src/rabbit_policy_validator.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_policy_validator). diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 404afe3c..3ce516d0 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_prelaunch). diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 6dad01cc..521cd78b 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_queue_collector). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 21f58154..ea70208f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_queue_index). @@ -162,7 +162,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_msg_ids }). + max_journal_entries, on_sync, unconfirmed }). -record(segment, { num, path, journal_entries, unacked }). @@ -190,7 +190,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_msg_ids :: gb_set() + unconfirmed :: gb_set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -210,7 +210,7 @@ -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/1 :: (qistate()) -> qistate()). --spec(needs_sync/1 :: (qistate()) -> boolean()). +-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false'). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_types:msg_id(), seq_id(), @@ -269,13 +269,16 @@ delete_and_terminate(State) -> State1. publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + State = #qistate { unconfirmed = Unconfirmed }) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - State #qistate { - unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }), + case MsgProps#message_properties.needs_confirming of + true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), + State #qistate { unconfirmed = Unconfirmed1 }; + false -> State + end), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -302,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) -> needs_sync(#qistate { journal_handle = undefined }) -> false; -needs_sync(#qistate { journal_handle = JournalHdl }) -> - file_handle_cache:needs_sync(JournalHdl). +needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) -> + case gb_sets:is_empty(UC) of + true -> case file_handle_cache:needs_sync(JournalHdl) of + true -> other; + false -> false + end; + false -> confirms + end. flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -398,7 +407,7 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_msg_ids = gb_sets:new() }. + unconfirmed = gb_sets:new() }. clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -607,19 +616,21 @@ add_to_journal(RelSeq, Action, end}; add_to_journal(RelSeq, Action, JEntries) -> - Val = case array:get(RelSeq, JEntries) of - undefined -> - case Action of - ?PUB -> {Action, no_del, no_ack}; - del -> {no_pub, del, no_ack}; - ack -> {no_pub, no_del, ack} - end; - ({Pub, no_del, no_ack}) when Action == del -> - {Pub, del, no_ack}; - ({Pub, Del, no_ack}) when Action == ack -> - {Pub, Del, ack} - end, - array:set(RelSeq, Val, JEntries). + case array:get(RelSeq, JEntries) of + undefined -> + array:set(RelSeq, + case Action of + ?PUB -> {Action, no_del, no_ack}; + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack} + end, JEntries); + ({Pub, no_del, no_ack}) when Action == del -> + array:set(RelSeq, {Pub, del, no_ack}, JEntries); + ({no_pub, del, no_ack}) when Action == ack -> + array:set(RelSeq, {no_pub, del, ack}, JEntries); + ({?PUB, del, no_ack}) when Action == ack -> + array:reset(RelSeq, JEntries) + end. maybe_flush_journal(State = #qistate { dirty_count = DCount, max_journal_entries = MaxJournal }) @@ -732,9 +743,12 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> - OnSyncFun(UG), - State #qistate { unsynced_msg_ids = gb_sets:new() }. +notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> + case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State #qistate { unconfirmed = gb_sets:new() } + end. %%---------------------------------------------------------------------------- %% segment manipulation diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 13e8feff..b8ff9c9f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_reader). @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/2]). +-export([init/4, mainloop/2, recvloop/2]). -export([conserve_resources/3, server_properties/1]). @@ -37,7 +37,8 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}). + conn_sup_pid, channel_sup_sup_pid, start_heartbeat_fun, + buf, buf_len, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, @@ -64,6 +65,10 @@ State#v1.connection_state =:= blocking orelse State#v1.connection_state =:= blocked)). +-define(IS_STOPPING(State), + (State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed)). + %%-------------------------------------------------------------------------- -ifdef(use_specs). @@ -105,12 +110,12 @@ start_link(ChannelSupSupPid, Collector, StartHeartbeatFun) -> shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun) -> +init(Parent, ConnSupPid, Collector, StartHeartbeatFun) -> Deb = sys:debug_options([]), receive {go, Sock, SockTransform} -> start_connection( - Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, Sock, + Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) end. @@ -199,7 +204,7 @@ name(Sock) -> socket_ends(Sock) -> socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end). -start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, +start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), Name = name(Sock), @@ -230,7 +235,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, connection_state = pre_init, queue_collector = Collector, heartbeater = none, - channel_sup_sup_pid = ChannelSupSupPid, + conn_sup_pid = ConnSupPid, + channel_sup_sup_pid = none, start_heartbeat_fun = StartHeartbeatFun, buf = [], buf_len = 0, @@ -240,9 +246,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_at = never}}, try ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), - recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)), + run({?MODULE, recvloop, + [Deb, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -259,10 +266,16 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, %% accounting as accurate as possible we ought to close the %% socket w/o delay before termination. rabbit_net:fast_close(ClientSock), + rabbit_networking:unregister_connection(self()), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. +run({M, F, A}) -> + try apply(M, F, A) + catch {become, MFA} -> run(MFA) + end. + recvloop(Deb, State = #v1{pending_recv = true}) -> mainloop(Deb, State); recvloop(Deb, State = #v1{connection_state = blocked}) -> @@ -282,26 +295,35 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of - {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); - closed -> case State#v1.connection_state of - closed -> State; - _ -> throw(connection_closed_abruptly) - end; - {error, Reason} -> throw({inet_error, Reason}); - {other, Other} -> handle_other(Other, Deb, State) + {data, Data} -> + recvloop(Deb, State#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); + closed when State#v1.connection_state =:= closed -> + ok; + closed -> + throw(connection_closed_abruptly); + {error, Reason} -> + throw({inet_error, Reason}); + {other, {system, From, Request}} -> + sys:handle_system_msg(Request, From, State#v1.parent, + ?MODULE, Deb, State); + {other, Other} -> + case handle_other(Other, State) of + stop -> ok; + NewState -> recvloop(Deb, NewState) + end end. -handle_other({conserve_resources, Conserve}, Deb, +handle_other({conserve_resources, Conserve}, State = #v1{throttle = Throttle}) -> Throttle1 = Throttle#throttle{conserve_resources = Conserve}, - recvloop(Deb, control_throttle(State#v1{throttle = Throttle1})); -handle_other({channel_closing, ChPid}, Deb, State) -> + control_throttle(State#v1{throttle = Throttle1}); +handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(control_throttle(State))); -handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + maybe_close(control_throttle(State)); +handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to @@ -313,59 +335,54 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> %% initiated by our parent it is probably more important to exit %% quickly. exit(Reason); -handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, - _Deb, _State) -> +handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) -> throw(E); -handle_other({channel_exit, Channel, Reason}, Deb, State) -> - mainloop(Deb, handle_exception(State, Channel, Reason)); -handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); -handle_other(terminate_connection, _Deb, State) -> +handle_other({channel_exit, Channel, Reason}, State) -> + handle_exception(State, Channel, Reason); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> + handle_dependent_exit(ChPid, Reason, State); +handle_other(terminate_connection, _State) -> + stop; +handle_other(handshake_timeout, State) + when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> State; -handle_other(handshake_timeout, Deb, State) - when ?IS_RUNNING(State) orelse - State#v1.connection_state =:= closing orelse - State#v1.connection_state =:= closed -> - mainloop(Deb, State); -handle_other(handshake_timeout, _Deb, State) -> +handle_other(handshake_timeout, State) -> throw({handshake_timeout, State#v1.callback}); -handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) -> - mainloop(Deb, State); -handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> +handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> + State; +handle_other(heartbeat_timeout, #v1{connection_state = S}) -> throw({heartbeat_timeout, S}); -handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> +handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) + force -> stop; + normal -> NewState end; -handle_other({'$gen_call', From, info}, Deb, State) -> +handle_other({'$gen_call', From, info}, State) -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); -handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + State; +handle_other({'$gen_call', From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) + State; +handle_other({'$gen_cast', force_event_refresh}, State) when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + State; +handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. - mainloop(Deb, State); -handle_other(ensure_stats, Deb, State) -> - mainloop(Deb, ensure_stats_timer(State)); -handle_other(emit_stats, Deb, State) -> - mainloop(Deb, emit_stats(State)); -handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); -handle_other({bump_credit, Msg}, Deb, State) -> + State; +handle_other(ensure_stats, State) -> + ensure_stats_timer(State); +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - recvloop(Deb, control_throttle(State)); -handle_other(Other, _Deb, _State) -> + control_throttle(State); +handle_other(Other, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -426,13 +443,13 @@ close_connection(State = #v1{queue_collector = Collector, handle_dependent_exit(ChPid, Reason, State) -> case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, uncontrolled} -> - exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> - maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> - maybe_close(handle_exception(control_throttle(State), - Channel, Reason)) + {undefined, controlled} -> State; + {undefined, uncontrolled} -> exit({abnormal_dependent_exit, + ChPid, Reason}); + {_Channel, controlled} -> maybe_close(control_throttle(State)); + {Channel, uncontrolled} -> State1 = handle_exception( + State, Channel, Reason), + maybe_close(control_throttle(State1)) end. terminate_channels() -> @@ -572,17 +589,13 @@ all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. %%-------------------------------------------------------------------------- handle_frame(Type, 0, Payload, - State = #v1{connection_state = CS, - connection = #connection{protocol = Protocol}}) - when CS =:= closing; CS =:= closed -> + State = #v1{connection = #connection{protocol = Protocol}}) + when ?IS_STOPPING(State) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State end; -handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) - when CS =:= closing; CS =:= closed -> - State; handle_frame(Type, 0, Payload, State = #v1{connection = #connection{protocol = Protocol}}) -> case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of @@ -600,6 +613,8 @@ handle_frame(Type, Channel, Payload, heartbeat -> unexpected_frame(Type, Channel, Payload, State); Frame -> process_frame(Frame, Channel, State) end; +handle_frame(_Type, _Channel, _Payload, State) when ?IS_STOPPING(State) -> + State; handle_frame(Type, Channel, Payload, State) -> unexpected_frame(Type, Channel, Payload, State). @@ -627,7 +642,10 @@ process_frame(Frame, Channel, State) -> post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + %% This is not strictly necessary, but more obviously + %% correct. Also note that we do not need to call maybe_close/1 + %% since we cannot possibly be in the 'closing' state. + control_throttle(State); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> @@ -689,8 +707,12 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); +%% ... and finally, the 1.0 spec is crystal clear! Note that the +handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) -> + become_1_0(Id, State); + handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, A, B, C, D}); + refuse_connection(Sock, {bad_version, {A, B, C, D}}); handle_input(handshake, Other, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); @@ -704,6 +726,7 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> + rabbit_networking:register_connection(self()), Start = #'connection.start'{ version_major = ProtocolMajor, version_minor = ProtocolMinor, @@ -717,10 +740,13 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, connection_state = starting}, frame_header, 7). -refuse_connection(Sock, Exception) -> - ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), +refuse_connection(Sock, Exception, {A, B, C, D}) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",A,B,C,D>>) end), throw(Exception). +refuse_connection(Sock, Exception) -> + refuse_connection(Sock, Exception, {0, 0, 9, 1}). + ensure_stats_timer(State = #v1{connection_state = running}) -> rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); ensure_stats_timer(State) -> @@ -799,17 +825,24 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, + conn_sup_pid = ConnSupPid, sock = Sock, throttle = Throttle}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + Throttle1 = Throttle#throttle{conserve_resources = Conserve}, + {ok, ChannelSupSupPid} = + supervisor2:start_child( + ConnSupPid, + {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), State1 = control_throttle( - State#v1{connection_state = running, - connection = NewConnection, - throttle = Throttle#throttle{ - conserve_resources = Conserve}}), + State#v1{connection_state = running, + connection = NewConnection, + channel_sup_sup_pid = ChannelSupSupPid, + throttle = Throttle1}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -820,10 +853,9 @@ handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, - State = #v1{connection_state = CS, - connection = #connection{protocol = Protocol}, + State = #v1{connection = #connection{protocol = Protocol}, sock = Sock}) - when CS =:= closing; CS =:= closed -> + when ?IS_STOPPING(State) -> %% We're already closed or closing, so we don't need to cleanup %% anything. ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), @@ -832,8 +864,7 @@ handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, State; -handle_method0(_Method, State = #v1{connection_state = CS}) - when CS =:= closing; CS =:= closed -> +handle_method0(_Method, State) when ?IS_STOPPING(State) -> State; handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( @@ -981,3 +1012,33 @@ cert_info(F, #v1{sock = Sock}) -> emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +%% 1.0 stub +-ifdef(use_specs). +-spec(become_1_0/2 :: (non_neg_integer(), #v1{}) -> no_return()). +-endif. +become_1_0(Id, State = #v1{sock = Sock}) -> + case code:is_loaded(rabbit_amqp1_0_reader) of + false -> refuse_connection(Sock, amqp1_0_plugin_not_enabled); + _ -> Mode = case Id of + 0 -> amqp; + 3 -> sasl; + _ -> refuse_connection( + Sock, {unsupported_amqp1_0_protocol_id, Id}, + {3, 1, 0, 0}) + end, + throw({become, {rabbit_amqp1_0_reader, init, + [Mode, pack_for_1_0(State)]}}) + end. + +pack_for_1_0(#v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + queue_collector = QueueCollector, + conn_sup_pid = ConnSupPid, + start_heartbeat_fun = SHF, + buf = Buf, + buf_len = BufLen}) -> + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, ConnSupPid, SHF, + Buf, BufLen}. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 32709d24..60419856 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_registry). diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index 237ab78c..4c4ab2cf 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_restartable_sup). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index f4bbda0f..2eaef9a7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_router). diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl index 18668049..6b62c974 100644 --- a/src/rabbit_runtime_parameter.erl +++ b/src/rabbit_runtime_parameter.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_runtime_parameter). @@ -23,8 +23,6 @@ -callback validate(rabbit_types:vhost(), binary(), binary(), term()) -> validate_results(). --callback validate_clear(rabbit_types:vhost(), binary(), - binary()) -> validate_results(). -callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'. -callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'. @@ -35,7 +33,6 @@ behaviour_info(callbacks) -> [ {validate, 4}, - {validate_clear, 3}, {notify, 4}, {notify_clear, 3} ]; diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 49060409..b1100b65 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_runtime_parameters). @@ -120,21 +120,13 @@ clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). clear_any(VHost, Component, Name) -> - case clear_any0(VHost, Component, Name) of - ok -> ok; - {errors, L} -> format_error(L) - end. - -clear_any0(VHost, Component, Name) -> - case lookup_component(Component) of - {ok, Mod} -> case flatten_errors( - Mod:validate_clear(VHost, Component, Name)) of - ok -> mnesia_clear(VHost, Component, Name), - Mod:notify_clear(VHost, Component, Name), - ok; - E -> E - end; - E -> E + case lookup(VHost, Component, Name) of + not_found -> {error_string, "Parameter does not exist"}; + _ -> mnesia_clear(VHost, Component, Name), + case lookup_component(Component) of + {ok, Mod} -> Mod:notify_clear(VHost, Component, Name); + _ -> ok + end end. mnesia_clear(VHost, Component, Name) -> diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index d4d7271e..05c1dbc1 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -11,14 +11,14 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_runtime_parameters_test). -behaviour(rabbit_runtime_parameter). -behaviour(rabbit_policy_validator). --export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([validate/4, notify/4, notify_clear/3]). -export([register/0, unregister/0]). -export([validate_policy/1]). -export([register_policy_validator/0, unregister_policy_validator/0]). @@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok; validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok; validate(_, <<"test">>, _, _) -> {error, "meh", []}. -validate_clear(_, <<"test">>, <<"good">>) -> ok; -validate_clear(_, <<"test">>, <<"maybe">>) -> ok; -validate_clear(_, <<"test">>, _) -> {error, "meh", []}. - notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl index e8beecfe..566db9a9 100644 --- a/src/rabbit_sasl_report_file_h.erl +++ b/src/rabbit_sasl_report_file_h.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_sasl_report_file_h). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 22ff555f..b1238623 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_ssl). diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index f142d233..6a6b2feb 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_sup). diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index fa1c5bbd..d1c0bb1e 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_table). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 09ed3d08..27807b62 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_tests). @@ -61,6 +61,7 @@ all_tests() -> passed = test_runtime_parameters(), passed = test_policy_validation(), passed = test_server_status(), + passed = test_amqp_connection_refusal(), passed = test_confirms(), passed = do_if_secondary_node( @@ -911,10 +912,10 @@ test_arguments_parser() -> test_dynamic_mirroring() -> %% Just unit tests of the node selection logic, see multi node %% tests for the rest... - Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) -> + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) -> {NewM, NewSs0} = rabbit_mirror_queue_misc:suggested_queue_nodes( - Policy, Params, {OldM, OldSs}, All), + Policy, Params, CurrentState, All), NewSs1 = lists:sort(NewSs0), case dm_list_match(NewSs, NewSs1, ExtraSs) of ok -> ok; @@ -922,28 +923,36 @@ test_dynamic_mirroring() -> end end, - Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]), + + N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end, %% Add a node - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), - Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]), + Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]), %% Add two nodes and drop one - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]), %% Don't try to include nodes that are not running - Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]), %% If we can't find any of the nodes listed then just keep the master - Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), - %% And once that's happened, still keep the master even when not listed - Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]), - - Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), - Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]), - Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]), - Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]), + %% And once that's happened, still keep the master even when not listed, + %% if nothing is synced + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]), + %% But if something is synced we can lose the master - but make + %% sure we pick the new master from the nodes which are synced! + Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]), + Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]), + + Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]), + Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]), + Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]), passed. @@ -1061,7 +1070,11 @@ test_runtime_parameters() -> ok = control_action(clear_parameter, ["test", "maybe"]), {error_string, _} = control_action(clear_parameter, ["test", "neverexisted"]), + + %% We can delete for a component that no longer exists + Good(["test", "good", "\"ignore\""]), rabbit_runtime_parameters_test:unregister(), + ok = control_action(clear_parameter, ["test", "good"]), passed. test_policy_validation() -> @@ -1086,7 +1099,7 @@ test_policy_validation() -> test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun () -> receive shutdown -> ok end end), + Writer = spawn(fun test_writer/0), {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), @@ -1119,11 +1132,9 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, <<"foo">>)), %% list connections - [#listener{host = H, port = P} | _] = - [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), - N =:= node()], - - {ok, _C} = gen_tcp:connect(H, P, []), + {H, P} = find_listener(), + {ok, C} = gen_tcp:connect(H, P, []), + gen_tcp:send(C, <<"AMQP", 0, 0, 9, 1>>), timer:sleep(100), ok = info_action(list_connections, rabbit_networking:connection_info_keys(), false), @@ -1160,10 +1171,34 @@ test_server_status() -> passed. +test_amqp_connection_refusal() -> + [passed = test_amqp_connection_refusal(V) || + V <- [<<"AMQP",9,9,9,9>>, <<"AMQP",0,1,0,0>>, <<"XXXX",0,0,9,1>>]], + passed. + +test_amqp_connection_refusal(Header) -> + {H, P} = find_listener(), + {ok, C} = gen_tcp:connect(H, P, [binary, {active, false}]), + ok = gen_tcp:send(C, Header), + {ok, <<"AMQP",0,0,9,1>>} = gen_tcp:recv(C, 8, 100), + ok = gen_tcp:close(C), + passed. + +find_listener() -> + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {H, P}. + +test_writer() -> test_writer(none). + test_writer(Pid) -> receive - shutdown -> ok; - {send_command, Method} -> Pid ! Method, test_writer(Pid) + {'$gen_call', From, flush} -> gen_server:reply(From, ok), + test_writer(Pid); + {send_command, Method} -> Pid ! Method, + test_writer(Pid); + shutdown -> ok end. test_spawn() -> @@ -2227,10 +2262,10 @@ variable_queue_publish(IsPersistent, Count, VQ) -> variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ). variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> - variable_queue_publish(IsPersistent, Count, PropFun, + variable_queue_publish(IsPersistent, 1, Count, PropFun, fun (_N) -> <<>> end, VQ). -variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> +variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> lists:foldl( fun (N, VQN) -> rabbit_variable_queue:publish( @@ -2242,7 +2277,7 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) -> end}, PayloadFun(N)), PropFun(N, #message_properties{}), false, self(), VQN) - end, VQ, lists:seq(1, Count)). + end, VQ, lists:seq(Start, Start + Count - 1)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> @@ -2322,59 +2357,120 @@ test_variable_queue() -> fun test_dropwhile_varying_ram_duration/1, fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, + fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, fun test_variable_queue_fold/1]], passed. test_variable_queue_fold(VQ0) -> - Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64, - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish( - true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), - lists:foldl( - fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end, - VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). - -test_variable_queue_fold(Cut, Count, VQ0) -> + {PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Count = rabbit_variable_queue:depth(VQ1), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). + +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( - fun (M, _, A) -> - case msg2int(M) =< Cut of - true -> {cont, [M | A]}; + fun (M, _, Pending, A) -> + MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert + case MInt =< Cut of + true -> {cont, [MInt | A]}; false -> {stop, A} end end, [], VQ0), - true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] == - [msg2int(M) || M <- Acc], + Expected = lists:takewhile(fun (I) -> I =< Cut end, Msgs), + Expected = lists:reverse(Acc), %% assertion VQ1. msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> binary_to_term(list_to_binary(lists:reverse(P))). -test_variable_queue_requeue(VQ0) -> - Interval = 50, - Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, +ack_subset(AckSeqs, Interval, Rem) -> + lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs). + +requeue_one_by_one(Acks, VQ) -> + lists:foldl(fun (AckTag, VQN) -> + {_MsgId, VQM} = rabbit_variable_queue:requeue( + [AckTag], VQN), + VQM + end, VQ, Acks). + +%% Create a vq with messages in q1, delta, and q3, and holes (in the +%% form of pending acks) in the latter two. +variable_queue_with_holes(VQ0) -> + Interval = 64, + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, Seq = lists:seq(1, Count), VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish(false, Count, VQ1), - {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2), - Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> - [Ack | Acc]; - (_, Acc) -> - Acc - end, [], lists:zip(Acks, Seq)), - {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3), - VQ5 = lists:foldl(fun (AckTag, VQN) -> - {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], VQN), - VQM - end, VQ4, Subset), - VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag}, VQb} = + VQ2 = variable_queue_publish( + false, 1, Count, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), + Acks = lists:reverse(AcksR), + AckSeqs = lists:zip(Acks, Seq), + [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] = + [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]], + %% we requeue in three phases in order to exercise requeuing logic + %% in various vq states + {_MsgIds, VQ4} = rabbit_variable_queue:requeue( + Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), + VQ5 = requeue_one_by_one(Subset1, VQ4), + %% by now we have some messages (and holes) in delt + VQ6 = requeue_one_by_one(Subset2, VQ5), + VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6), + %% add the q1 tail + VQ8 = variable_queue_publish( + true, Count + 1, 64, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), + %% assertions + [false = case V of + {delta, _, 0, _} -> true; + 0 -> true; + _ -> false + end || {K, V} <- rabbit_variable_queue:status(VQ8), + lists:member(K, [q1, delta, q3])], + Depth = Count + 64, + Depth = rabbit_variable_queue:depth(VQ8), + Len = Depth - length(Subset3), + Len = rabbit_variable_queue:len(VQ8), + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. + +test_variable_queue_requeue(VQ0) -> + {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Msgs = + lists:zip(RequeuedMsgs, + lists:duplicate(length(RequeuedMsgs), true)) ++ + lists:zip(FreshMsgs, + lists:duplicate(length(FreshMsgs), false)), + VQ2 = lists:foldl(fun ({I, Requeued}, VQa) -> + {{M, MRequeued, _}, VQb} = rabbit_variable_queue:fetch(true, VQa), + Requeued = MRequeued, %% assertion + I = msg2int(M), %% assertion VQb - end, VQ5, lists:reverse(Acks)), - {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6), - VQ7. + end, VQ1, Msgs), + {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), + VQ3. + +test_variable_queue_purge(VQ0) -> + LenDepth = fun (VQ) -> + {rabbit_variable_queue:len(VQ), + rabbit_variable_queue:depth(VQ)} + end, + VQ1 = variable_queue_publish(false, 10, VQ0), + {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1), + {4, VQ3} = rabbit_variable_queue:purge(VQ2), + {0, 6} = LenDepth(VQ3), + {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3), + {2, 6} = LenDepth(VQ4), + VQ5 = rabbit_variable_queue:purge_acks(VQ4), + {2, 2} = LenDepth(VQ5), + VQ5. test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages @@ -2426,7 +2522,7 @@ test_dropfetchwhile(VQ0) -> %% add messages with sequential expiry VQ1 = variable_queue_publish( - false, Count, + false, 1, Count, fun (N, Props) -> Props#message_properties{expiry = N} end, fun erlang:term_to_binary/1, VQ0), diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl index 72c07b51..c52394c7 100644 --- a/src/rabbit_tests_event_receiver.erl +++ b/src/rabbit_tests_event_receiver.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_tests_event_receiver). diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 601656da..432055d4 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_trace). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 5bc3d9f5..c6007061 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_types). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 455134da..fde0dbe1 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 21fdcd66..457b1567 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_upgrade_functions). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90ee3439..f7c6c729 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -11,12 +11,12 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, purge/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, @@ -262,8 +262,6 @@ durable, transient_threshold, - async_callback, - len, persistent_count, @@ -356,8 +354,6 @@ durable :: boolean(), transient_threshold :: non_neg_integer(), - async_callback :: rabbit_backing_queue:async_callback(), - len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, false, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, + init(IsDurable, IndexState, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, + init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). terminate(_Reason, State) -> @@ -519,6 +515,8 @@ purge(State = #vqstate { q4 = Q4, ram_msg_count = 0, persistent_count = PCount1 })}. +purge_acks(State) -> a(purge_pending_ack(false, State)). + publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, @@ -527,7 +525,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount, unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), @@ -538,12 +535,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, end, PCount1 = PCount + one_if(IsPersistent1), UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = UC1 })). + a(reduce_memory_use( + inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = UC1 }))). publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, @@ -596,7 +593,7 @@ fetchwhile(Pred, Fun, Acc, State) -> {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> {Msg, State2} = read_msg(MsgStatus, false, State1), + true -> {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(true, MsgStatus, State2), fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} @@ -610,7 +607,7 @@ fetch(AckRequired, State) -> {{value, MsgStatus}, State1} -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, false, State1), + {Msg, State2} = read_msg(MsgStatus, State1), {AckTag, State3} = remove(AckRequired, MsgStatus, State2), {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} end. @@ -672,30 +669,17 @@ ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = lists:foldl(fun(SeqId, {Acc0, State0}) -> MsgStatus = lookup_pending_ack(SeqId, State0), - {Msg, State1} = read_msg(MsgStatus, false, State0), + {Msg, State1} = read_msg(MsgStatus, State0), {MsgFun(Msg, SeqId, Acc0), State1} end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, #vqstate { q1 = Q1, - q2 = Q2, - delta = #delta { start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd }, - q3 = Q3, - q4 = Q4 } = State) -> - QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, false, State0), - {StopGo, AccNext} = - Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), - {StopGo, {AccNext, State1}} - end, - {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), - {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), - {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, - DeltaSeqId, DeltaSeqIdEnd, State2), - {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), - {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), - {Acc5, State5}. +fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> + {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, + [msg_iterator(State), + disk_ack_iterator(State), + ram_ack_iterator(State)]), + ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. @@ -784,24 +768,18 @@ ram_duration(State = #vqstate { needs_timeout(State = #vqstate { index_state = IndexState, target_ram_count = TargetRamCount }) -> - case must_sync_index(State) of - true -> timed; - false -> - case rabbit_queue_index:needs_sync(IndexState) of - true -> idle; - false -> case TargetRamCount of - infinity -> false; - _ -> case - reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end - end - end + case rabbit_queue_index:needs_sync(IndexState) of + confirms -> timed; + other -> idle; + false when TargetRamCount == infinity -> false; + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end end. timeout(State = #vqstate { index_state = IndexState }) -> @@ -842,7 +820,8 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. -invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). +invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); +invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. @@ -894,15 +873,28 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a msg_id to the unconfirmed set -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message { id = MsgId }, MsgProps) -> - #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = false, index_on_disk = false, - msg_props = MsgProps }. + Msg = #basic_message {id = MsgId}, MsgProps) -> + #msg_status{seq_id = SeqId, + msg_id = MsgId, + msg = Msg, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = false, + index_on_disk = false, + msg_props = MsgProps}. + +beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> + #msg_status{seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps}. trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. @@ -969,7 +961,7 @@ maybe_write_delivered(true, SeqId, IndexState) -> betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, + fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, {Filtered1, Delivers1, Acks1} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -977,21 +969,10 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) -> [SeqId | Acks1]}; false -> case (gb_trees:is_defined(SeqId, RPA) orelse gb_trees:is_defined(SeqId, DPA)) of - false -> - {?QUEUE:in_r( - m(#msg_status { - seq_id = SeqId, - msg_id = MsgId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }), Filtered1), - Delivers1, Acks1}; - true -> - Acc + false -> {?QUEUE:in_r(m(beta_msg_status(M)), + Filtered1), + Delivers1, Acks1}; + true -> Acc end end end, {?QUEUE:new(), [], []}, List), @@ -1019,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, +init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), @@ -1045,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, durable = IsDurable, transient_threshold = NextSeqId, - async_callback = AsyncCallback, - len = DeltaCount1, persistent_count = DeltaCount1, @@ -1078,9 +1057,10 @@ in_r(MsgStatus = #msg_status { msg = undefined }, case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, true, State), - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { - msg = Msg }, Q4a) } + read_msg(MsgStatus, State), + inc_ram_msg_count( + State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status { + msg = Msg }, Q4a) }) end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. @@ -1096,18 +1076,20 @@ queue_out(State = #vqstate { q4 = Q4 }) -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} end. -read_msg(#msg_status { msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent }, - CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> +read_msg(#msg_status{msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent}, State) -> + read_msg(MsgId, IsPersistent, State); +read_msg(#msg_status{msg = Msg}, State) -> + {Msg, State}. + +read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), - RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam), - {Msg, State #vqstate { ram_msg_count = RamMsgCount1, - msg_store_clients = MSCState1 }}; -read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) -> - {Msg, State}. + {Msg, State #vqstate {msg_store_clients = MSCState1}}. + +inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) -> + State#vqstate{ram_msg_count = RamMsgCount + 1}. remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, @@ -1122,7 +1104,7 @@ remove(AckRequired, MsgStatus = #msg_status { index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount }) -> + persistent_count = PCount}) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -1151,11 +1133,11 @@ remove(AckRequired, MsgStatus = #msg_status { PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len - 1, - persistent_count = PCount1 }}. + {AckTag, State1 #vqstate {ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len - 1, + persistent_count = PCount1}}. purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1343,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), confirmed = gb_sets:union(C, MsgIdSet) }. -must_sync_index(#vqstate { msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - %% If UC is empty then by definition, MIOD and MOD are also empty - %% and there's nothing that can be pending a sync. - - %% If UC is not empty, then we want to find is_empty(UC - MIOD), - %% but the subtraction can be expensive. Thus instead, we test to - %% see if UC is a subset of MIOD. This can only be the case if - %% MIOD == UC, which would indicate that every message in UC is - %% also in MIOD and is thus _all_ pending on a msg_store sync, not - %% on a qi sync. Thus the negation of this is sufficient. Because - %% is_subset is short circuiting, this is more efficient than the - %% subtraction. - not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). - msgs_written_to_disk(Callback, MsgIdSet, ignored) -> Callback(?MODULE, fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); @@ -1386,14 +1353,14 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue and fold +%% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, true, State), - {MsgStatus#msg_status { msg = Msg }, State1}; -publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> - {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. + {Msg, State1} = read_msg(MsgStatus, State), + {MsgStatus#msg_status { msg = Msg }, inc_ram_msg_count(State1)}; +publish_alpha(MsgStatus, State) -> + {MsgStatus, inc_ram_msg_count(State)}. publish_beta(MsgStatus, State) -> {#msg_status { msg = Msg} = MsgStatus1, @@ -1456,40 +1423,81 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; -qfoldl( Fun, {cont, Acc} = A, Q) -> +%%---------------------------------------------------------------------------- +%% Iterator +%%---------------------------------------------------------------------------- + +ram_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. + +disk_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. + +msg_iterator(State) -> istate(start, State). + +istate(start, State) -> {q4, State#vqstate.q4, State}; +istate(q4, State) -> {q3, State#vqstate.q3, State}; +istate(q3, State) -> {delta, State#vqstate.delta, State}; +istate(delta, State) -> {q2, State#vqstate.q2, State}; +istate(q2, State) -> {q1, State#vqstate.q1, State}; +istate(q1, _State) -> done. + +next({ack, It}, IndexState) -> + case gb_trees:next(It) of + none -> {empty, IndexState}; + {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, + {value, MsgStatus, true, Next, IndexState} + end; +next(done, IndexState) -> {empty, IndexState}; +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqId}, State}, IndexState) -> + next(istate(delta, State), IndexState); +next({delta, #delta{start_seq_id = SeqId, + end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), + SeqId1 = lists:min([SeqIdB, SeqIdEnd]), + {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), + next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); +next({delta, Delta, [], State}, IndexState) -> + next({delta, Delta, State}, IndexState); +next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> + case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse + gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of + false -> Next = {delta, Delta, Rest, State}, + {value, beta_msg_status(M), false, Next, IndexState}; + true -> next({delta, Delta, Rest, State}, IndexState) + end; +next({Key, Q, State}, IndexState) -> case ?QUEUE:out(Q) of - {empty, _Q} -> A; - {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) + {empty, _Q} -> next(istate(Key, State), IndexState); + {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, + {value, MsgStatus, false, Next, IndexState} end. -lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; -lfoldl(_Fun, {cont, _Acc} = A, []) -> A; -lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). - -delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> - {stop, {Acc, State}}; -delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {cont, {Acc, State}}; -delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { index_state = IndexState, - msg_store_clients = MSCState } = State) -> - DeltaSeqId1 = lists:min( - [rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, - {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}} - end, {cont, {Acc, MSCState}}, List), - delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, - State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }). +inext(It, {Its, IndexState}) -> + case next(It, IndexState) of + {empty, IndexState1} -> + {Its, IndexState1}; + {value, MsgStatus1, Unacked, It1, IndexState1} -> + {[{MsgStatus1, Unacked, It1} | Its], IndexState1} + end. + +ifold(_Fun, Acc, [], State) -> + {Acc, State}; +ifold(Fun, Acc, Its, State) -> + [{MsgStatus, Unacked, It} | Rest] = + lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, + {#msg_status{seq_id = SeqId2}, _, _}) -> + SeqId1 =< SeqId2 + end, Its), + {Msg, State1} = read_msg(MsgStatus, State), + case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of + {stop, Acc1} -> + {Acc1, State}; + {cont, Acc1} -> + {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), + ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) + end. %%---------------------------------------------------------------------------- %% Phase changes diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 1cc7d6c8..f81a4021 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_version). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 0bb18f4c..d0f39221 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_vhost). @@ -95,9 +95,9 @@ internal_delete(VHostPath) -> || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], [ok = rabbit_runtime_parameters:clear(VHostPath, proplists:get_value(component, Info), - proplists:get_value(key, Info)) + proplists:get_value(name, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], - [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info)) + [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index db674f91..b3e9ec66 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_vm). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a7ea3d99..2d15e6a2 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(rabbit_writer). @@ -21,7 +21,8 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5]). + send_command_and_notify/4, send_command_and_notify/5, + flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -69,6 +70,7 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -130,7 +132,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(flush(State)) + ?MODULE:mainloop1(internal_flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, Content, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, Content, State)), + gen_server:reply(From, ok), + State1; +handle_message({'$gen_call', From, flush}, State) -> + State1 = internal_flush(State), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. +flush(W) -> call(W, flush). + %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> flush(State); + true -> internal_flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +internal_flush(State = #wstate{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 5af38573..c98b528d 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -51,7 +51,7 @@ %% 5) normal, and {shutdown, _} exit reasons are all treated the same %% (i.e. are regarded as normal exits) %% -%% All modifications are (C) 2010-2012 VMware, Inc. +%% All modifications are (C) 2010-2013 VMware, Inc. %% %% %CopyrightBegin% %% diff --git a/src/supervisor2_tests.erl b/src/supervisor2_tests.erl index e42ded7b..f19a53e6 100644 --- a/src/supervisor2_tests.erl +++ b/src/supervisor2_tests.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. %% -module(supervisor2_tests). diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 344196d7..0248f878 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(tcp_acceptor). diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index d8844441..61c747c9 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(tcp_acceptor_sup). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index fb01c792..90e84f94 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(tcp_listener). diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 9ee921b4..7f850dbc 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(tcp_listener_sup). diff --git a/src/test_sup.erl b/src/test_sup.erl index 7f4b5049..3342adb5 100644 --- a/src/test_sup.erl +++ b/src/test_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(test_sup). diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 5ce894a9..f70156b6 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% %% In practice Erlang shouldn't be allowed to grow to more than a half diff --git a/src/worker_pool.erl b/src/worker_pool.erl index c9ecccd6..3bdeb377 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(worker_pool). diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index ff356366..b9835f1e 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(worker_pool_sup). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 1ddcebb2..56e4b7b3 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -11,7 +11,7 @@ %% The Original Code is RabbitMQ. %% %% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% -module(worker_pool_worker). |