diff options
49 files changed, 1583 insertions, 743 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 16dfd196..339fa69e 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -10,7 +10,7 @@ rabbit_sup, rabbit_tcp_client_sup, rabbit_direct_client_sup]}, - {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, + {applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it {mod, {rabbit, []}}, @@ -27,7 +27,7 @@ {frame_max, 131072}, {heartbeat, 600}, {msg_store_file_size_limit, 16777216}, - {queue_index_max_journal_entries, 262144}, + {queue_index_max_journal_entries, 65536}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, {default_user_tags, [administrator]}, @@ -44,6 +44,7 @@ {log_levels, [{connection, info}]}, {ssl_cert_login_from, distinguished_name}, {reverse_dns_lookups, false}, + {cluster_partition_handling, ignore}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions index 183eb034..75b9fe46 100644 --- a/packaging/debs/apt-repository/distributions +++ b/packaging/debs/apt-repository/distributions @@ -2,6 +2,6 @@ Origin: RabbitMQ Label: RabbitMQ Repository for Debian / Ubuntu etc Suite: testing Codename: kitten -Architectures: arm hppa ia64 mips mipsel s390 sparc i386 amd64 powerpc source +Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source Components: main Description: RabbitMQ Repository for Debian / Ubuntu etc diff --git a/packaging/standalone/Makefile b/packaging/standalone/Makefile new file mode 100644 index 00000000..89ccde93 --- /dev/null +++ b/packaging/standalone/Makefile @@ -0,0 +1,82 @@ +VERSION=0.0.0 +SOURCE_DIR=rabbitmq-server-$(VERSION) +TARGET_DIR=rabbitmq_server-$(VERSION) +TARGET_TARBALL=rabbitmq-server-$(OS)-standalone-$(VERSION) +RLS_DIR=$(TARGET_DIR)/release/$(TARGET_DIR) + +ERTS_VSN=$(shell erl -noshell -eval 'io:format("~s", [erlang:system_info(version)]), halt().') +ERTS_ROOT_DIR=$(shell erl -noshell -eval 'io:format("~s", [code:root_dir()]), halt().') + +# used to generate the erlang release +RABBITMQ_HOME=$(TARGET_DIR) +RABBITMQ_EBIN_ROOT=$(RABBITMQ_HOME)/ebin +RABBITMQ_PLUGINS_DIR=$(RABBITMQ_HOME)/plugins +RABBITMQ_PLUGINS_EXPAND_DIR=$(RABBITMQ_PLUGINS_DIR)/expand + +RABBITMQ_DEFAULTS=$(TARGET_DIR)/sbin/rabbitmq-defaults +fix_defaults = sed -e $(1) $(RABBITMQ_DEFAULTS) > $(RABBITMQ_DEFAULTS).tmp \ + && mv $(RABBITMQ_DEFAULTS).tmp $(RABBITMQ_DEFAULTS) + +dist: + tar -zxf ../../dist/$(SOURCE_DIR).tar.gz + + $(MAKE) -C $(SOURCE_DIR) \ + TARGET_DIR=`pwd`/$(TARGET_DIR) \ + SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ + MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ + install + +## Here we set the RABBITMQ_HOME variable, +## then we make ERL_DIR point to our released erl +## and we add the paths to our released start_clean and start_sasl boot scripts + $(call fix_defaults,'s:^SYS_PREFIX=$$:SYS_PREFIX=\$${RABBITMQ_HOME}:') + $(call fix_defaults,'s:^ERL_DIR=$$:ERL_DIR=\$${RABBITMQ_HOME}/erts-$(ERTS_VSN)/bin/:') + $(call fix_defaults,'s:start_clean$$:"\$${SYS_PREFIX}/releases/$(VERSION)/start_clean":') + $(call fix_defaults,'s:start_sasl:"\$${SYS_PREFIX}/releases/$(VERSION)/start_sasl":') + + chmod 0755 $(RABBITMQ_DEFAULTS) + + mkdir -p $(TARGET_DIR)/etc/rabbitmq + + $(MAKE) generate_release + + mkdir -p $(RLS_DIR) + tar -C $(RLS_DIR) -xzf $(RABBITMQ_HOME)/rabbit.tar.gz + +# add minimal boot file + cp $(ERTS_ROOT_DIR)/bin/start_clean.boot $(RLS_DIR)/releases/$(VERSION) + cp $(ERTS_ROOT_DIR)/bin/start_sasl.boot $(RLS_DIR)/releases/$(VERSION) + +# move rabbitmq files to top level folder + mv $(RLS_DIR)/lib/rabbit-$(VERSION)/* $(RLS_DIR) + +# remove empty lib/rabbit-$(VERSION) folder + rm -rf $(RLS_DIR)/lib/rabbit-$(VERSION) + +# fix Erlang ROOTDIR + patch -o $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl.src < erl.diff + + tar -zcf $(TARGET_TARBALL).tar.gz -C $(TARGET_DIR)/release $(TARGET_DIR) + rm -rf $(SOURCE_DIR) $(TARGET_DIR) + +clean: clean_partial + rm -f rabbitmq-server-$(OS)-standalone-*.tar.gz + +clean_partial: + rm -rf $(SOURCE_DIR) + rm -rf $(TARGET_DIR) + +.PHONY : generate_release +generate_release: + erlc \ + -I $(TARGET_DIR)/include/ -o src -Wall \ + -v +debug_info -Duse_specs -Duse_proper_qc \ + -pa $(TARGET_DIR)/ebin/ src/rabbit_release.erl + erl \ + -pa "$(RABBITMQ_EBIN_ROOT)" \ + -pa src \ + -noinput \ + -hidden \ + -s rabbit_release \ + -extra "$(RABBITMQ_PLUGINS_DIR)" "$(RABBITMQ_PLUGINS_EXPAND_DIR)" "$(RABBITMQ_HOME)" + rm src/rabbit_release.beam diff --git a/packaging/standalone/erl.diff b/packaging/standalone/erl.diff new file mode 100644 index 00000000..c51bfe22 --- /dev/null +++ b/packaging/standalone/erl.diff @@ -0,0 +1,5 @@ +20c20,21 +< ROOTDIR="%FINAL_ROOTDIR%" +--- +> realpath() { [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" ; } +> ROOTDIR="$(dirname `realpath $0`)/../.." diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl new file mode 100644 index 00000000..26f36d68 --- /dev/null +++ b/packaging/standalone/src/rabbit_release.erl @@ -0,0 +1,152 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% +-module(rabbit_release). + +-export([start/0]). + +-include("rabbit.hrl"). + +-define(BaseApps, [rabbit]). +-define(ERROR_CODE, 1). + +%% We need to calculate all the ERTS apps we need to ship with a +%% standalone rabbit. To acomplish that we need to unpack and load the plugins +%% apps that are shiped with rabbit. +%% Once we get that we generate an erlang release inside a tarball. +%% Our make file will work with that release to generate our final rabbitmq +%% package. +start() -> + %% Determine our various directories + [PluginsDistDir, UnpackedPluginDir, RabbitHome] = + init:get_plain_arguments(), + RootName = UnpackedPluginDir ++ "/rabbit", + + %% extract the plugins so we can load their apps later + prepare_plugins(PluginsDistDir, UnpackedPluginDir), + + %% add the plugin ebin folder to the code path. + add_plugins_to_path(UnpackedPluginDir), + + PluginAppNames = [P#plugin.name || + P <- rabbit_plugins:list(PluginsDistDir)], + + %% Build the entire set of dependencies - this will load the + %% applications along the way + AllApps = case catch sets:to_list(expand_dependencies(PluginAppNames)) of + {failed_to_load_app, App, Err} -> + terminate("failed to load application ~s:~n~p", + [App, Err]); + AppList -> + AppList + end, + + %% we need a list of ERTS apps we need to ship with rabbit + BaseApps = AllApps -- PluginAppNames, + + AppVersions = [determine_version(App) || App <- BaseApps], + RabbitVersion = proplists:get_value(rabbit, AppVersions), + + %% Build the overall release descriptor + RDesc = {release, + {"rabbit", RabbitVersion}, + {erts, erlang:system_info(version)}, + AppVersions}, + + %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel + rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), + + %% Compile the script + systools:make_script(RootName), + systools:script2boot(RootName), + %% Make release tarfile + make_tar(RootName, RabbitHome), + rabbit_misc:quit(0). + +make_tar(Release, RabbitHome) -> + systools:make_tar(Release, + [ + {dirs, [docs, etc, include, plugins, sbin, share]}, + {erts, code:root_dir()}, + {outdir, RabbitHome} + ]). + +determine_version(App) -> + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), + {App, Vsn}. + +delete_recursively(Fn) -> + case rabbit_file:recursive_delete([Fn]) of + ok -> ok; + {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; + Error -> Error + end. + +prepare_plugins(PluginsDistDir, DestDir) -> + %% Eliminate the contents of the destination directory + case delete_recursively(DestDir) of + ok -> ok; + {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E]) + end, + case filelib:ensure_dir(DestDir ++ "/") of + ok -> ok; + {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2]) + end, + + [prepare_plugin(Plugin, DestDir) || + Plugin <- rabbit_plugins:list(PluginsDistDir)]. + +prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) -> + zip:unzip(Location, [{cwd, PluginDestDir}]); +prepare_plugin(#plugin{type = dir, name = Name, location = Location}, + PluginsDestDir) -> + rabbit_file:recursive_copy(Location, + filename:join([PluginsDestDir, Name])). + +expand_dependencies(Pending) -> + expand_dependencies(sets:new(), Pending). +expand_dependencies(Current, []) -> + Current; +expand_dependencies(Current, [Next|Rest]) -> + case sets:is_element(Next, Current) of + true -> + expand_dependencies(Current, Rest); + false -> + case application:load(Next) of + ok -> + ok; + {error, {already_loaded, _}} -> + ok; + {error, Reason} -> + throw({failed_to_load_app, Next, Reason}) + end, + {ok, Required} = application:get_key(Next, applications), + Unique = [A || A <- Required, not(sets:is_element(A, Current))], + expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) + end. + +add_plugins_to_path(PluginDir) -> + [add_plugin_to_path(PluginName) || + PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")]. + +add_plugin_to_path(PluginAppDescFn) -> + %% Add the plugin ebin directory to the load path + PluginEBinDirN = filename:dirname(PluginAppDescFn), + code:add_path(PluginEBinDirN). + +terminate(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + rabbit_misc:quit(?ERROR_CODE). diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults index db1d4f2b..83c5639d 100644 --- a/scripts/rabbitmq-defaults +++ b/scripts/rabbitmq-defaults @@ -18,6 +18,12 @@ ### next line potentially updated in package install steps SYS_PREFIX= +### next line will be updated when generating a standalone release +ERL_DIR= + +CLEAN_BOOT_FILE=start_clean +SASL_BOOT_FILE=start_sasl + ## Set default values CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 43f450c0..c043c90a 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -26,11 +26,12 @@ ##--- End of overridden <var_name> variables -exec erl \ +exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ -sname rabbitmq-plugins$$ \ + -boot "${CLEAN_BOOT_FILE}" \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 713d7000..4b4dbe47 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -23,8 +23,12 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 184ae931..161ec2e6 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -82,7 +82,8 @@ case "$(uname -s)" in esac RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" -if ! erl -pa "$RABBITMQ_EBIN_ROOT" \ +if ! ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \ + -boot "${CLEAN_BOOT_FILE}" \ -noinput \ -hidden \ -s rabbit_prelaunch \ @@ -103,11 +104,11 @@ RABBITMQ_LISTEN_ARG= # there is no other way of preventing their expansion. set -f -exec erl \ +exec ${ERL_DIR}erl \ -pa ${RABBITMQ_EBIN_ROOT} \ ${RABBITMQ_START_RABBIT} \ -sname ${RABBITMQ_NODENAME} \ - -boot start_sasl \ + -boot "${SASL_BOOT_FILE}" \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 00fffa9f..0368db3f 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -26,12 +26,13 @@ ##--- End of overridden <var_name> variables -exec erl \ +exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ + -boot "${CLEAN_BOOT_FILE}" \ -s rabbit_control_main \ -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/src/rabbit.erl b/src/rabbit.erl index 6b730fda..3cfa21ba 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -236,7 +236,7 @@ {memory, any()}]). -spec(is_running/0 :: () -> boolean()). -spec(is_running/1 :: (node()) -> boolean()). --spec(environment/0 :: () -> [{param() | term()}]). +-spec(environment/0 :: () -> [{param(), term()}]). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). -spec(force_event_refresh/0 :: () -> 'ok'). @@ -355,6 +355,8 @@ handle_app_error(App, Reason) -> throw({could_not_start, App, Reason}). start_it(StartFun) -> + Marker = spawn_link(fun() -> receive stop -> ok end end), + register(rabbit_boot, Marker), try StartFun() catch @@ -363,11 +365,17 @@ start_it(StartFun) -> _:Reason -> boot_error(Reason, erlang:get_stacktrace()) after + unlink(Marker), + Marker ! stop, %% give the error loggers some time to catch up timer:sleep(100) end. stop() -> + case whereis(rabbit_boot) of + undefined -> ok; + _ -> await_startup() + end, rabbit_log:info("Stopping RabbitMQ~n"), ok = app_utils:stop_applications(app_shutdown_order()). @@ -435,8 +443,9 @@ start(normal, []) -> case erts_version_check() of ok -> {ok, Vsn} = application:get_key(rabbit, vsn), - error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n", - [Vsn, erlang:system_info(otp_release)]), + error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n", + [Vsn, erlang:system_info(otp_release), + ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), print_banner(), @@ -699,10 +708,12 @@ force_event_refresh() -> log_broker_started(Plugins) -> rabbit_misc:with_local_io( fun() -> + PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P]) + || P <- Plugins]), error_logger:info_msg( - "Server startup complete; plugins are: ~p~n", [Plugins]), - io:format("~n Broker running with ~p plugins.~n", - [length(Plugins)]) + "Server startup complete; ~b plugins started.~n~s", + [length(Plugins), PluginList]), + io:format(" completed with ~p plugins.~n", [length(Plugins)]) end). erts_version_check() -> @@ -716,43 +727,38 @@ erts_version_check() -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - io:format("~n## ## ~s ~s. ~s" - "~n## ## ~s" - "~n##########" - "~n###### ## Logs: ~s" - "~n########## ~s~n", + io:format("~n ~s ~s. ~s" + "~n ## ## ~s" + "~n ## ##" + "~n ########## Logs: ~s" + "~n ###### ## ~s" + "~n ##########" + "~n Starting broker...", [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE, log_location(kernel), log_location(sasl)]). log_banner() -> - {ok, Product} = application:get_key(id), - {ok, Version} = application:get_key(vsn), Settings = [{"node", node()}, {"home dir", home_dir()}, {"config file(s)", config_files()}, {"cookie hash", rabbit_nodes:cookie_hash()}, {"log", log_location(kernel)}, {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}, - {"erlang version", erlang:system_info(otp_release)}], + {"database dir", rabbit_mnesia:dir()}], DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]), Format = fun (K, V) -> rabbit_misc:format( "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V]) end, Banner = iolist_to_binary( - rabbit_misc:format( - "~s ~s~n~s~n~s~n", - [Product, Version, ?COPYRIGHT_MESSAGE, - ?INFORMATION_MESSAGE]) ++ - [case S of - {"config file(s)" = K, []} -> - Format(K, "(none)"); - {"config file(s)" = K, [V0 | Vs]} -> - Format(K, V0), [Format("", V) || V <- Vs]; - {K, V} -> - Format(K, V) - end || S <- Settings]), + [case S of + {"config file(s)" = K, []} -> + Format(K, "(none)"); + {"config file(s)" = K, [V0 | Vs]} -> + Format(K, V0), [Format("", V) || V <- Vs]; + {K, V} -> + Format(K, V) + end || S <- Settings]), error_logger:info_msg("~s", [Banner]). home_dir() -> diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 362b11aa..6d24d130 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -208,11 +208,19 @@ internal_register(Pid, {M, F, A} = AlertMFA, State#alarms{alertees = NewAlertees}. handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> - rabbit_log:warning("~s resource limit alarm set on node ~p~n", - [Source, Node]), + rabbit_log:warning( + "~s resource limit alarm set on node ~p.~n~n" + "**********************************************************~n" + "*** Publishers will be blocked until this alarm clears ***~n" + "**********************************************************~n", + [Source, Node]), {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; handle_set_alarm({file_descriptor_limit, []}, State) -> - rabbit_log:warning("file descriptor limit alarm set~n"), + rabbit_log:warning( + "file descriptor limit alarm set.~n~n" + "********************************************************************~n" + "*** New connections will not be accepted until this alarm clears ***~n" + "********************************************************************~n"), {ok, State}; handle_set_alarm(Alarm, State) -> rabbit_log:warning("alarm '~p' set~n", [Alarm]), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9228755e..8c00c85c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,7 +17,7 @@ -module(rabbit_amqqueue). -export([recover/0, stop/0, start/1, declare/5, - delete_immediately/1, delete/3, purge/1]). + delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3]). +-export([basic_get/4, basic_consume/9, basic_cancel/4]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -135,6 +135,7 @@ rabbit_types:error('in_use') | rabbit_types:error('not_empty')). -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). +-spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> @@ -143,19 +144,20 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). --spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> - ok_or_errors()). --spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> +-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). +-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). +-spec(basic_consume/9 :: + (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), + rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(resume/2 :: (pid(), pid()) -> 'ok'). -spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | @@ -406,7 +408,8 @@ args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}]. + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_max_length_arg/2}]. check_string_arg({longstr, _}, _Args) -> ok; check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. @@ -417,6 +420,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_max_length_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; @@ -529,16 +539,19 @@ notify_down_all(QPids, ChPid) -> Bads1 -> {error, Bads1} end. -limit_all(QPids, ChPid, Limiter) -> - delegate:cast(QPids, {limit, ChPid, Limiter}). +activate_limit_all(QPids, ChPid) -> + delegate:cast(QPids, {activate_limit, ChPid}). -basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate:call(QPid, {basic_get, ChPid, NoAck}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). +basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> + delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). + +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -560,7 +573,7 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}). +resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). @@ -591,6 +604,24 @@ internal_delete(QueueName) -> end end). +forget_all_durable(Node) -> + %% Note rabbit is not running so we avoid e.g. the worker pool. Also why + %% we don't invoke the return from rabbit_binding:process_deletions/1. + {atomic, ok} = + mnesia:sync_transaction( + fun () -> + Qs = mnesia:match_object(rabbit_durable_queue, + #amqqueue{_ = '_'}, write), + [rabbit_binding:process_deletions( + internal_delete1(Name)) || + #amqqueue{name = Name, pid = Pid} = Q <- Qs, + node(Pid) =:= Node, + rabbit_policy:get(<<"ha-mode">>, Q) + =:= {error, not_found}], + ok + end), + ok. + run_backing_queue(QPid, Mod, Fun) -> gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 27d73445..b016c4d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,6 +55,7 @@ queue_monitors, dlx, dlx_routing_key, + max_length, status }). @@ -65,9 +66,12 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% The limiter itself limiter, - is_limit_active, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -164,6 +168,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. + rabbit_event:if_enabled(State, #q.stats_timer, + fun() -> emit_stats(State) end), rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -242,7 +248,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-max-length">>, fun init_max_length/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -254,6 +261,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -347,9 +356,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(#q{active_consumers = AC, - backing_queue = BQ, backing_queue_state = BQS}) -> - true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). +assert_invariant(State = #q{active_consumers = AC}) -> + true = (queue:is_empty(AC) orelse is_empty(State)). + +is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -357,17 +367,17 @@ lookup_ch(ChPid) -> C -> C end. -ch_record(ChPid) -> +ch_record(ChPid, LimiterPid) -> Key = {ch, ChPid}, case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), - is_limit_active = false, - limiter = rabbit_limiter:make_token(), + limiter = Limiter, unsent_message_count = 0}, put(Key, C), C; @@ -387,37 +397,32 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), ok. -erase_ch_record(#cr{ch_pid = ChPid, - limiter = Limiter, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(Limiter, self()), +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. -update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> - ok = rabbit_limiter:register(Limiter, self()), - update_ch_record(C#cr{consumer_count = 1}); -update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> - ok = rabbit_limiter:unregister(Limiter, self()), - update_ch_record(C#cr{consumer_count = 0, - limiter = rabbit_limiter:make_token()}); -update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> - update_ch_record(C#cr{consumer_count = Count + Delta}). - all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). -is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). + +maybe_send_drained(WasEmpty, State) -> + case (not WasEmpty) andalso is_empty(State) of + true -> [send_drained(C) || C <- all_ch_record()]; + false -> ok + end, + State. -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> ok; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + update_ch_record(C#cr{limiter = Limiter2}) end. deliver_msgs_to_consumers(_DeliverFun, true, State) -> @@ -435,18 +440,21 @@ deliver_msgs_to_consumers(DeliverFun, false, end. deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> - C = ch_record(ChPid), + C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required) of - false -> block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) + false -> case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + {false, State}; + {continue, Limiter} -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Limiter}, + State#q{active_consumers = AC1}) end end. @@ -471,9 +479,7 @@ deliver_msg_to_consumer(DeliverFun, deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State1), - {Result, BQ:is_empty(BQS), State2}. + {Result, is_empty(State1), State1}. confirm_messages([], State) -> State; @@ -521,12 +527,10 @@ discard(#delivery{sender = SenderPid, State1#q{backing_queue_state = BQS1}. run_message_queue(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State), - {_IsEmpty1, State2} = deliver_msgs_to_consumers( + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State1), - State2. + is_empty(State), State), + State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -561,19 +565,56 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), + %% optimisation: it would be perfectly safe to always + %% invoke drop_expired_msgs here, but that is expensive so + %% we only do that if a new message that might have an + %% expiry ends up at the head of the queue. If the head + %% remains unchanged, or if the newly published message + %% has no expiry and becomes the head of the queue then + %% the call is unnecessary. + case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) + end + end. + +maybe_drop_head(State = #q{max_length = undefined}) -> + {0, State}; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) - MaxLen of + Excess when Excess > 0 -> + {Excess, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, + fun () -> + {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> + BQ:drop(false, BQS0) + end, {ok, BQS}, + lists:seq(1, Excess)), + State#q{backing_queue_state = BQS1} + end)}; + _ -> {0, State} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + WasEmpty = BQ:is_empty(BQS), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(State#q{backing_queue_state = BQS1}). + {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), + run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -602,20 +643,29 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of - not_found -> + not_found -> State; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> unblock(State, C1) + end + end. + +unblock(State, C = #cr{limiter = Limiter}) -> + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), State; - C -> - C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) - end + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -712,9 +762,19 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. -drop_expired_msgs(State = #q{backing_queue_state = BQS, - backing_queue = BQ }) -> - Now = now_micros(), +%% Logically this function should invoke maybe_send_drained/2. +%% However, that is expensive. Since some frequent callers of +%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot +%% possibly cause the queue to become empty, we push the +%% responsibility to the callers. So be cautious when adding new ones. +drop_expired_msgs(State) -> + case is_empty(State) of + true -> State; + false -> drop_expired_msgs(now_micros(), State) + end. + +drop_expired_msgs(Now, State = #q{backing_queue_state = BQS, + backing_queue = BQ }) -> ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, State1} = with_dlx( @@ -723,8 +783,8 @@ drop_expired_msgs(State = #q{backing_queue_state = BQS, fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), {Next, State#q{backing_queue_state = BQS1}} end), ensure_ttl_timer(case Props of - undefined -> undefined; - #message_properties{expiry = Exp} -> Exp + undefined -> undefined; + #message_properties{expiry = Exp} -> Exp end, State1). with_dlx(undefined, _With, Without) -> Without(); @@ -747,6 +807,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. +dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> + {ok, State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS) -> + lists:foldl(fun (_, {ok, Acc0, BQS0}) -> + {{Msg, _, AckTag}, BQS1} = + BQ:fetch(true, BQS0), + {ok, DLFun(Msg, AckTag, Acc0), BQS1} + end, {ok, Acc, BQS}, lists:seq(1, Excess)) + end, maxlen, X, State), + State1. + dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, unconfirmed = UC0, @@ -1055,17 +1127,18 @@ handle_call({notify_down, ChPid}, From, State) -> {stop, State1} -> stop(From, ok, State1) end; -handle_call({basic_get, ChPid, NoAck}, _From, +handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, drop_expired_msgs(State1)) of + case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true -> C = #cr{acktags = ChAckTags} = + ch_record(ChPid, LimiterPid), ChAckTags1 = queue:in(AckTag, ChAckTags), update_ch_record(C#cr{acktags = ChAckTags1}), State2; @@ -1075,15 +1148,30 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, BQ:len(BQS), Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = ch_record(ChPid), - C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), + C = #cr{consumer_count = Count, + limiter = Limiter} = ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1092,18 +1180,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - E = {ChPid, Consumer}, - State2 = - case is_ch_blocked(C1) of - true -> block_consumer(C1, E), - State1; - false -> update_ch_record(C1), - AC1 = queue:in(E, State1#q.active_consumers), - run_message_queue(State1#q{active_consumers = AC1}) - end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State2)), - reply(ok, State2) + not NoAck, qname(State1)), + AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), + reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, @@ -1112,10 +1192,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C = #cr{blocked_consumers = Blocked} -> + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + Limiter1 = case Count of + 1 -> rabbit_limiter:deactivate(Limiter); + _ -> Limiter + end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter2, + blocked_consumers = Blocked1}), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; @@ -1132,7 +1221,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(ensure_expiry_timer(State)), + ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, From, @@ -1148,7 +1237,8 @@ handle_call({delete, IfUnused, IfEmpty}, From, handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + State1 = State#q{backing_queue_state = BQS1}, + reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1211,8 +1301,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - noreply(run_message_queue( - State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); + noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> @@ -1244,10 +1333,12 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> handle_cast(delete_immediately, State) -> stop(State); -handle_cast({unblock, ChPid}, State) -> +handle_cast({resume, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end)); handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( @@ -1256,21 +1347,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) -> C#cr{unsent_message_count = Count - Credit} end)); -handle_cast({limit, ChPid, Limiter}, State) -> +handle_cast({activate_limit, ChPid}, State) -> noreply( - possibly_unblock( - State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter = OldLimiter, - is_limit_active = OldLimited}) -> - case (ConsumerCount =/= 0 andalso - not rabbit_limiter:is_enabled(OldLimiter)) of - true -> ok = rabbit_limiter:register(Limiter, self()); - false -> ok - end, - Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} - end)); + possibly_unblock(State, ChPid, + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end)); handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), @@ -1302,6 +1384,24 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({credit, ChPid, CTag, Credit, Drain}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Limiter} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> case is_ch_blocked(C1) of + true -> update_ch_record(C1), + State; + false -> unblock(State, C1) + end + end); + handle_cast(wake_up, State) -> noreply(State). @@ -1321,7 +1421,9 @@ handle_info(maybe_expire, State) -> end; handle_info(drop_expired, State) -> - noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined})); + WasEmpty = is_empty(State), + State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), + noreply(maybe_send_drained(WasEmpty, State1)); handle_info(emit_stats, State) -> emit_stats(State), diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index 1ed54fef..847a38f5 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -33,8 +33,7 @@ %% referring generically to "SASL security mechanism", i.e. the above. description() -> - [{name, <<"AMQPLAIN">>}, - {description, <<"QPid AMQPLAIN mechanism">>}]. + [{description, <<"QPid AMQPLAIN mechanism">>}]. should_offer(_Sock) -> true. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index e4494ab4..4b08e4be 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -37,8 +37,7 @@ %% SECURE-OK: "My password is ~s", [Password] description() -> - [{name, <<"RABBIT-CR-DEMO">>}, - {description, <<"RabbitMQ Demo challenge-response authentication " + [{description, <<"RabbitMQ Demo challenge-response authentication " "mechanism">>}]. should_offer(_Sock) -> diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 5553a641..a35a133a 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -36,8 +36,7 @@ %% matching and will thus be much faster. description() -> - [{name, <<"PLAIN">>}, - {description, <<"SASL PLAIN authentication mechanism">>}]. + [{description, <<"SASL PLAIN authentication mechanism">>}]. should_offer(_Sock) -> true. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c2b52a7c..2f247448 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,8 +18,6 @@ -ifdef(use_specs). --export_type([async_callback/0]). - %% We can't specify a per-queue ack/state with callback signatures -type(ack() :: any()). -type(state() :: any()). diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6096e07b..cb86e5ae 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -40,8 +40,11 @@ [{'not_found', (rabbit_types:binding_source() | rabbit_types:binding_destination())} | {'absent', rabbit_types:amqqueue()}]})). + -type(bind_ok_or_error() :: 'ok' | bind_errors() | - rabbit_types:error('binding_not_found')). + rabbit_types:error( + 'binding_not_found' | + {'binding_invalid', string(), [any()]})). -type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())). -type(inner_fun() :: fun((rabbit_types:exchange(), @@ -157,15 +160,22 @@ add(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - case InnerFun(Src, Dst) of - ok -> case mnesia:read({rabbit_route, B}) of - [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/0 - end; - {error, _} = Err -> rabbit_misc:const(Err) + case rabbit_exchange:validate_binding(Src, B) of + ok -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(Src, Dst) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> add(Src, Dst, B); + [_] -> fun rabbit_misc:const_ok/0 + end; + {error, _} = Err -> + rabbit_misc:const(Err) + end; + {error, _} = Err -> + rabbit_misc:const(Err) end end). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 877c73c3..52c6140e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, + flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -81,8 +82,8 @@ -spec(start_link/11 :: (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), - rabbit_framing:amqp_table(), - pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). + rabbit_framing:amqp_table(), pid(), pid()) -> + rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -94,6 +95,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) + -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). @@ -138,6 +142,12 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, CTagCredit) -> + gen_server2:cast(Pid, {send_drained, CTagCredit}). + flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -180,7 +190,7 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, - Capabilities, CollectorPid, Limiter]) -> + Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, @@ -190,7 +200,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = Limiter, + limiter = rabbit_limiter:new(LimiterPid), tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> + [ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}) + || {ConsumerTag, CreditDrained} <- CTagCredit], + noreply(State); + handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); @@ -372,6 +394,8 @@ terminate(Reason, State) -> _ -> ok end, pg_local:leave(rabbit_channels, self()), + rabbit_event:if_enabled(State, #ch.stats_timer, + fun() -> emit_stats(State) end), rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> @@ -544,14 +568,17 @@ check_name(_Kind, NameBin) -> queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case sets:is_element(QPid, Blocking) of false -> State; - true -> Blocking1 = sets:del_element(QPid, Blocking), - case sets:size(Blocking1) of - 0 -> ok = send(#'channel.flow_ok'{active = false}, State); - _ -> ok - end, - State#ch{blocking = Blocking1} + true -> maybe_send_flow_ok( + State#ch{blocking = sets:del_element(QPid, Blocking)}) end. +maybe_send_flow_ok(State = #ch{blocking = Blocking}) -> + case sets:size(Blocking) of + 0 -> ok = send(#'channel.flow_ok'{active = false}, State); + _ -> ok + end, + State. + record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -586,6 +613,15 @@ handle_method(_Method, _, State = #ch{state = closing}) -> handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> {ok, State1} = notify_queues(State), + %% We issue the channel.close_ok response after a handshake with + %% the reader, the other half of which is ready_for_close. That + %% way the reader forgets about the channel before we send the + %% response (and this channel process terminates). If we didn't do + %% that, a channel.open for the same channel number, which a + %% client is entitled to send as soon as it has received the + %% close_ok, might be received by the reader before it has seen + %% the termination and hence be sent to the old, now dead/dying + %% channel process, instead of a new process, and thus lost. ReaderPid ! {channel_closing, self()}, {noreply, State1}; @@ -664,12 +700,15 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, conn_pid = ConnPid, + limiter = Limiter, next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, - fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get( + Q, self(), NoAck, rabbit_limiter:pid(Limiter)) + end) of {ok, MessageCount, Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -694,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_local = _, % FIXME: implement no_ack = NoAck, exclusive = ExclusiveConsume, - nowait = NoWait}, + nowait = NoWait, + arguments = Arguments}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -716,8 +756,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), Limiter, + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), ActualConsumerTag, ExclusiveConsume, + parse_credit_args(Arguments), ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -791,19 +834,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, +handle_method(#'basic.qos'{prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> - Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of - {false, 0} -> Limiter; - {false, _} -> enable_limiter(State); - {_, _} -> Limiter - end, - Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of - ok -> Limiter1; - {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), - Limiter2 - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, + State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> + Limiter1 = rabbit_limiter:limit_prefetch(Limiter, + PrefetchCount, queue:len(UAMQ)), + {reply, #'basic.qos_ok'{}, + maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, @@ -1066,27 +1107,44 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter = Limiter}) -> - Limiter2 = case rabbit_limiter:unblock(Limiter) of - ok -> Limiter; - {disabled, Limiter1} -> ok = limit_queues(Limiter1, State), - Limiter1 - end, - {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}}; + Limiter1 = rabbit_limiter:unblock(Limiter), + {reply, #'channel.flow_ok'{active = true}, + maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'channel.flow'{active = false}, _, State = #ch{consumer_mapping = Consumers, limiter = Limiter}) -> - Limiter1 = case rabbit_limiter:is_enabled(Limiter) of - true -> Limiter; - false -> enable_limiter(State) - end, - State1 = State#ch{limiter = Limiter1}, - ok = rabbit_limiter:block(Limiter1), - case consumer_queues(Consumers) of - [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, + case rabbit_limiter:is_blocked(Limiter) of + true -> {noreply, maybe_send_flow_ok(State)}; + false -> Limiter1 = rabbit_limiter:block(Limiter), + State1 = maybe_limit_queues(Limiter, Limiter1, + State#ch{limiter = Limiter1}), + %% The semantics of channel.flow{active=false} + %% require that no messages are delivered after the + %% channel.flow_ok has been sent. We accomplish that + %% by "flushing" all messages in flight from the + %% consumer queues to us. To do this we tell all the + %% queues to invoke rabbit_channel:flushed/2, which + %% will send us a {flushed, ...} message that appears + %% *after* all the {deliver, ...} messages. We keep + %% track of all the QPids thus asked, and once all of + %% them have responded (or died) we send the + %% channel.flow_ok. + QPids = consumer_queues(Consumers), ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, State2} + {noreply, maybe_send_flow_ok( + State1#ch{blocking = sets:from_list(QPids)})} + end; + +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + drain = Drain}, _, + State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed("unknown consumer tag '~s'", [CTag]) end; handle_method(_MethodRecord, _Content, _State) -> @@ -1155,6 +1213,16 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +parse_credit_args(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1185,6 +1253,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(DestinationName)]); + {error, {binding_invalid, Fmt, Args}} -> + rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) @@ -1321,14 +1391,14 @@ foreach_per_queue(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -enable_limiter(State = #ch{unacked_message_q = UAMQ, - limiter = Limiter}) -> - Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)), - ok = limit_queues(Limiter1, State), - Limiter1. - -limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter). +maybe_limit_queues(OldLimiter, NewLimiter, State) -> + case ((not rabbit_limiter:is_active(OldLimiter)) andalso + rabbit_limiter:is_active(NewLimiter)) of + true -> Queues = consumer_queues(State#ch.consumer_mapping), + rabbit_amqqueue:activate_limit_all(Queues, self()); + false -> ok + end, + State. consumer_queues(Consumers) -> lists:usort([QPid || @@ -1339,7 +1409,9 @@ consumer_queues(Consumers) -> %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) notify_limiter(Limiter, Acked) -> - case rabbit_limiter:is_enabled(Limiter) of + %% optimisation: avoid the potentially expensive 'foldl' in the + %% common case. + case rabbit_limiter:is_prefetch_limited(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 @@ -1504,7 +1576,7 @@ i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> - rabbit_limiter:get_limit(Limiter); + rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> rabbit_limiter:is_blocked(Limiter); i(Item, _) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 8ea44a81..a0c7624b 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - rabbit_limiter:make_token(LimiterPid)]}, + LimiterPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; @@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - rabbit_limiter:make_token(LimiterPid)]}, + LimiterPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index b396b289..3bb163a1 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -31,6 +31,7 @@ -record(state, {dir, limit, + actual, timeout, timer, alarmed @@ -106,8 +107,8 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; -handle_call(get_disk_free, _From, State = #state { dir = Dir }) -> - {reply, get_disk_free(Dir), State}; +handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> + {reply, Actual, State}; handle_call(_Request, _From, State) -> {noreply, State}. @@ -156,7 +157,7 @@ internal_update(State = #state { limit = Limit, _ -> ok end, - State #state {alarmed = NewAlarmed}. + State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}. get_disk_free(Dir) -> get_disk_free(Dir, os:type()). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 3efc9c0c..eb6247e0 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -76,6 +76,9 @@ init_file(File, PrevHandler) -> Error -> Error end. +%% filter out "application: foo; exited: stopped; type: temporary" +handle_event({info_report, _, {_, std_info, _}}, State) -> + {ok, State}; handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 88033f77..9e98448d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -22,7 +22,7 @@ assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, info_keys/0, info/1, info/2, info_all/1, info_all/2, - route/2, delete/2]). + route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). @@ -83,6 +83,9 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). +-spec(validate_binding/2 :: + (rabbit_types:exchange(), rabbit_types:binding()) + -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). @@ -117,14 +120,18 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- decorators()], + M <- registry_lookup(exchange_decorator)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). -policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). +policy_changed(X = #exchange{type = XType}, X1) -> + [ok = M:policy_changed(X, X1) || + M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]], + ok. serialise_events(X = #exchange{type = Type}) -> - lists:any(fun (M) -> M:serialise_events(X) end, decorators()) + lists:any(fun (M) -> M:serialise_events(X) end, + registry_lookup(exchange_decorator)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -136,8 +143,15 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -decorators() -> - [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. +registry_lookup(exchange_decorator_route = Class) -> + case get(exchange_decorator_route_modules) of + undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], + put(exchange_decorator_route_modules, Mods), + Mods; + Mods -> Mods + end; +registry_lookup(Class) -> + [M || {_, M} <- rabbit_registry:lookup_all(Class)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = rabbit_policy:set(#exchange{name = XName, @@ -304,32 +318,42 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -%% Optimisation -route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, - #delivery{message = #basic_message{routing_keys = RKs}}) -> - [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; - -route(X = #exchange{name = XName}, Delivery) -> - route1(Delivery, {[X], XName, []}). +route(#exchange{name = #resource{virtual_host = VHost, + name = RName} = XName} = X, + #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> + case {registry_lookup(exchange_decorator_route), RName == <<"">>} of + {[], true} -> + %% Optimisation + [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + {Decorators, _} -> + lists:usort(route1(Delivery, Decorators, {[X], XName, []})) + end. -route1(_, {[], _, QNames}) -> - lists:usort(QNames); -route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> - DstNames = process_alternate( - X, ((type_to_module(Type)):route(X, Delivery))), - route1(Delivery, +route1(_, _, {[], _, QNames}) -> + QNames; +route1(Delivery, Decorators, + {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> + ExchangeDests = (type_to_module(Type)):route(X, Delivery), + DecorateDests = process_decorators(X, Decorators, Delivery), + AlternateDests = process_alternate(X, ExchangeDests), + route1(Delivery, Decorators, lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames}, - DstNames)). + AlternateDests ++ DecorateDests ++ ExchangeDests)). -process_alternate(#exchange{arguments = []}, Results) -> %% optimisation - Results; +process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation + []; process_alternate(#exchange{name = XName, arguments = Args}, []) -> case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of undefined -> []; AName -> [AName] end; -process_alternate(_X, Results) -> - Results. +process_alternate(_X, _Results) -> + []. + +process_decorators(_, [], _) -> %% optimisation + []; +process_decorators(X, Decorators, Delivery) -> + lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]). process_route(#resource{kind = exchange} = XName, {_WorkList, XName, _QNames} = Acc) -> @@ -381,6 +405,10 @@ delete(XName, IfUnused) -> end end). +validate_binding(X = #exchange{type = XType}, Binding) -> + Module = type_to_module(XType), + Module:validate_binding(X, Binding). + maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> @@ -422,8 +450,7 @@ peek_serial(XName, LockType) -> end. invalid_module(T) -> - rabbit_log:warning( - "Could not find exchange type ~s.~n", [T]), + rabbit_log:warning("Could not find exchange type ~s.~n", [T]), put({xtype_to_module, T}, rabbit_exchange_type_invalid), rabbit_exchange_type_invalid. diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index befbc462..040b55db 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -21,9 +21,8 @@ %% 1) It applies to all exchanges as soon as it is installed, therefore %% 2) It is not allowed to affect validation, so no validate/1 or %% assert_args_equivalence/2 -%% 3) It also can't affect routing %% -%% It's possible in the future we might relax 3), or even make these +%% It's possible in the future we might make decorators %% able to manipulate messages as they are published. -ifdef(use_specs). @@ -46,6 +45,10 @@ -callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. +%% called when the policy attached to this exchange changes. +-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> + 'ok'. + %% called after a binding has been added or recovered -callback add_binding(serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'. @@ -54,9 +57,10 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% called when the policy attached to this exchange changes. --callback policy_changed ( - serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. +%% Decorators can optionally implement route/2 which allows additional +%% destinations to be added to the routing decision. +%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> +%% [rabbit_amqqueue:name() | rabbit_exchange:name()]. -else. @@ -64,7 +68,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 1fbcb2d8..ebc59501 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -37,6 +37,10 @@ %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} -callback validate(rabbit_types:exchange()) -> 'ok'. +%% called BEFORE declaration, to check args etc +-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) -> + rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). + %% called after declaration and recovery -callback create(tx(), rabbit_types:exchange()) -> 'ok'. @@ -44,6 +48,10 @@ -callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. +%% called when the policy attached to this exchange changes. +-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> + 'ok'. + %% called after a binding has been added or recovered -callback add_binding(serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'. @@ -58,18 +66,15 @@ rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit(). -%% called when the policy attached to this exchange changes. --callback policy_changed(serial(), rabbit_types:exchange(), - rabbit_types:exchange()) -> 'ok'. - -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1}, + [{description, 0}, {serialise_events, 0}, {route, 2}, + {validate, 1}, {validate_binding, 2}, {policy_changed, 2}, {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3}, - {assert_args_equivalence, 2}, {policy_changed, 3}]; + {assert_args_equivalence, 2}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index e54bd66e..10a79c55 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,8 +20,9 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, - add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, @@ -31,8 +32,7 @@ {enables, kernel_ready}]}). description() -> - [{name, <<"direct">>}, - {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP direct exchange, as per the AMQP specification">>}]. serialise_events() -> false. @@ -41,9 +41,10 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 870b327a..3ebd8548 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -31,8 +32,7 @@ {enables, kernel_ready}]}). description() -> - [{name, <<"fanout">>}, - {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. serialise_events() -> false. @@ -40,9 +40,10 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index b185cc4a..cf2d3140 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -37,8 +38,7 @@ -endif. description() -> - [{name, <<"headers">>}, - {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP headers exchange, as per the AMQP specification">>}]. serialise_events() -> false. @@ -51,14 +51,24 @@ route(#exchange{name = Name}, rabbit_router:match_bindings( Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). -default_headers_match_kind() -> all. +validate_binding(_X, #binding{args = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-match">>) of + {longstr, <<"all">>} -> ok; + {longstr, <<"any">>} -> ok; + {longstr, Other} -> {error, + {binding_invalid, + "Invalid x-match field value ~p; " + "expected all or any", [Other]}}; + {Type, Other} -> {error, + {binding_invalid, + "Invalid x-match field type ~p (value ~p); " + "expected longstr", [Type, Other]}}; + undefined -> {error, + {binding_invalid, "x-match field missing", []}} + end. parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). +parse_x_match(<<"any">>) -> any. %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort @@ -69,17 +79,9 @@ parse_x_match(Other) -> %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). +headers_match(Args, Data) -> + {longstr, MK} = rabbit_misc:table_lookup(Args, <<"x-match">>), + headers_match(Args, Data, true, false, parse_x_match(MK)). headers_match([], _Data, AllMatch, _AnyMatch, all) -> AllMatch; @@ -116,7 +118,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 4a48a458..07a8004a 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -20,12 +20,12 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, - add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). description() -> - [{name, <<"invalid">>}, - {description, + [{description, <<"Dummy exchange type, to be used when the intended one is not found.">> }]. @@ -42,9 +42,10 @@ route(#exchange{name = Name, type = Type}, _) -> [rabbit_misc:rs(Name), Type]). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 70e32eaa..ce76ccb0 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -34,8 +35,7 @@ %%---------------------------------------------------------------------------- description() -> - [{name, <<"topic">>}, - {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. + [{description, <<"AMQP topic exchange, as per the AMQP specification">>}]. serialise_events() -> false. @@ -48,6 +48,7 @@ route(#exchange{name = X}, end || RKey <- Routes]). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> @@ -58,7 +59,7 @@ delete(transaction, #exchange{name = X}, _Bs) -> delete(none, _Exchange, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 74d36aba..d9f1170e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -14,42 +14,165 @@ %% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. %% +%% The purpose of the limiter is to stem the flow of messages from +%% queues to channels, in order to act upon various protocol-level +%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos +%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer) +%% credit mechanism. +%% +%% Each channel has an associated limiter process, created with +%% start_link/1, which it passes to queues on consumer creation with +%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4. +%% The latter isn't strictly necessary, since basic.get is not +%% subject to limiting, but it means that whenever a queue knows about +%% a channel, it also knows about its limiter, which is less fiddly. +%% +%% The limiter process holds state that is, in effect, shared between +%% the channel and all queues from which the channel is +%% consuming. Essentially all these queues are competing for access to +%% a single, limited resource - the ability to deliver messages via +%% the channel - and it is the job of the limiter process to mediate +%% that access. +%% +%% The limiter process is separate from the channel process for two +%% reasons: separation of concerns, and efficiency. Channels can get +%% very busy, particularly if they are also dealing with publishes. +%% With a separate limiter process all the aforementioned access +%% mediation can take place without touching the channel. +%% +%% For efficiency, both the channel and the queues keep some local +%% state, initialised from the limiter pid with new/1 and client/1, +%% respectively. In particular this allows them to avoid any +%% interaction with the limiter process when it is 'inactive', i.e. no +%% protocol-level flow control is taking place. +%% +%% This optimisation does come at the cost of some complexity though: +%% when a limiter becomes active, the channel needs to inform all its +%% consumer queues of this change in status. It does this by invoking +%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse +%% transition, i.e. once a queue has been told about an active +%% limiter, it is not subsequently told when that limiter becomes +%% inactive. In practice it is rare for that to happen, though we +%% could optimise this case in the future. +%% +%% In addition, the consumer credit bookkeeping is local to queues, so +%% it is not necessary to store information about it in the limiter +%% process. But for abstraction we hide it from the queue behind the +%% limiter API, and it therefore becomes part of the queue local +%% state. +%% +%% The interactions with the limiter are as follows: +%% +%% 1. Channels tell the limiter about basic.qos prefetch counts - +%% that's what the limit_prefetch/3, unlimit_prefetch/1, +%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are +%% about - and channel.flow blocking - that's what block/1, +%% unblock/1 and is_blocked/1 are for. They also tell the limiter +%% queue state (via the queue) about consumer credit changes - +%% that's what credit/4 is for. +%% +%% 2. Queues also tell the limiter queue state about the queue +%% becoming empty (via drained/1) and consumers leaving (via +%% forget_consumer/2). +%% +%% 3. Queues register with the limiter - this happens as part of +%% activate/1. +%% +%% 4. The limiter process maintains an internal counter of 'messages +%% sent but not yet acknowledged', called the 'volume'. +%% +%% 5. Queues ask the limiter for permission (with can_send/3) whenever +%% they want to deliver a message to a channel. The limiter checks +%% whether a) the channel isn't blocked by channel.flow, b) the +%% volume has not yet reached the prefetch limit, and c) whether +%% the consumer has enough credit. If so it increments the volume +%% and tells the queue to proceed. Otherwise it marks the queue as +%% requiring notification (see below) and tells the queue not to +%% proceed. +%% +%% 6. A queue that has been told to proceed (by the return value of +%% can_send/3) sends the message to the channel. Conversely, a +%% queue that has been told not to proceed, will not attempt to +%% deliver that message, or any future messages, to the +%% channel. This is accomplished by can_send/3 capturing the +%% outcome in the local state, where it can be accessed with +%% is_suspended/1. +%% +%% 7. When a channel receives an ack it tells the limiter (via ack/2) +%% how many messages were ack'ed. The limiter process decrements +%% the volume and if it falls below the prefetch_count then it +%% notifies (through rabbit_amqqueue:resume/2) all the queues +%% requiring notification, i.e. all those that had a can_send/3 +%% request denied. +%% +%% 8. Upon receipt of such a notification, queues resume delivery to +%% the channel, i.e. they will once again start asking limiter, as +%% described in (5). +%% +%% 9. When a queue has no more consumers associated with a particular +%% channel, it deactivates use of the limiter with deactivate/1, +%% which alters the local state such that no further interactions +%% with the limiter process take place until a subsequent +%% activate/1. + -module(rabbit_limiter). -behaviour(gen_server2). +-export([start_link/0]). +%% channel API +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, + is_prefetch_limited/1, is_blocked/1, is_active/1, + get_prefetch_limit/1, ack/2, pid/1]). +%% queue API +-export([client/1, activate/1, can_send/3, resume/1, deactivate/1, + is_suspended/1, is_consumer_blocked/2, credit/4, drained/1, + forget_consumer/2]). +%% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). --export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, - disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(lstate, {pid, prefetch_limited, blocked}). +-record(qstate, {pid, state, credits}). -ifdef(use_specs). --export_type([token/0]). - --opaque(token() :: #token{}). +-type(lstate() :: #lstate{pid :: pid(), + prefetch_limited :: boolean(), + blocked :: boolean()}). +-type(qstate() :: #qstate{pid :: pid(), + state :: 'dormant' | 'active' | 'suspended'}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(make_token/0 :: () -> token()). --spec(make_token/1 :: ('undefined' | pid()) -> token()). --spec(is_enabled/1 :: (token()) -> boolean()). --spec(enable/2 :: (token(), non_neg_integer()) -> token()). --spec(disable/1 :: (token()) -> token()). --spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (token(), pid()) -> 'ok'). --spec(unregister/2 :: (token(), pid()) -> 'ok'). --spec(get_limit/1 :: (token()) -> non_neg_integer()). --spec(block/1 :: (token()) -> 'ok'). --spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). --spec(is_blocked/1 :: (token()) -> boolean()). +-spec(new/1 :: (pid()) -> lstate()). + +-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) + -> lstate()). +-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). +-spec(block/1 :: (lstate()) -> lstate()). +-spec(unblock/1 :: (lstate()) -> lstate()). +-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). +-spec(is_blocked/1 :: (lstate()) -> boolean()). +-spec(is_active/1 :: (lstate()) -> boolean()). +-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). +-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). +-spec(pid/1 :: (lstate()) -> pid()). + +-spec(client/1 :: (pid()) -> qstate()). +-spec(activate/1 :: (qstate()) -> qstate()). +-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) -> + {'continue' | 'suspend', qstate()}). +-spec(resume/1 :: (qstate()) -> qstate()). +-spec(deactivate/1 :: (qstate()) -> qstate()). +-spec(is_suspended/1 :: (qstate()) -> boolean()). +-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). +-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean()) + -> qstate()). +-spec(drained/1 :: (qstate()) + -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). +-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). -endif. @@ -64,120 +187,181 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- start_link() -> gen_server2:start_link(?MODULE, [], []). -make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +new(Pid) -> + %% this a 'call' to ensure that it is invoked at most once. + ok = gen_server:call(Pid, {new, self()}), + #lstate{pid = Pid, prefetch_limited = false, blocked = false}. -is_enabled(#token{enabled = Enabled}) -> Enabled. +limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> + ok = gen_server:call(L#lstate.pid, + {limit_prefetch, PrefetchCount, UnackedCount}), + L#lstate{prefetch_limited = true}. -enable(#token{pid = Pid} = Token, Volume) -> - gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity). +unlimit_prefetch(L) -> + ok = gen_server:call(L#lstate.pid, unlimit_prefetch), + L#lstate{prefetch_limited = false}. -disable(#token{pid = Pid} = Token) -> - gen_server2:call(Pid, {disable, Token}, infinity). +block(L) -> + ok = gen_server:call(L#lstate.pid, block), + L#lstate{blocked = true}. -limit(Limiter, PrefetchCount) -> - maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). +unblock(L) -> + ok = gen_server:call(L#lstate.pid, unblock), + L#lstate{blocked = false}. -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit. Note that we don't use maybe_call here in order -%% to avoid always going through with_exit_handler/2, even when the -%% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> - rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end); -can_send(_, _, _) -> - true. +is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. + +is_blocked(#lstate{blocked = Blocked}) -> Blocked. + +is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). + +get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; +get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit). -%% Let the limiter know that the channel has received some acks from a -%% consumer -ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). +ack(#lstate{prefetch_limited = false}, _AckCount) -> ok; +ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). -register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). +pid(#lstate{pid = Pid}) -> Pid. -unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). +client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}. -get_limit(Limiter) -> +activate(L = #qstate{state = dormant}) -> + ok = gen_server:cast(L#qstate.pid, {register, self()}), + L#qstate{state = active}; +activate(L) -> L. + +can_send(L = #qstate{pid = Pid, state = State, credits = Credits}, + AckRequired, CTag) -> + case is_consumer_blocked(L, CTag) of + false -> case (State =/= active orelse + safe_call(Pid, {can_send, self(), AckRequired}, true)) of + true -> {continue, L#qstate{ + credits = record_send_q(CTag, Credits)}}; + false -> {suspend, L#qstate{state = suspended}} + end; + true -> {suspend, L} + end. + +safe_call(Pid, Msg, ExitValue) -> rabbit_misc:with_exit_handler( - fun () -> 0 end, - fun () -> maybe_call(Limiter, get_limit, 0) end). + fun () -> ExitValue end, + fun () -> gen_server2:call(Pid, Msg, infinity) end). + +resume(L) -> L#qstate{state = active}. -block(Limiter) -> - maybe_call(Limiter, block, ok). +deactivate(L = #qstate{state = dormant}) -> L; +deactivate(L) -> + ok = gen_server:cast(L#qstate.pid, {unregister, self()}), + L#qstate{state = dormant}. + +is_suspended(#qstate{state = suspended}) -> true; +is_suspended(#qstate{}) -> false. + +is_consumer_blocked(#qstate{credits = Credits}, CTag) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = C}} when C > 0 -> false; + {value, #credit{}} -> true; + none -> false + end. -unblock(Limiter) -> - maybe_call(Limiter, {unblock, Limiter}, ok). +credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) -> + Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. -is_blocked(Limiter) -> - maybe_call(Limiter, is_blocked, false). +drained(Limiter = #qstate{credits = Credits}) -> + {CTagCredits, Credits2} = + rabbit_misc:gb_trees_fold( + fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> + {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; + (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + {Acc, Creds0} + end, {[], Credits}, Credits), + {CTagCredits, Limiter#qstate{credits = Credits2}}. + +forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> + Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations +%% in the queue (to do them elsewhere introduces a ton of +%% races). However, it's a big chunk of code that is conceptually very +%% linked to the limiter concept. So we get the queue to hold a bit of +%% state for us (#qstate.credits), and maintain a fiction that the +%% limiter is making the decisions... + +record_send_q(CTag, Credits) -> + case gb_trees:lookup(CTag, Credits) of + {value, #credit{credit = Credit, drain = Drain}} -> + update_credit(CTag, Credit - 1, Drain, Credits); + none -> + Credits + end. + +update_credit(CTag, Credit, Drain, Credits) -> + %% Using up all credit implies no need to send a 'drained' event + Drain1 = Drain andalso Credit > 0, + gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> - {ok, #lim{}}. +init([]) -> {ok, #lim{}}. + +prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; +prioritise_call(_Msg, _From, _Len, _State) -> 0. -prioritise_call(get_limit, _From, _Len, _State) -> 9; -prioritise_call(_Msg, _From, _Len, _State) -> 0. +handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> + {reply, ok, State#lim{ch_pid = ChPid}}; + +handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) -> + %% assertion + true = State#lim.prefetch_count == 0 orelse + State#lim.volume == UnackedCount, + {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount, + volume = UnackedCount})}; + +handle_call(unlimit_prefetch, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, + volume = 0})}; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{blocked = false})}; + +handle_call(get_prefetch_limit, _From, + State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}; handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case prefetch_limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end; - -handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}; - -handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call({unblock, Token}, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(is_blocked, _From, State) -> - {reply, blocked(State), State}; - -handle_call({enable, Token, Channel, Volume}, _From, State) -> - {reply, Token#token{enabled = true}, - State#lim{ch_pid = Channel, volume = Volume}}; -handle_call({disable, Token}, _From, State) -> - {reply, Token#token{enabled = false}, State}. + end. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), - {noreply, State1}; + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -199,27 +383,13 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of - true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont - end, NewState1}; - false -> {cont, NewState} + case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso + not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of + true -> notify_queues(NewState); + false -> NewState end. -maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) -> - gen_server2:call(Pid, Call, infinity); -maybe_call(_, _Call, Default) -> - Default. - -maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> - gen_server2:cast(Pid, Cast); -maybe_cast(_, _Call) -> - ok. - -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> +prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. blocked(#lim{blocked = Blocked}) -> Blocked. @@ -231,10 +401,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{queues = Queues}) -> case orddict:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), State#lim{queues = orddict:erase(QPid, Queues)}; error -> State end. @@ -251,13 +420,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> end, {[], Queues}, Queues), case length(QList) of 0 -> ok; - 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case + 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case L -> %% We randomly vary the position of queues in the list, %% thus ensuring that each queue has an equal chance of %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3] + [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3] || L3 <- [L2, L1]], ok end, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 05036d35..4fb1fc3b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -32,6 +32,8 @@ [policy_validator, <<"ha-mode">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_validator, <<"ha-params">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-sync-mode">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). @@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of {ok, SPid} when is_pid(SPid) -> + maybe_auto_sync(Q), rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), {ok, started}; @@ -235,13 +238,13 @@ suggested_queue_nodes(Q) -> %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. suggested_queue_nodes(Q, PossibleNodes) -> - {MNode0, SNodes} = actual_queue_nodes(Q), + {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes}, PossibleNodes). + {MNode, SNodes, SSNodes}, PossibleNodes). policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -249,15 +252,20 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> - {MNode, Possible -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) -> + {MNode, Poss -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) -> Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - %% If the current master is currently not in the nodes specified, - %% act like it is for the purposes below - otherwise we will not - %% return it in the results... - Nodes = lists:usort([MNode | Nodes1]), - Unavailable = Nodes -- Possible, + %% If the current master is not in the nodes specified, then what we want + %% to do depends on whether there are any synchronised slaves. If there + %% are then we can just kill the current master - the admin has asked for + %% a migration and we should give it to them. If there are not however + %% then we must keep the master around so as not to lose messages. + Nodes = case SSNodes of + [] -> lists:usort([MNode | Nodes1]); + _ -> Nodes1 + end, + Unavailable = Nodes -- Poss, Available = Nodes -- Unavailable, case Available of [] -> %% We have never heard of anything? Not much we can do but @@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> {MNode, []}; _ -> case lists:member(MNode, Available) of true -> {MNode, Available -- [MNode]}; - false -> promote_slave(Available) + false -> %% Make sure the new master is synced! In order to + %% get here SSNodes must not be empty. + [NewMNode | _] = SSNodes, + {NewMNode, Available -- [NewMNode]} end end; %% When we need to add nodes, we randomise our candidate list as a %% crude form of load-balancing. TODO it would also be nice to -%% randomise the list of ones to remove when we have too many - but -%% that would fail to take account of synchronisation... -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> +%% randomise the list of ones to remove when we have too many - we +%% would have to take account of synchronisation though. +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) -> SCount = Count - 1, {MNode, case SCount > length(SNodes) of - true -> Cand = shuffle((Possible -- [MNode]) -- SNodes), + true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); false -> lists:sublist(SNodes, SCount) end}; -suggested_queue_nodes(_, _, {MNode, _}, _) -> +suggested_queue_nodes(_, _, {MNode, _, _}, _) -> {MNode, []}. shuffle(L) -> @@ -288,11 +299,14 @@ shuffle(L) -> {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), L1. -actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> +actual_queue_nodes(#amqqueue{pid = MPid, + slave_pids = SPids, + sync_slave_pids = SSPids}) -> + Nodes = fun (L) -> [node(Pid) || Pid <- L] end, {case MPid of none -> none; _ -> node(MPid) - end, [node(Pid) || Pid <- SPids]}. + end, Nodes(SPids), Nodes(SSPids)}. is_mirrored(Q) -> case policy(<<"ha-mode">>, Q) of @@ -302,6 +316,14 @@ is_mirrored(Q) -> _ -> false end. +maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> + case policy(<<"ha-sync-mode">>, Q) of + <<"automatic">> -> + spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end); + _ -> + ok + end. + update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of @@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, update_mirrors0(OldQ = #amqqueue{name = QName}, NewQ = #amqqueue{name = QName}) -> - All = fun ({A,B}) -> [A|B] end, - OldNodes = All(actual_queue_nodes(OldQ)), - NewNodes = All(suggested_queue_nodes(NewQ)), - add_mirrors(QName, NewNodes -- OldNodes), + {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), + {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), + OldNodes = [OldMNode | OldSNodes], + NewNodes = [NewMNode | NewSNodes], + add_mirrors (QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), + maybe_auto_sync(NewQ), ok. %%---------------------------------------------------------------------------- validate_policy(KeyList) -> - validate_policy( - proplists:get_value(<<"ha-mode">>, KeyList), - proplists:get_value(<<"ha-params">>, KeyList, none)). + case validate_policy( + proplists:get_value(<<"ha-mode">>, KeyList), + proplists:get_value(<<"ha-params">>, KeyList, none)) of + ok -> case proplists:get_value( + <<"ha-sync-mode">>, KeyList, <<"manual">>) of + <<"automatic">> -> ok; + <<"manual">> -> ok; + Mode -> {error, "ha-sync-mode must be \"manual\" " + "or \"automatic\", got ~p", [Mode]} + end; + E -> E + end. validate_policy(<<"all">>, none) -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8d5159e6..22edfcb6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) -> rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQ:set_ram_duration_target(DesiredDuration, BQS1). +%% [1] - the arrival of this newly synced slave may cause the master to die if +%% the admin has requested a migration-type change to policy. record_synchronised(#amqqueue { name = QName }) -> Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q = #amqqueue { sync_slave_pids = SSPids }] -> - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { sync_slave_pids = [Self | SSPids] }), - ok - end - end). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue { sync_slave_pids = SSPids }] -> + Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, + rabbit_mirror_queue_misc:store_updated_slaves(Q2), + {ok, Q1, Q2} + end + end) of + ok -> ok; + {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1] + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 010fd9ac..c39e898c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -677,6 +677,7 @@ remove_node_if_mnesia_running(Node) -> %% change being propagated to all nodes case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> + rabbit_amqqueue:forget_all_durable(Node), rabbit_node_monitor:notify_left_cluster(Node), ok; {aborted, Reason} -> diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index b8b03f56..b53c16bf 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -20,7 +20,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1, peercert/1, - tune_buffer_size/1, connection_string/2, socket_ends/2]). + connection_string/2, socket_ends/2]). %%--------------------------------------------------------------------------- @@ -69,7 +69,6 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). --spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()). -spec(connection_string/2 :: (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). -spec(socket_ends/2 :: @@ -189,13 +188,6 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. -tune_buffer_size(Sock) -> - case getopts(Sock, [sndbuf, recbuf, buffer]) of - {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), - setopts(Sock, [{buffer, BufSz}]); - Err -> Err - end. - connection_string(Sock, Direction) -> case socket_ends(Sock, Direction) of {ok, {FromAddress, FromPort, ToAddress, ToPort}} -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 71c2c80a..de53b7f0 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -24,7 +24,7 @@ write_cluster_status/1, read_cluster_status/0, update_cluster_status/0, reset_cluster_status/0]). -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). --export([partitions/0]). +-export([partitions/0, subscribe/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). --record(state, {monitors, partitions}). +-record(state, {monitors, partitions, subscribers}). %%---------------------------------------------------------------------------- @@ -54,6 +54,7 @@ -spec(notify_left_cluster/1 :: (node()) -> 'ok'). -spec(partitions/0 :: () -> {node(), [node()]}). +-spec(subscribe/1 :: (pid()) -> 'ok'). -endif. @@ -179,6 +180,9 @@ notify_left_cluster(Node) -> partitions() -> gen_server:call(?SERVER, partitions, infinity). +subscribe(Pid) -> + gen_server:cast(?SERVER, {subscribe, Pid}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -190,8 +194,9 @@ init([]) -> %% happen. process_flag(trap_exit, true), {ok, _} = mnesia:subscribe(system), - {ok, #state{monitors = pmon:new(), - partitions = []}}. + {ok, #state{monitors = pmon:new(), + subscribers = pmon:new(), + partitions = []}}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, {node(), Partitions}, State}; @@ -232,23 +237,40 @@ handle_cast({left_cluster, Node}, State) -> write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes), del_node(Node, RunningNodes)}), {noreply, State}; +handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) -> + {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}}; handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, - State = #state{monitors = Monitors}) -> + State = #state{monitors = Monitors, subscribers = Subscribers}) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}}; + [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)], + {noreply, handle_dead_rabbit_state( + State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})}; + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state{subscribers = Subscribers}) -> + {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; handle_info({mnesia_system_event, {inconsistent_database, running_partitioned_network, Node}}, - State = #state{partitions = Partitions}) -> + State = #state{partitions = Partitions, + monitors = Monitors}) -> + %% We will not get a node_up from this node - yet we should treat it as + %% up (mostly). + State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of + true -> State; + false -> State#state{ + monitors = pmon:monitor({rabbit, Node}, Monitors)} + end, + ok = handle_live_rabbit(Node), Partitions1 = ordsets:to_list( ordsets:add_element(Node, ordsets:from_list(Partitions))), - {noreply, State#state{partitions = Partitions1}}; + {noreply, State1#state{partitions = Partitions1}}; handle_info(_Info, State) -> {noreply, State}. @@ -270,7 +292,65 @@ handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), - ok = rabbit_mnesia:on_node_down(Node). + ok = rabbit_mnesia:on_node_down(Node), + case application:get_env(rabbit, cluster_partition_handling) of + {ok, pause_minority} -> + case majority() of + true -> ok; + false -> await_cluster_recovery() + end; + {ok, ignore} -> + ok; + {ok, Term} -> + rabbit_log:warning("cluster_partition_handling ~p unrecognised, " + "assuming 'ignore'~n", [Term]), + ok + end, + ok. + +majority() -> + length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5. + +%% mnesia:system_info(db_nodes) (and hence +%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results +%% when partitioned. +alive_nodes() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + [N || N <- Nodes, pong =:= net_adm:ping(N)]. + +await_cluster_recovery() -> + rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", + []), + Nodes = rabbit_mnesia:cluster_nodes(all), + spawn(fun () -> + %% If our group leader is inside an application we are about + %% to stop, application:stop/1 does not return. + group_leader(whereis(init), self()), + %% Ensure only one restarting process at a time, will + %% exit(badarg) (harmlessly) if one is already running + register(rabbit_restarting_process, self()), + rabbit:stop(), + wait_for_cluster_recovery(Nodes) + end). + +wait_for_cluster_recovery(Nodes) -> + case majority() of + true -> rabbit:start(); + false -> timer:sleep(1000), + wait_for_cluster_recovery(Nodes) + end. + +handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> + %% If we have been partitioned, and we are now in the only remaining + %% partition, we no longer care about partitions - forget them. Note + %% that we do not attempt to deal with individual (other) partitions + %% going away. It's only safe to forget anything about partitions when + %% there are no partitions. + Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of + [] -> []; + _ -> Partitions + end, + State#state{partitions = Partitions1}. handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl index 39d0188c..a4bd5042 100644 --- a/src/rabbit_parameter_validation.erl +++ b/src/rabbit_parameter_validation.erl @@ -16,7 +16,7 @@ -module(rabbit_parameter_validation). --export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]). +-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]). number(_Name, Term) when is_number(Term) -> ok; @@ -73,3 +73,15 @@ proplist(Name, Constraints, Term) when is_list(Term) -> proplist(Name, _Constraints, Term) -> {error, "~s not a list ~p", [Name, Term]}. + +enum(OptionsA) -> + Options = [list_to_binary(atom_to_list(O)) || O <- OptionsA], + fun (Name, Term) when is_binary(Term) -> + case lists:member(Term, Options) of + true -> ok; + false -> {error, "~s should be one of ~p, actually was ~p", + [Name, Options, Term]} + end; + (Name, Term) -> + {error, "~s should be binary, actually was ~p", [Name, Term]} + end. diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index e712078b..7398cd2d 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -26,7 +26,7 @@ -export([register/0]). -export([name/1, get/2, set/1]). --export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([validate/4, notify/4, notify_clear/3]). -export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, list_formatted/1, info_keys/0]). @@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). -validate_clear(_VHost, <<"policy">>, _Name) -> - ok. - notify(VHost, <<"policy">>, _Name, _Term) -> update_policies(VHost). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3841b680..ea70208f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -162,7 +162,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_msg_ids }). + max_journal_entries, on_sync, unconfirmed }). -record(segment, { num, path, journal_entries, unacked }). @@ -190,7 +190,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_msg_ids :: gb_set() + unconfirmed :: gb_set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -210,7 +210,7 @@ -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/1 :: (qistate()) -> qistate()). --spec(needs_sync/1 :: (qistate()) -> boolean()). +-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false'). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_types:msg_id(), seq_id(), @@ -269,13 +269,16 @@ delete_and_terminate(State) -> State1. publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + State = #qistate { unconfirmed = Unconfirmed }) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - State #qistate { - unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }), + case MsgProps#message_properties.needs_confirming of + true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), + State #qistate { unconfirmed = Unconfirmed1 }; + false -> State + end), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -302,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) -> needs_sync(#qistate { journal_handle = undefined }) -> false; -needs_sync(#qistate { journal_handle = JournalHdl }) -> - file_handle_cache:needs_sync(JournalHdl). +needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) -> + case gb_sets:is_empty(UC) of + true -> case file_handle_cache:needs_sync(JournalHdl) of + true -> other; + false -> false + end; + false -> confirms + end. flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -398,7 +407,7 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_msg_ids = gb_sets:new() }. + unconfirmed = gb_sets:new() }. clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -607,19 +616,21 @@ add_to_journal(RelSeq, Action, end}; add_to_journal(RelSeq, Action, JEntries) -> - Val = case array:get(RelSeq, JEntries) of - undefined -> - case Action of - ?PUB -> {Action, no_del, no_ack}; - del -> {no_pub, del, no_ack}; - ack -> {no_pub, no_del, ack} - end; - ({Pub, no_del, no_ack}) when Action == del -> - {Pub, del, no_ack}; - ({Pub, del, no_ack}) when Action == ack -> - {Pub, del, ack} - end, - array:set(RelSeq, Val, JEntries). + case array:get(RelSeq, JEntries) of + undefined -> + array:set(RelSeq, + case Action of + ?PUB -> {Action, no_del, no_ack}; + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack} + end, JEntries); + ({Pub, no_del, no_ack}) when Action == del -> + array:set(RelSeq, {Pub, del, no_ack}, JEntries); + ({no_pub, del, no_ack}) when Action == ack -> + array:set(RelSeq, {no_pub, del, ack}, JEntries); + ({?PUB, del, no_ack}) when Action == ack -> + array:reset(RelSeq, JEntries) + end. maybe_flush_journal(State = #qistate { dirty_count = DCount, max_journal_entries = MaxJournal }) @@ -732,9 +743,12 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> - OnSyncFun(UG), - State #qistate { unsynced_msg_ids = gb_sets:new() }. +notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> + case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State #qistate { unconfirmed = gb_sets:new() } + end. %%---------------------------------------------------------------------------- %% segment manipulation diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index af7aac6f..61fac0e2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -186,32 +186,37 @@ server_capabilities(_) -> log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). +socket_error(Reason) -> + log(error, "error on AMQP connection ~p: ~p (~s)~n", + [self(), Reason, rabbit_misc:format_inet_error(Reason)]). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", - [self(), Reason]), + {error, Reason} -> socket_error(Reason), %% NB: this is tcp socket, even in case of ssl rabbit_net:fast_close(Sock), exit(normal) end. -name(Sock) -> - socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end). - -socket_ends(Sock) -> - socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end). - start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - Name = name(Sock), + Name = case rabbit_net:connection_string(Sock, inbound) of + {ok, Str} -> Str; + {error, enotconn} -> rabbit_net:fast_close(Sock), + exit(normal); + {error, Reason} -> socket_error(Reason), + rabbit_net:fast_close(Sock), + exit(normal) + end, log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), - {PeerHost, PeerPort, Host, Port} = socket_ends(Sock), + {PeerHost, PeerPort, Host, Port} = + socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -245,7 +250,6 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_by = none, last_blocked_at = never}}, try - ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -295,26 +299,37 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of - {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); - closed -> case State#v1.connection_state of - closed -> State; - _ -> throw(connection_closed_abruptly) - end; - {error, Reason} -> throw({inet_error, Reason}); - {other, Other} -> handle_other(Other, Deb, State) + {data, Data} -> + recvloop(Deb, State#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); + closed when State#v1.connection_state =:= closed -> + ok; + closed -> + maybe_emit_stats(State), + throw(connection_closed_abruptly); + {error, Reason} -> + maybe_emit_stats(State), + throw({inet_error, Reason}); + {other, {system, From, Request}} -> + sys:handle_system_msg(Request, From, State#v1.parent, + ?MODULE, Deb, State); + {other, Other} -> + case handle_other(Other, State) of + stop -> ok; + NewState -> recvloop(Deb, NewState) + end end. -handle_other({conserve_resources, Conserve}, Deb, +handle_other({conserve_resources, Conserve}, State = #v1{throttle = Throttle}) -> Throttle1 = Throttle#throttle{conserve_resources = Conserve}, - recvloop(Deb, control_throttle(State#v1{throttle = Throttle1})); -handle_other({channel_closing, ChPid}, Deb, State) -> + control_throttle(State#v1{throttle = Throttle1}); +handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(control_throttle(State))); -handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + maybe_close(control_throttle(State)); +handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), %% this is what we are expected to do according to @@ -325,59 +340,62 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> %% ordinary error case. However, since this termination is %% initiated by our parent it is probably more important to exit %% quickly. + maybe_emit_stats(State), exit(Reason); -handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, - _Deb, _State) -> +handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) -> + maybe_emit_stats(State), throw(E); -handle_other({channel_exit, Channel, Reason}, Deb, State) -> - mainloop(Deb, handle_exception(State, Channel, Reason)); -handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); -handle_other(terminate_connection, _Deb, State) -> - State; -handle_other(handshake_timeout, Deb, State) +handle_other({channel_exit, Channel, Reason}, State) -> + handle_exception(State, Channel, Reason); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) -> + handle_dependent_exit(ChPid, Reason, State); +handle_other(terminate_connection, State) -> + maybe_emit_stats(State), + stop; +handle_other(handshake_timeout, State) when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) -> - mainloop(Deb, State); -handle_other(handshake_timeout, _Deb, State) -> + State; +handle_other(handshake_timeout, State) -> + maybe_emit_stats(State), throw({handshake_timeout, State#v1.callback}); -handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) -> - mainloop(Deb, State); -handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> +handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) -> + State; +handle_other(heartbeat_timeout, State = #v1{connection_state = S}) -> + maybe_emit_stats(State), throw({heartbeat_timeout, S}); -handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> +handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) + force -> stop; + normal -> NewState end; -handle_other({'$gen_call', From, info}, Deb, State) -> +handle_other({'$gen_call', From, info}, State) -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); -handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + State; +handle_other({'$gen_call', From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) + State; +handle_other({'$gen_cast', force_event_refresh}, State) when ?IS_RUNNING(State) -> rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), - mainloop(Deb, State); -handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + State; +handle_other({'$gen_cast', force_event_refresh}, State) -> %% Ignore, we will emit a created event once we start running. - mainloop(Deb, State); -handle_other(ensure_stats, Deb, State) -> - mainloop(Deb, ensure_stats_timer(State)); -handle_other(emit_stats, Deb, State) -> - mainloop(Deb, emit_stats(State)); -handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); -handle_other({bump_credit, Msg}, Deb, State) -> + State; +handle_other(ensure_stats, State) -> + ensure_stats_timer(State); +handle_other(emit_stats, State) -> + emit_stats(State); +handle_other({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), - recvloop(Deb, control_throttle(State)); -handle_other(Other, _Deb, _State) -> + control_throttle(State); +handle_other(Other, State) -> %% internal error -> something worth dying for + maybe_emit_stats(State), exit({unexpected_message, Other}). switch_callback(State, Callback, Length) -> @@ -437,13 +455,13 @@ close_connection(State = #v1{queue_collector = Collector, handle_dependent_exit(ChPid, Reason, State) -> case {channel_cleanup(ChPid), termination_kind(Reason)} of - {undefined, uncontrolled} -> - exit({abnormal_dependent_exit, ChPid, Reason}); - {_Channel, controlled} -> - maybe_close(control_throttle(State)); - {Channel, uncontrolled} -> - maybe_close(handle_exception(control_throttle(State), - Channel, Reason)) + {undefined, controlled} -> State; + {undefined, uncontrolled} -> exit({abnormal_dependent_exit, + ChPid, Reason}); + {_Channel, controlled} -> maybe_close(control_throttle(State)); + {Channel, uncontrolled} -> State1 = handle_exception( + State, Channel, Reason), + maybe_close(control_throttle(State1)) end. terminate_channels() -> @@ -636,7 +654,10 @@ process_frame(Frame, Channel, State) -> post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + %% This is not strictly necessary, but more obviously + %% correct. Also note that we do not need to call maybe_close/1 + %% since we cannot possibly be in the 'closing' state. + control_throttle(State); post_process_frame({content_header, _, _, _, _}, _ChPid, State) -> maybe_block(State); post_process_frame({content_body, _}, _ChPid, State) -> @@ -774,7 +795,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, Connection#connection{ client_properties = ClientProperties, capabilities = Capabilities, - auth_mechanism = AuthMechanism, + auth_mechanism = {Mechanism, AuthMechanism}, auth_state = AuthMechanism:init(Sock)}}, auth_phase(Response, State); @@ -837,8 +858,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), - rabbit_event:if_enabled(State1, #v1.stats_timer, - fun() -> emit_stats(State1) end), + maybe_emit_stats(State1), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), @@ -902,15 +922,14 @@ auth_mechanisms_binary(Sock) -> auth_phase(Response, State = #v1{connection = Connection = #connection{protocol = Protocol, - auth_mechanism = AuthMechanism, + auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Msg, Args} -> rabbit_misc:protocol_error( access_refused, "~s login refused: ~s", - [proplists:get_value(name, AuthMechanism:description()), - io_lib:format(Msg, Args)]); + [Name, io_lib:format(Msg, Args)]); {protocol_error, Msg, Args} -> rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> @@ -970,10 +989,8 @@ ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; ic(client_properties, #connection{client_properties = CP}) -> CP; -ic(auth_mechanism, #connection{auth_mechanism = none}) -> - none; -ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) -> - proplists:get_value(name, Mechanism:description()); +ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; +ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> @@ -1000,6 +1017,10 @@ cert_info(F, #v1{sock = Sock}) -> {ok, Cert} -> list_to_binary(F(Cert)) end. +maybe_emit_stats(State) -> + rabbit_event:if_enabled(State, #v1.stats_timer, + fun() -> emit_stats(State) end). + emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 60419856..acdc2cff 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> internal_register(Class, TypeName, ModuleName) when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> ok = sanity_check_module(class_module(Class), ModuleName), - true = ets:insert(?ETS_NAME, - {{Class, internal_binary_to_type(TypeName)}, ModuleName}), + RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName}, + true = ets:insert(?ETS_NAME, RegArg), + conditional_register(RegArg), ok. internal_unregister(Class, TypeName) -> - true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}), + UnregArg = {Class, internal_binary_to_type(TypeName)}, + conditional_unregister(UnregArg), + true = ets:delete(?ETS_NAME, UnregArg), + ok. + +%% register exchange decorator route callback only when implemented, +%% in order to avoid unnecessary decorator calls on the fast +%% publishing path +conditional_register({{exchange_decorator, Type}, ModuleName}) -> + case erlang:function_exported(ModuleName, route, 2) of + true -> true = ets:insert(?ETS_NAME, + {{exchange_decorator_route, Type}, + ModuleName}); + false -> ok + end; +conditional_register(_) -> + ok. + +conditional_unregister({exchange_decorator, Type}) -> + true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}), + ok; +conditional_unregister(_) -> ok. sanity_check_module(ClassModule, Module) -> diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl index 8a237105..6b62c974 100644 --- a/src/rabbit_runtime_parameter.erl +++ b/src/rabbit_runtime_parameter.erl @@ -23,8 +23,6 @@ -callback validate(rabbit_types:vhost(), binary(), binary(), term()) -> validate_results(). --callback validate_clear(rabbit_types:vhost(), binary(), - binary()) -> validate_results(). -callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'. -callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'. @@ -35,7 +33,6 @@ behaviour_info(callbacks) -> [ {validate, 4}, - {validate_clear, 3}, {notify, 4}, {notify_clear, 3} ]; diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 2615372c..05520170 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). -export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1, - list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3, + list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -40,12 +40,9 @@ -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]). --spec(list_strict/1 :: (binary() | '_') - -> [rabbit_types:infos()] | 'not_found'). +-spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]). -spec(list/2 :: (rabbit_types:vhost() | '_', binary() | '_') -> [rabbit_types:infos()]). --spec(list_strict/2 :: (rabbit_types:vhost() | '_', binary() | '_') - -> [rabbit_types:infos()] | 'not_found'). -spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary()) -> rabbit_types:infos() | 'not_found'). @@ -120,21 +117,13 @@ clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). clear_any(VHost, Component, Name) -> - case clear_any0(VHost, Component, Name) of - ok -> ok; - {errors, L} -> format_error(L) - end. - -clear_any0(VHost, Component, Name) -> - case lookup_component(Component) of - {ok, Mod} -> case flatten_errors( - Mod:validate_clear(VHost, Component, Name)) of - ok -> mnesia_clear(VHost, Component, Name), - Mod:notify_clear(VHost, Component, Name), - ok; - E -> E - end; - E -> E + case lookup(VHost, Component, Name) of + not_found -> {error_string, "Parameter does not exist"}; + _ -> mnesia_clear(VHost, Component, Name), + case lookup_component(Component) of + {ok, Mod} -> Mod:notify_clear(VHost, Component, Name); + _ -> ok + end end. mnesia_clear(VHost, Component, Name) -> @@ -147,21 +136,14 @@ list() -> [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>]. -list(VHost) -> list(VHost, '_', []). -list_strict(Component) -> list('_', Component, not_found). -list(VHost, Component) -> list(VHost, Component, []). -list_strict(VHost, Component) -> list(VHost, Component, not_found). - -list(VHost, Component, Default) -> - case component_good(Component) of - true -> Match = #runtime_parameters{key = {VHost, Component, '_'}, - _ = '_'}, - [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- - mnesia:dirty_match_object(?TABLE, Match), - Comp =/= <<"policy">> orelse - Component =:= <<"policy">>]; - _ -> Default - end. +list(VHost) -> list(VHost, '_'). +list_component(Component) -> list('_', Component). + +list(VHost, Component) -> + Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'}, + [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <- + mnesia:dirty_match_object(?TABLE, Match), + Comp =/= <<"policy">> orelse Component =:= <<"policy">>]. list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. @@ -216,12 +198,6 @@ info_keys() -> [component, name, value]. %%--------------------------------------------------------------------------- -component_good('_') -> true; -component_good(Component) -> case lookup_component(Component) of - {ok, _} -> true; - _ -> false - end. - lookup_component(Component) -> case rabbit_registry:lookup_module( runtime_parameter, list_to_atom(binary_to_list(Component))) of diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index c27f1b4a..05c1dbc1 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -18,7 +18,7 @@ -behaviour(rabbit_runtime_parameter). -behaviour(rabbit_policy_validator). --export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([validate/4, notify/4, notify_clear/3]). -export([register/0, unregister/0]). -export([validate_policy/1]). -export([register_policy_validator/0, unregister_policy_validator/0]). @@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok; validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok; validate(_, <<"test">>, _, _) -> {error, "meh", []}. -validate_clear(_, <<"test">>, <<"good">>) -> ok; -validate_clear(_, <<"test">>, <<"maybe">>) -> ok; -validate_clear(_, <<"test">>, _) -> {error, "meh", []}. - notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f5ea4fba..e7b69879 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -912,10 +912,10 @@ test_arguments_parser() -> test_dynamic_mirroring() -> %% Just unit tests of the node selection logic, see multi node %% tests for the rest... - Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) -> + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) -> {NewM, NewSs0} = rabbit_mirror_queue_misc:suggested_queue_nodes( - Policy, Params, {OldM, OldSs}, All), + Policy, Params, CurrentState, All), NewSs1 = lists:sort(NewSs0), case dm_list_match(NewSs, NewSs1, ExtraSs) of ok -> ok; @@ -923,28 +923,36 @@ test_dynamic_mirroring() -> end end, - Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]), - Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]), + + N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end, %% Add a node - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), - Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]), + Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]), %% Add two nodes and drop one - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]), %% Don't try to include nodes that are not running - Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]), %% If we can't find any of the nodes listed then just keep the master - Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), - %% And once that's happened, still keep the master even when not listed - Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]), - - Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), - Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]), - Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), - Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]), - Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]), + %% And once that's happened, still keep the master even when not listed, + %% if nothing is synced + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]), + Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]), + %% But if something is synced we can lose the master - but make + %% sure we pick the new master from the nodes which are synced! + Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]), + Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]), + + Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]), + Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]), + Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]), passed. @@ -1062,7 +1070,11 @@ test_runtime_parameters() -> ok = control_action(clear_parameter, ["test", "maybe"]), {error_string, _} = control_action(clear_parameter, ["test", "neverexisted"]), + + %% We can delete for a component that no longer exists + Good(["test", "good", "\"ignore\""]), rabbit_runtime_parameters_test:unregister(), + ok = control_action(clear_parameter, ["test", "good"]), passed. test_policy_validation() -> @@ -1082,25 +1094,20 @@ test_policy_validation() -> {error_string, _} = SetPol("testpos", [-1, 0, 1]), {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + ok = control_action(clear_policy, ["name"]), rabbit_runtime_parameters_test:unregister_policy_validator(), passed. test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun test_writer/0), - {ok, Ch} = rabbit_channel:start_link( - 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, - user(<<"user">>), <<"/">>, [], self(), - rabbit_limiter:make_token(self())), + {_Writer, Limiter, Ch} = test_channel(), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], - ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, rabbit_limiter:make_token(), - <<"ctag">>, true, undefined), + Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1178,8 +1185,6 @@ find_listener() -> N =:= node()], {H, P}. -test_writer() -> test_writer(none). - test_writer(Pid) -> receive {'$gen_call', From, flush} -> gen_server:reply(From, ok), @@ -1189,13 +1194,17 @@ test_writer(Pid) -> shutdown -> ok end. -test_spawn() -> +test_channel() -> Me = self(), Writer = spawn(fun () -> test_writer(Me) end), + {ok, Limiter} = rabbit_limiter:start_link(), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1, - user(<<"guest">>), <<"/">>, [], Me, - rabbit_limiter:make_token(self())), + user(<<"guest">>), <<"/">>, [], Me, Limiter), + {Writer, Limiter, Ch}. + +test_spawn() -> + {Writer, _Limiter, Ch} = test_channel(), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) @@ -1567,7 +1576,7 @@ control_action(Command, Node, Args, Opts) -> info_action(Command, Args, CheckVHost) -> ok = control_action(Command, []), - if CheckVHost -> ok = control_action(Command, []); + if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]); true -> ok end, ok = control_action(Command, lists:map(fun atom_to_list/1, Args)), @@ -2709,12 +2718,13 @@ test_queue_recover() -> end, rabbit_amqqueue:stop(), rabbit_amqqueue:start(rabbit_amqqueue:recover()), + {ok, Limiter} = rabbit_limiter:start_link(), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = - rabbit_amqqueue:basic_get(Q1, self(), false), + rabbit_amqqueue:basic_get(Q1, self(), false, Limiter), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1}, VQ2} = @@ -2735,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() -> rabbit_amqqueue:set_ram_duration_target(QPid, 0), + {ok, Limiter} = rabbit_limiter:start_link(), + CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = - rabbit_amqqueue:basic_get(Q, self(), true), + rabbit_amqqueue:basic_get(Q, self(), true, Limiter), {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), %% give the queue a second to receive the close_fds callback msg diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 18cab48b..f7c6c729 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -262,8 +262,6 @@ durable, transient_threshold, - async_callback, - len, persistent_count, @@ -356,8 +354,6 @@ durable :: boolean(), transient_threshold :: non_neg_integer(), - async_callback :: rabbit_backing_queue:async_callback(), - len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) -> init(#amqqueue { name = QueueName, durable = IsDurable }, false, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), - init(IsDurable, IndexState, 0, [], AsyncCallback, + init(IsDurable, IndexState, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, AsyncCallback); @@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true, rabbit_msg_store:contains(MsgId, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, AsyncCallback, + init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). terminate(_Reason, State) -> @@ -772,24 +768,18 @@ ram_duration(State = #vqstate { needs_timeout(State = #vqstate { index_state = IndexState, target_ram_count = TargetRamCount }) -> - case must_sync_index(State) of - true -> timed; - false -> - case rabbit_queue_index:needs_sync(IndexState) of - true -> idle; - false -> case TargetRamCount of - infinity -> false; - _ -> case - reduce_memory_use( - fun (_Quota, State1) -> {0, State1} end, - fun (_Quota, State1) -> State1 end, - fun (_Quota, State1) -> {0, State1} end, - State) of - {true, _State} -> idle; - {false, _State} -> false - end - end - end + case rabbit_queue_index:needs_sync(IndexState) of + confirms -> timed; + other -> idle; + false when TargetRamCount == infinity -> false; + false -> case reduce_memory_use( + fun (_Quota, State1) -> {0, State1} end, + fun (_Quota, State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, + State) of + {true, _State} -> idle; + {false, _State} -> false + end end. timeout(State = #vqstate { index_state = IndexState }) -> @@ -883,8 +873,7 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a msg_id to the unconfirmed set -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, IsDelivered, SeqId, Msg = #basic_message {id = MsgId}, MsgProps) -> @@ -1011,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, +init(IsDurable, IndexState, DeltaCount, Terms, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), @@ -1037,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback, durable = IsDurable, transient_threshold = NextSeqId, - async_callback = AsyncCallback, - len = DeltaCount1, persistent_count = DeltaCount1, @@ -1338,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), confirmed = gb_sets:union(C, MsgIdSet) }. -must_sync_index(#vqstate { msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - %% If UC is empty then by definition, MIOD and MOD are also empty - %% and there's nothing that can be pending a sync. - - %% If UC is not empty, then we want to find is_empty(UC - MIOD), - %% but the subtraction can be expensive. Thus instead, we test to - %% see if UC is a subset of MIOD. This can only be the case if - %% MIOD == UC, which would indicate that every message in UC is - %% also in MIOD and is thus _all_ pending on a msg_store sync, not - %% on a qi sync. Thus the negation of this is sufficient. Because - %% is_subset is short circuiting, this is more efficient than the - %% subtraction. - not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). - msgs_written_to_disk(Callback, MsgIdSet, ignored) -> Callback(?MODULE, fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 8d2cbc41..2858cf58 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -70,6 +70,7 @@ add(VHostPath) -> {<<"amq.rabbitmq.trace">>, topic}]], ok end), + rabbit_event:notify(vhost_created, info(VHostPath)), R. delete(VHostPath) -> @@ -87,6 +88,7 @@ delete(VHostPath) -> with(VHostPath, fun () -> ok = internal_delete(VHostPath) end)), + ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. internal_delete(VHostPath) -> @@ -95,9 +97,9 @@ internal_delete(VHostPath) -> || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], [ok = rabbit_runtime_parameters:clear(VHostPath, proplists:get_value(component, Info), - proplists:get_value(key, Info)) + proplists:get_value(name, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], - [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info)) + [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 0248f878..2725be31 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,21 +55,30 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, inet_db:register_socket(Sock, Mod), %% handle - file_handle_cache:transfer(apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain(), + case tune_buffer_size(Sock) of + ok -> file_handle_cache:transfer( + apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain(); + {error, enotconn} -> catch port_close(Sock); + {error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock), + error_logger:error_msg( + "failed to tune buffer size of " + "connection accepted on ~s:~p - ~p (~s)~n", + [rabbit_misc:ntoab(IPAddress), Port, + Err, rabbit_misc:format_inet_error(Err)]), + catch port_close(Sock) + end, %% accept more accept(State); -handle_info({inet_async, LSock, Ref, {error, closed}}, - State=#state{sock=LSock, ref=Ref}) -> - %% It would be wrong to attempt to restart the acceptor when we - %% know this will fail. - {stop, normal, State}; - handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {stop, {accept_failed, Reason}, State}; + case Reason of + closed -> {stop, normal, State}; %% listening socket closed + econnaborted -> accept(State); %% client sent RST before we accepted + _ -> {stop, {accept_failed, Reason}, State} + end; handle_info(_Info, State) -> {noreply, State}. @@ -87,3 +96,10 @@ accept(State = #state{sock=LSock}) -> {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} end. + +tune_buffer_size(Sock) -> + case inet:getopts(Sock, [sndbuf, recbuf, buffer]) of + {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), + inet:setopts(Sock, [{buffer, BufSz}]); + Error -> Error + end. |