summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ebin/rabbit_app.in5
-rw-r--r--packaging/debs/apt-repository/distributions2
-rw-r--r--packaging/standalone/Makefile82
-rw-r--r--packaging/standalone/erl.diff5
-rw-r--r--packaging/standalone/src/rabbit_release.erl152
-rw-r--r--scripts/rabbitmq-defaults6
-rwxr-xr-xscripts/rabbitmq-plugins3
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server7
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--src/rabbit.erl60
-rw-r--r--src/rabbit_alarm.erl14
-rw-r--r--src/rabbit_amqqueue.erl73
-rw-r--r--src/rabbit_amqqueue_process.erl320
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl3
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl3
-rw-r--r--src/rabbit_auth_mechanism_plain.erl3
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_binding.erl30
-rw-r--r--src/rabbit_channel.erl176
-rw-r--r--src/rabbit_channel_sup.erl4
-rw-r--r--src/rabbit_disk_monitor.erl7
-rw-r--r--src/rabbit_error_logger_file_h.erl3
-rw-r--r--src/rabbit_exchange.erl79
-rw-r--r--src/rabbit_exchange_decorator.erl16
-rw-r--r--src/rabbit_exchange_type.erl17
-rw-r--r--src/rabbit_exchange_type_direct.erl11
-rw-r--r--src/rabbit_exchange_type_fanout.erl9
-rw-r--r--src/rabbit_exchange_type_headers.erl44
-rw-r--r--src/rabbit_exchange_type_invalid.erl11
-rw-r--r--src/rabbit_exchange_type_topic.erl9
-rw-r--r--src/rabbit_limiter.erl407
-rw-r--r--src/rabbit_mirror_queue_misc.erl83
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
-rw-r--r--src/rabbit_mnesia.erl1
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_node_monitor.erl98
-rw-r--r--src/rabbit_parameter_validation.erl14
-rw-r--r--src/rabbit_policy.erl5
-rw-r--r--src/rabbit_queue_index.erl64
-rw-r--r--src/rabbit_reader.erl175
-rw-r--r--src/rabbit_registry.erl28
-rw-r--r--src/rabbit_runtime_parameter.erl3
-rw-r--r--src/rabbit_runtime_parameters.erl58
-rw-r--r--src/rabbit_runtime_parameters_test.erl6
-rw-r--r--src/rabbit_tests.erl82
-rw-r--r--src/rabbit_variable_queue.erl60
-rw-r--r--src/rabbit_vhost.erl6
-rw-r--r--src/tcp_acceptor.erl34
49 files changed, 1583 insertions, 743 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 16dfd196..339fa69e 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -10,7 +10,7 @@
rabbit_sup,
rabbit_tcp_client_sup,
rabbit_direct_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon, xmerl]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
@@ -27,7 +27,7 @@
{frame_max, 131072},
{heartbeat, 600},
{msg_store_file_size_limit, 16777216},
- {queue_index_max_journal_entries, 262144},
+ {queue_index_max_journal_entries, 65536},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},
{default_user_tags, [administrator]},
@@ -44,6 +44,7 @@
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
{reverse_dns_lookups, false},
+ {cluster_partition_handling, ignore},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions
index 183eb034..75b9fe46 100644
--- a/packaging/debs/apt-repository/distributions
+++ b/packaging/debs/apt-repository/distributions
@@ -2,6 +2,6 @@ Origin: RabbitMQ
Label: RabbitMQ Repository for Debian / Ubuntu etc
Suite: testing
Codename: kitten
-Architectures: arm hppa ia64 mips mipsel s390 sparc i386 amd64 powerpc source
+Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source
Components: main
Description: RabbitMQ Repository for Debian / Ubuntu etc
diff --git a/packaging/standalone/Makefile b/packaging/standalone/Makefile
new file mode 100644
index 00000000..89ccde93
--- /dev/null
+++ b/packaging/standalone/Makefile
@@ -0,0 +1,82 @@
+VERSION=0.0.0
+SOURCE_DIR=rabbitmq-server-$(VERSION)
+TARGET_DIR=rabbitmq_server-$(VERSION)
+TARGET_TARBALL=rabbitmq-server-$(OS)-standalone-$(VERSION)
+RLS_DIR=$(TARGET_DIR)/release/$(TARGET_DIR)
+
+ERTS_VSN=$(shell erl -noshell -eval 'io:format("~s", [erlang:system_info(version)]), halt().')
+ERTS_ROOT_DIR=$(shell erl -noshell -eval 'io:format("~s", [code:root_dir()]), halt().')
+
+# used to generate the erlang release
+RABBITMQ_HOME=$(TARGET_DIR)
+RABBITMQ_EBIN_ROOT=$(RABBITMQ_HOME)/ebin
+RABBITMQ_PLUGINS_DIR=$(RABBITMQ_HOME)/plugins
+RABBITMQ_PLUGINS_EXPAND_DIR=$(RABBITMQ_PLUGINS_DIR)/expand
+
+RABBITMQ_DEFAULTS=$(TARGET_DIR)/sbin/rabbitmq-defaults
+fix_defaults = sed -e $(1) $(RABBITMQ_DEFAULTS) > $(RABBITMQ_DEFAULTS).tmp \
+ && mv $(RABBITMQ_DEFAULTS).tmp $(RABBITMQ_DEFAULTS)
+
+dist:
+ tar -zxf ../../dist/$(SOURCE_DIR).tar.gz
+
+ $(MAKE) -C $(SOURCE_DIR) \
+ TARGET_DIR=`pwd`/$(TARGET_DIR) \
+ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \
+ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \
+ install
+
+## Here we set the RABBITMQ_HOME variable,
+## then we make ERL_DIR point to our released erl
+## and we add the paths to our released start_clean and start_sasl boot scripts
+ $(call fix_defaults,'s:^SYS_PREFIX=$$:SYS_PREFIX=\$${RABBITMQ_HOME}:')
+ $(call fix_defaults,'s:^ERL_DIR=$$:ERL_DIR=\$${RABBITMQ_HOME}/erts-$(ERTS_VSN)/bin/:')
+ $(call fix_defaults,'s:start_clean$$:"\$${SYS_PREFIX}/releases/$(VERSION)/start_clean":')
+ $(call fix_defaults,'s:start_sasl:"\$${SYS_PREFIX}/releases/$(VERSION)/start_sasl":')
+
+ chmod 0755 $(RABBITMQ_DEFAULTS)
+
+ mkdir -p $(TARGET_DIR)/etc/rabbitmq
+
+ $(MAKE) generate_release
+
+ mkdir -p $(RLS_DIR)
+ tar -C $(RLS_DIR) -xzf $(RABBITMQ_HOME)/rabbit.tar.gz
+
+# add minimal boot file
+ cp $(ERTS_ROOT_DIR)/bin/start_clean.boot $(RLS_DIR)/releases/$(VERSION)
+ cp $(ERTS_ROOT_DIR)/bin/start_sasl.boot $(RLS_DIR)/releases/$(VERSION)
+
+# move rabbitmq files to top level folder
+ mv $(RLS_DIR)/lib/rabbit-$(VERSION)/* $(RLS_DIR)
+
+# remove empty lib/rabbit-$(VERSION) folder
+ rm -rf $(RLS_DIR)/lib/rabbit-$(VERSION)
+
+# fix Erlang ROOTDIR
+ patch -o $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl.src < erl.diff
+
+ tar -zcf $(TARGET_TARBALL).tar.gz -C $(TARGET_DIR)/release $(TARGET_DIR)
+ rm -rf $(SOURCE_DIR) $(TARGET_DIR)
+
+clean: clean_partial
+ rm -f rabbitmq-server-$(OS)-standalone-*.tar.gz
+
+clean_partial:
+ rm -rf $(SOURCE_DIR)
+ rm -rf $(TARGET_DIR)
+
+.PHONY : generate_release
+generate_release:
+ erlc \
+ -I $(TARGET_DIR)/include/ -o src -Wall \
+ -v +debug_info -Duse_specs -Duse_proper_qc \
+ -pa $(TARGET_DIR)/ebin/ src/rabbit_release.erl
+ erl \
+ -pa "$(RABBITMQ_EBIN_ROOT)" \
+ -pa src \
+ -noinput \
+ -hidden \
+ -s rabbit_release \
+ -extra "$(RABBITMQ_PLUGINS_DIR)" "$(RABBITMQ_PLUGINS_EXPAND_DIR)" "$(RABBITMQ_HOME)"
+ rm src/rabbit_release.beam
diff --git a/packaging/standalone/erl.diff b/packaging/standalone/erl.diff
new file mode 100644
index 00000000..c51bfe22
--- /dev/null
+++ b/packaging/standalone/erl.diff
@@ -0,0 +1,5 @@
+20c20,21
+< ROOTDIR="%FINAL_ROOTDIR%"
+---
+> realpath() { [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" ; }
+> ROOTDIR="$(dirname `realpath $0`)/../.."
diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl
new file mode 100644
index 00000000..26f36d68
--- /dev/null
+++ b/packaging/standalone/src/rabbit_release.erl
@@ -0,0 +1,152 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+-module(rabbit_release).
+
+-export([start/0]).
+
+-include("rabbit.hrl").
+
+-define(BaseApps, [rabbit]).
+-define(ERROR_CODE, 1).
+
+%% We need to calculate all the ERTS apps we need to ship with a
+%% standalone rabbit. To acomplish that we need to unpack and load the plugins
+%% apps that are shiped with rabbit.
+%% Once we get that we generate an erlang release inside a tarball.
+%% Our make file will work with that release to generate our final rabbitmq
+%% package.
+start() ->
+ %% Determine our various directories
+ [PluginsDistDir, UnpackedPluginDir, RabbitHome] =
+ init:get_plain_arguments(),
+ RootName = UnpackedPluginDir ++ "/rabbit",
+
+ %% extract the plugins so we can load their apps later
+ prepare_plugins(PluginsDistDir, UnpackedPluginDir),
+
+ %% add the plugin ebin folder to the code path.
+ add_plugins_to_path(UnpackedPluginDir),
+
+ PluginAppNames = [P#plugin.name ||
+ P <- rabbit_plugins:list(PluginsDistDir)],
+
+ %% Build the entire set of dependencies - this will load the
+ %% applications along the way
+ AllApps = case catch sets:to_list(expand_dependencies(PluginAppNames)) of
+ {failed_to_load_app, App, Err} ->
+ terminate("failed to load application ~s:~n~p",
+ [App, Err]);
+ AppList ->
+ AppList
+ end,
+
+ %% we need a list of ERTS apps we need to ship with rabbit
+ BaseApps = AllApps -- PluginAppNames,
+
+ AppVersions = [determine_version(App) || App <- BaseApps],
+ RabbitVersion = proplists:get_value(rabbit, AppVersions),
+
+ %% Build the overall release descriptor
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
+ {erts, erlang:system_info(version)},
+ AppVersions},
+
+ %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
+ rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
+
+ %% Compile the script
+ systools:make_script(RootName),
+ systools:script2boot(RootName),
+ %% Make release tarfile
+ make_tar(RootName, RabbitHome),
+ rabbit_misc:quit(0).
+
+make_tar(Release, RabbitHome) ->
+ systools:make_tar(Release,
+ [
+ {dirs, [docs, etc, include, plugins, sbin, share]},
+ {erts, code:root_dir()},
+ {outdir, RabbitHome}
+ ]).
+
+determine_version(App) ->
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
+ {App, Vsn}.
+
+delete_recursively(Fn) ->
+ case rabbit_file:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
+ end.
+
+prepare_plugins(PluginsDistDir, DestDir) ->
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
+ end,
+
+ [prepare_plugin(Plugin, DestDir) ||
+ Plugin <- rabbit_plugins:list(PluginsDistDir)].
+
+prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
+ zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ PluginsDestDir) ->
+ rabbit_file:recursive_copy(Location,
+ filename:join([PluginsDestDir, Name])).
+
+expand_dependencies(Pending) ->
+ expand_dependencies(sets:new(), Pending).
+expand_dependencies(Current, []) ->
+ Current;
+expand_dependencies(Current, [Next|Rest]) ->
+ case sets:is_element(Next, Current) of
+ true ->
+ expand_dependencies(Current, Rest);
+ false ->
+ case application:load(Next) of
+ ok ->
+ ok;
+ {error, {already_loaded, _}} ->
+ ok;
+ {error, Reason} ->
+ throw({failed_to_load_app, Next, Reason})
+ end,
+ {ok, Required} = application:get_key(Next, applications),
+ Unique = [A || A <- Required, not(sets:is_element(A, Current))],
+ expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
+ end.
+
+add_plugins_to_path(PluginDir) ->
+ [add_plugin_to_path(PluginName) ||
+ PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
+
+add_plugin_to_path(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN).
+
+terminate(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ rabbit_misc:quit(?ERROR_CODE).
diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults
index db1d4f2b..83c5639d 100644
--- a/scripts/rabbitmq-defaults
+++ b/scripts/rabbitmq-defaults
@@ -18,6 +18,12 @@
### next line potentially updated in package install steps
SYS_PREFIX=
+### next line will be updated when generating a standalone release
+ERL_DIR=
+
+CLEAN_BOOT_FILE=start_clean
+SASL_BOOT_FILE=start_sasl
+
## Set default values
CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 43f450c0..c043c90a 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -26,11 +26,12 @@
##--- End of overridden <var_name> variables
-exec erl \
+exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
-sname rabbitmq-plugins$$ \
+ -boot "${CLEAN_BOOT_FILE}" \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 713d7000..4b4dbe47 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -23,8 +23,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 184ae931..161ec2e6 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -82,7 +82,8 @@ case "$(uname -s)" in
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if ! erl -pa "$RABBITMQ_EBIN_ROOT" \
+if ! ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
+ -boot "${CLEAN_BOOT_FILE}" \
-noinput \
-hidden \
-s rabbit_prelaunch \
@@ -103,11 +104,11 @@ RABBITMQ_LISTEN_ARG=
# there is no other way of preventing their expansion.
set -f
-exec erl \
+exec ${ERL_DIR}erl \
-pa ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot start_sasl \
+ -boot "${SASL_BOOT_FILE}" \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 00fffa9f..0368db3f 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -26,12 +26,13 @@
##--- End of overridden <var_name> variables
-exec erl \
+exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
+ -boot "${CLEAN_BOOT_FILE}" \
-s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 6b730fda..3cfa21ba 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -236,7 +236,7 @@
{memory, any()}]).
-spec(is_running/0 :: () -> boolean()).
-spec(is_running/1 :: (node()) -> boolean()).
--spec(environment/0 :: () -> [{param() | term()}]).
+-spec(environment/0 :: () -> [{param(), term()}]).
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
-spec(force_event_refresh/0 :: () -> 'ok').
@@ -355,6 +355,8 @@ handle_app_error(App, Reason) ->
throw({could_not_start, App, Reason}).
start_it(StartFun) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ register(rabbit_boot, Marker),
try
StartFun()
catch
@@ -363,11 +365,17 @@ start_it(StartFun) ->
_:Reason ->
boot_error(Reason, erlang:get_stacktrace())
after
+ unlink(Marker),
+ Marker ! stop,
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
+ case whereis(rabbit_boot) of
+ undefined -> ok;
+ _ -> await_startup()
+ end,
rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
@@ -435,8 +443,9 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, Vsn} = application:get_key(rabbit, vsn),
- error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n",
- [Vsn, erlang:system_info(otp_release)]),
+ error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
+ [Vsn, erlang:system_info(otp_release),
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
@@ -699,10 +708,12 @@ force_event_refresh() ->
log_broker_started(Plugins) ->
rabbit_misc:with_local_io(
fun() ->
+ PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
+ || P <- Plugins]),
error_logger:info_msg(
- "Server startup complete; plugins are: ~p~n", [Plugins]),
- io:format("~n Broker running with ~p plugins.~n",
- [length(Plugins)])
+ "Server startup complete; ~b plugins started.~n~s",
+ [length(Plugins), PluginList]),
+ io:format(" completed with ~p plugins.~n", [length(Plugins)])
end).
erts_version_check() ->
@@ -716,43 +727,38 @@ erts_version_check() ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~n## ## ~s ~s. ~s"
- "~n## ## ~s"
- "~n##########"
- "~n###### ## Logs: ~s"
- "~n########## ~s~n",
+ io:format("~n ~s ~s. ~s"
+ "~n ## ## ~s"
+ "~n ## ##"
+ "~n ########## Logs: ~s"
+ "~n ###### ## ~s"
+ "~n ##########"
+ "~n Starting broker...",
[Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE,
log_location(kernel), log_location(sasl)]).
log_banner() ->
- {ok, Product} = application:get_key(id),
- {ok, Version} = application:get_key(vsn),
Settings = [{"node", node()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
{"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()},
- {"erlang version", erlang:system_info(otp_release)}],
+ {"database dir", rabbit_mnesia:dir()}],
DescrLen = 1 + lists:max([length(K) || {K, _V} <- Settings]),
Format = fun (K, V) ->
rabbit_misc:format(
"~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", [K, V])
end,
Banner = iolist_to_binary(
- rabbit_misc:format(
- "~s ~s~n~s~n~s~n",
- [Product, Version, ?COPYRIGHT_MESSAGE,
- ?INFORMATION_MESSAGE]) ++
- [case S of
- {"config file(s)" = K, []} ->
- Format(K, "(none)");
- {"config file(s)" = K, [V0 | Vs]} ->
- Format(K, V0), [Format("", V) || V <- Vs];
- {K, V} ->
- Format(K, V)
- end || S <- Settings]),
+ [case S of
+ {"config file(s)" = K, []} ->
+ Format(K, "(none)");
+ {"config file(s)" = K, [V0 | Vs]} ->
+ Format(K, V0), [Format("", V) || V <- Vs];
+ {K, V} ->
+ Format(K, V)
+ end || S <- Settings]),
error_logger:info_msg("~s", [Banner]).
home_dir() ->
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 362b11aa..6d24d130 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -208,11 +208,19 @@ internal_register(Pid, {M, F, A} = AlertMFA,
State#alarms{alertees = NewAlertees}.
handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
- rabbit_log:warning("~s resource limit alarm set on node ~p~n",
- [Source, Node]),
+ rabbit_log:warning(
+ "~s resource limit alarm set on node ~p.~n~n"
+ "**********************************************************~n"
+ "*** Publishers will be blocked until this alarm clears ***~n"
+ "**********************************************************~n",
+ [Source, Node]),
{ok, maybe_alert(fun dict:append/3, Node, Source, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
- rabbit_log:warning("file descriptor limit alarm set~n"),
+ rabbit_log:warning(
+ "file descriptor limit alarm set.~n~n"
+ "********************************************************************~n"
+ "*** New connections will not be accepted until this alarm clears ***~n"
+ "********************************************************************~n"),
{ok, State};
handle_set_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' set~n", [Alarm]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9228755e..8c00c85c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,7 +17,7 @@
-module(rabbit_amqqueue).
-export([recover/0, stop/0, start/1, declare/5,
- delete_immediately/1, delete/3, purge/1]).
+ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
--export([notify_down_all/2, limit_all/3]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -135,6 +135,7 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
+-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
@@ -143,19 +144,20 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
--spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
- ok_or_errors()).
--spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
+-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(),
- rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
+-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
+ non_neg_integer(), boolean()) -> 'ok').
+-spec(basic_consume/9 ::
+ (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
+ rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(resume/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
@@ -406,7 +408,8 @@ args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_max_length_arg/2}].
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -417,6 +420,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_max_length_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
@@ -529,16 +539,19 @@ notify_down_all(QPids, ChPid) ->
Bads1 -> {error, Bads1}
end.
-limit_all(QPids, ChPid, Limiter) ->
- delegate:cast(QPids, {limit, ChPid, Limiter}).
+activate_limit_all(QPids, ChPid) ->
+ delegate:cast(QPids, {activate_limit, ChPid}).
-basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate:call(QPid, {basic_get, ChPid, NoAck}).
+credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
+ delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate:call(QPid, {basic_consume, NoAck, ChPid,
- Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
+basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
+ delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
+
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) ->
+ delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -560,7 +573,7 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
+resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
@@ -591,6 +604,24 @@ internal_delete(QueueName) ->
end
end).
+forget_all_durable(Node) ->
+ %% Note rabbit is not running so we avoid e.g. the worker pool. Also why
+ %% we don't invoke the return from rabbit_binding:process_deletions/1.
+ {atomic, ok} =
+ mnesia:sync_transaction(
+ fun () ->
+ Qs = mnesia:match_object(rabbit_durable_queue,
+ #amqqueue{_ = '_'}, write),
+ [rabbit_binding:process_deletions(
+ internal_delete1(Name)) ||
+ #amqqueue{name = Name, pid = Pid} = Q <- Qs,
+ node(Pid) =:= Node,
+ rabbit_policy:get(<<"ha-mode">>, Q)
+ =:= {error, not_found}],
+ ok
+ end),
+ ok.
+
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 27d73445..b016c4d2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,6 +55,7 @@
queue_monitors,
dlx,
dlx_routing_key,
+ max_length,
status
}).
@@ -65,9 +66,12 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% The limiter itself
limiter,
- is_limit_active,
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -164,6 +168,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
+ rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -242,7 +248,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -254,6 +261,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -347,9 +356,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
-assert_invariant(#q{active_consumers = AC,
- backing_queue = BQ, backing_queue_state = BQS}) ->
- true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
+assert_invariant(State = #q{active_consumers = AC}) ->
+ true = (queue:is_empty(AC) orelse is_empty(State)).
+
+is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -357,17 +367,17 @@ lookup_ch(ChPid) ->
C -> C
end.
-ch_record(ChPid) ->
+ch_record(ChPid, LimiterPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- is_limit_active = false,
- limiter = rabbit_limiter:make_token(),
+ limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
C;
@@ -387,37 +397,32 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter = Limiter,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
-update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
- ok = rabbit_limiter:register(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 1});
-update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
- ok = rabbit_limiter:unregister(Limiter, self()),
- update_ch_record(C#cr{consumer_count = 0,
- limiter = rabbit_limiter:make_token()});
-update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
- update_ch_record(C#cr{consumer_count = Count + Delta}).
-
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
-is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+
+maybe_send_drained(WasEmpty, State) ->
+ case (not WasEmpty) andalso is_empty(State) of
+ true -> [send_drained(C) || C <- all_ch_record()];
+ false -> ok
+ end,
+ State.
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> ok;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
end.
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
@@ -435,18 +440,21 @@ deliver_msgs_to_consumers(DeliverFun, false,
end.
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
- C = ch_record(ChPid),
+ C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
- Consumer#consumer.ack_required) of
- false -> block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ {false, State};
+ {continue, Limiter} ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Limiter},
+ State#q{active_consumers = AC1})
end
end.
@@ -471,9 +479,7 @@ deliver_msg_to_consumer(DeliverFun,
deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State1),
- {Result, BQ:is_empty(BQS), State2}.
+ {Result, is_empty(State1), State1}.
confirm_messages([], State) ->
State;
@@ -521,12 +527,10 @@ discard(#delivery{sender = SenderPid,
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(State),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
- BQ:is_empty(BQS), State1),
- State2.
+ is_empty(State), State),
+ State1.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
@@ -561,19 +565,56 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
+ %% optimisation: it would be perfectly safe to always
+ %% invoke drop_expired_msgs here, but that is expensive so
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
+ end
+ end.
+
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ {0, State};
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ WasEmpty = BQ:is_empty(BQS),
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(State#q{backing_queue_state = BQS1}).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, State#q{backing_queue_state = BQS1}}.
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ {Result, maybe_send_drained(Result =:= empty, State1)}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -602,20 +643,29 @@ remove_consumers(ChPid, Queue, QName) ->
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
- not_found ->
+ not_found -> State;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ State;
+ true -> unblock(State, C1)
+ end
+ end.
+
+unblock(State, C = #cr{limiter = Limiter}) ->
+ case lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, queue:to_list(C#cr.blocked_consumers)) of
+ {_, []} ->
+ update_ch_record(C),
State;
- C ->
- C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
- end
+ {Blocked, Unblocked} ->
+ BlockedQ = queue:from_list(Blocked),
+ UnblockedQ = queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ}),
+ AC1 = queue:join(State#q.active_consumers, UnblockedQ),
+ run_message_queue(State#q{active_consumers = AC1})
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -712,9 +762,19 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{backing_queue_state = BQS,
- backing_queue = BQ }) ->
- Now = now_micros(),
+%% Logically this function should invoke maybe_send_drained/2.
+%% However, that is expensive. Since some frequent callers of
+%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot
+%% possibly cause the queue to become empty, we push the
+%% responsibility to the callers. So be cautious when adding new ones.
+drop_expired_msgs(State) ->
+ case is_empty(State) of
+ true -> State;
+ false -> drop_expired_msgs(now_micros(), State)
+ end.
+
+drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ }) ->
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, State1} =
with_dlx(
@@ -723,8 +783,8 @@ drop_expired_msgs(State = #q{backing_queue_state = BQS,
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
{Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
- undefined -> undefined;
- #message_properties{expiry = Exp} -> Exp
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
end, State1).
with_dlx(undefined, _With, Without) -> Without();
@@ -747,6 +807,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ end, maxlen, X, State),
+ State1.
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,
@@ -1055,17 +1127,18 @@ handle_call({notify_down, ChPid}, From, State) ->
{stop, State1} -> stop(From, ok, State1)
end;
-handle_call({basic_get, ChPid, NoAck}, _From,
+handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_msgs(State1)) of
+ case fetch(AckRequired, State1) of
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag}, State2} ->
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
@@ -1075,15 +1148,30 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, BQ:len(BQS), Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, Limiter,
- ConsumerTag, ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = ch_record(ChPid),
- C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ Limiter2 = case CreditArgs of
+ none -> Limiter1;
+ {Crd, Drain} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, Crd, Drain)
+ end,
+ C1 = update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter2}),
+ case is_empty(State) of
+ true -> send_drained(C1);
+ false -> ok
+ end,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1092,18 +1180,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
- E = {ChPid, Consumer},
- State2 =
- case is_ch_blocked(C1) of
- true -> block_consumer(C1, E),
- State1;
- false -> update_ch_record(C1),
- AC1 = queue:in(E, State1#q.active_consumers),
- run_message_queue(State1#q{active_consumers = AC1})
- end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State2)),
- reply(ok, State2)
+ not NoAck, qname(State1)),
+ AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers),
+ reply(ok, run_message_queue(State1#q{active_consumers = AC1}))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
@@ -1112,10 +1192,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C = #cr{blocked_consumers = Blocked} ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1132,7 +1221,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_msgs(ensure_expiry_timer(State)),
+ ensure_expiry_timer(State),
reply({ok, BQ:len(BQS), consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
@@ -1148,7 +1237,8 @@ handle_call({delete, IfUnused, IfEmpty}, From,
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ State1 = State#q{backing_queue_state = BQS1},
+ reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1211,8 +1301,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- noreply(run_message_queue(
- State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
+ noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
@@ -1244,10 +1333,12 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
handle_cast(delete_immediately, State) ->
stop(State);
-handle_cast({unblock, ChPid}, State) ->
+handle_cast({resume, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end));
handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
@@ -1256,21 +1347,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) ->
C#cr{unsent_message_count = Count - Credit}
end));
-handle_cast({limit, ChPid, Limiter}, State) ->
+handle_cast({activate_limit, ChPid}, State) ->
noreply(
- possibly_unblock(
- State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter = OldLimiter,
- is_limit_active = OldLimited}) ->
- case (ConsumerCount =/= 0 andalso
- not rabbit_limiter:is_enabled(OldLimiter)) of
- true -> ok = rabbit_limiter:register(Limiter, self());
- false -> ok
- end,
- Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
- C#cr{limiter = Limiter, is_limit_active = Limited}
- end));
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
@@ -1302,6 +1384,24 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_cast({credit, ChPid, CTag, Credit, Drain},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ Len = BQ:len(BQS),
+ rabbit_channel:send_credit_reply(ChPid, Len),
+ C = #cr{limiter = Limiter} = lookup_ch(ChPid),
+ C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)},
+ noreply(case Drain andalso Len == 0 of
+ true -> update_ch_record(C1),
+ send_drained(C1),
+ State;
+ false -> case is_ch_blocked(C1) of
+ true -> update_ch_record(C1),
+ State;
+ false -> unblock(State, C1)
+ end
+ end);
+
handle_cast(wake_up, State) ->
noreply(State).
@@ -1321,7 +1421,9 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
+ WasEmpty = is_empty(State),
+ State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
+ noreply(maybe_send_drained(WasEmpty, State1));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index 1ed54fef..847a38f5 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -33,8 +33,7 @@
%% referring generically to "SASL security mechanism", i.e. the above.
description() ->
- [{name, <<"AMQPLAIN">>},
- {description, <<"QPid AMQPLAIN mechanism">>}].
+ [{description, <<"QPid AMQPLAIN mechanism">>}].
should_offer(_Sock) ->
true.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index e4494ab4..4b08e4be 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -37,8 +37,7 @@
%% SECURE-OK: "My password is ~s", [Password]
description() ->
- [{name, <<"RABBIT-CR-DEMO">>},
- {description, <<"RabbitMQ Demo challenge-response authentication "
+ [{description, <<"RabbitMQ Demo challenge-response authentication "
"mechanism">>}].
should_offer(_Sock) ->
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 5553a641..a35a133a 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -36,8 +36,7 @@
%% matching and will thus be much faster.
description() ->
- [{name, <<"PLAIN">>},
- {description, <<"SASL PLAIN authentication mechanism">>}].
+ [{description, <<"SASL PLAIN authentication mechanism">>}].
should_offer(_Sock) ->
true.
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c2b52a7c..2f247448 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,8 +18,6 @@
-ifdef(use_specs).
--export_type([async_callback/0]).
-
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6096e07b..cb86e5ae 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -40,8 +40,11 @@
[{'not_found', (rabbit_types:binding_source() |
rabbit_types:binding_destination())} |
{'absent', rabbit_types:amqqueue()}]})).
+
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
- rabbit_types:error('binding_not_found')).
+ rabbit_types:error(
+ 'binding_not_found' |
+ {'binding_invalid', string(), [any()]})).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
@@ -157,15 +160,22 @@ add(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(Src, Dst) of
- ok -> case mnesia:read({rabbit_route, B}) of
- [] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/0
- end;
- {error, _} = Err -> rabbit_misc:const(Err)
+ case rabbit_exchange:validate_binding(Src, B) of
+ ok ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(Src, Dst) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/0
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
end
end).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 877c73c3..52c6140e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,8 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
+ flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -81,8 +82,8 @@
-spec(start_link/11 ::
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(),
- pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
+ rabbit_framing:amqp_table(), pid(), pid()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -94,6 +95,9 @@
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
+-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
+ -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
@@ -138,6 +142,12 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+send_credit_reply(Pid, Len) ->
+ gen_server2:cast(Pid, {send_credit_reply, Len}).
+
+send_drained(Pid, CTagCredit) ->
+ gen_server2:cast(Pid, {send_drained, CTagCredit}).
+
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
@@ -180,7 +190,7 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, Limiter]) ->
+ Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
State = #ch{state = starting,
@@ -190,7 +200,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
- limiter = Limiter,
+ limiter = rabbit_limiter:new(LimiterPid),
tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
@@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_ok'{available = Len}),
+ noreply(State);
+
+handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
+ [ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
+ credit_drained = CreditDrained})
+ || {ConsumerTag, CreditDrained} <- CTagCredit],
+ noreply(State);
+
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
@@ -372,6 +394,8 @@ terminate(Reason, State) ->
_ -> ok
end,
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:if_enabled(State, #ch.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
@@ -544,14 +568,17 @@ check_name(_Kind, NameBin) ->
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case sets:is_element(QPid, Blocking) of
false -> State;
- true -> Blocking1 = sets:del_element(QPid, Blocking),
- case sets:size(Blocking1) of
- 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ true -> maybe_send_flow_ok(
+ State#ch{blocking = sets:del_element(QPid, Blocking)})
end.
+maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
+ case sets:size(Blocking) of
+ 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
+ _ -> ok
+ end,
+ State.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -586,6 +613,15 @@ handle_method(_Method, _, State = #ch{state = closing}) ->
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
{ok, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
ReaderPid ! {channel_closing, self()},
{noreply, State1};
@@ -664,12 +700,15 @@ handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
+ limiter = Limiter,
next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(
+ Q, self(), NoAck, rabbit_limiter:pid(Limiter))
+ end) of
{ok, MessageCount,
Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -694,7 +733,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_local = _, % FIXME: implement
no_ack = NoAck,
exclusive = ExclusiveConsume,
- nowait = NoWait},
+ nowait = NoWait,
+ arguments = Arguments},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
@@ -716,8 +756,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), Limiter,
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
+ parse_credit_args(Arguments),
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -791,19 +834,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+handle_method(#'basic.qos'{prefetch_count = 0}, _,
State = #ch{limiter = Limiter}) ->
- Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
- {false, 0} -> Limiter;
- {false, _} -> enable_limiter(State);
- {_, _} -> Limiter
- end,
- Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
- ok -> Limiter1;
- {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
- Limiter2
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+ State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
+ Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
+ PrefetchCount, queue:len(UAMQ)),
+ {reply, #'basic.qos_ok'{},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
@@ -1066,27 +1107,44 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter = Limiter}) ->
- Limiter2 = case rabbit_limiter:unblock(Limiter) of
- ok -> Limiter;
- {disabled, Limiter1} -> ok = limit_queues(Limiter1, State),
- Limiter1
- end,
- {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}};
+ Limiter1 = rabbit_limiter:unblock(Limiter),
+ {reply, #'channel.flow_ok'{active = true},
+ maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
handle_method(#'channel.flow'{active = false}, _,
State = #ch{consumer_mapping = Consumers,
limiter = Limiter}) ->
- Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
- true -> Limiter;
- false -> enable_limiter(State)
- end,
- State1 = State#ch{limiter = Limiter1},
- ok = rabbit_limiter:block(Limiter1),
- case consumer_queues(Consumers) of
- [] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
+ case rabbit_limiter:is_blocked(Limiter) of
+ true -> {noreply, maybe_send_flow_ok(State)};
+ false -> Limiter1 = rabbit_limiter:block(Limiter),
+ State1 = maybe_limit_queues(Limiter, Limiter1,
+ State#ch{limiter = Limiter1}),
+ %% The semantics of channel.flow{active=false}
+ %% require that no messages are delivered after the
+ %% channel.flow_ok has been sent. We accomplish that
+ %% by "flushing" all messages in flight from the
+ %% consumer queues to us. To do this we tell all the
+ %% queues to invoke rabbit_channel:flushed/2, which
+ %% will send us a {flushed, ...} message that appears
+ %% *after* all the {deliver, ...} messages. We keep
+ %% track of all the QPids thus asked, and once all of
+ %% them have responded (or died) we send the
+ %% channel.flow_ok.
+ QPids = consumer_queues(Consumers),
ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State2}
+ {noreply, maybe_send_flow_ok(
+ State1#ch{blocking = sets:from_list(QPids)})}
+ end;
+
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ drain = Drain}, _,
+ State = #ch{consumer_mapping = Consumers}) ->
+ case dict:find(CTag, Consumers) of
+ {ok, Q} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed("unknown consumer tag '~s'", [CTag])
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1155,6 +1213,16 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+parse_credit_args(Arguments) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {boolean, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1185,6 +1253,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(DestinationName)]);
+ {error, {binding_invalid, Fmt, Args}} ->
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
@@ -1321,14 +1391,14 @@ foreach_per_queue(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-enable_limiter(State = #ch{unacked_message_q = UAMQ,
- limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
- ok = limit_queues(Limiter1, State),
- Limiter1.
-
-limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter).
+maybe_limit_queues(OldLimiter, NewLimiter, State) ->
+ case ((not rabbit_limiter:is_active(OldLimiter)) andalso
+ rabbit_limiter:is_active(NewLimiter)) of
+ true -> Queues = consumer_queues(State#ch.consumer_mapping),
+ rabbit_amqqueue:activate_limit_all(Queues, self());
+ false -> ok
+ end,
+ State.
consumer_queues(Consumers) ->
lists:usort([QPid ||
@@ -1339,7 +1409,9 @@ consumer_queues(Consumers) ->
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
notify_limiter(Limiter, Acked) ->
- case rabbit_limiter:is_enabled(Limiter) of
+ %% optimisation: avoid the potentially expensive 'foldl' in the
+ %% common case.
+ case rabbit_limiter:is_prefetch_limited(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1504,7 +1576,7 @@ i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
- rabbit_limiter:get_limit(Limiter);
+ rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 8ea44a81..a0c7624b 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{channel, {rabbit_channel, start_link,
[Channel, ReaderPid, WriterPid, ReaderPid, ConnName,
Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
@@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
{channel, {rabbit_channel, start_link,
[Channel, ClientChannelPid, ClientChannelPid, ConnPid,
ConnName, Protocol, User, VHost, Capabilities, Collector,
- rabbit_limiter:make_token(LimiterPid)]},
+ LimiterPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b396b289..3bb163a1 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -31,6 +31,7 @@
-record(state, {dir,
limit,
+ actual,
timeout,
timer,
alarmed
@@ -106,8 +107,8 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
- {reply, get_disk_free(Dir), State};
+handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
+ {reply, Actual, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -156,7 +157,7 @@ internal_update(State = #state { limit = Limit,
_ ->
ok
end,
- State #state {alarmed = NewAlarmed}.
+ State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 3efc9c0c..eb6247e0 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -76,6 +76,9 @@ init_file(File, PrevHandler) ->
Error -> Error
end.
+%% filter out "application: foo; exited: stopped; type: temporary"
+handle_event({info_report, _, {_, std_info, _}}, State) ->
+ {ok, State};
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 88033f77..9e98448d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -22,7 +22,7 @@
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
- route/2, delete/2]).
+ route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
@@ -83,6 +83,9 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
+-spec(validate_binding/2 ::
+ (rabbit_types:exchange(), rabbit_types:binding())
+ -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
@@ -117,14 +120,18 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- decorators()],
+ M <- registry_lookup(exchange_decorator)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
-policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+policy_changed(X = #exchange{type = XType}, X1) ->
+ [ok = M:policy_changed(X, X1) ||
+ M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
+ ok.
serialise_events(X = #exchange{type = Type}) ->
- lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ lists:any(fun (M) -> M:serialise_events(X) end,
+ registry_lookup(exchange_decorator))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -136,8 +143,15 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-decorators() ->
- [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+registry_lookup(exchange_decorator_route = Class) ->
+ case get(exchange_decorator_route_modules) of
+ undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
+ put(exchange_decorator_route_modules, Mods),
+ Mods;
+ Mods -> Mods
+ end;
+registry_lookup(Class) ->
+ [M || {_, M} <- rabbit_registry:lookup_all(Class)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName,
@@ -304,32 +318,42 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-%% Optimisation
-route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
- #delivery{message = #basic_message{routing_keys = RKs}}) ->
- [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
-
-route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {[X], XName, []}).
+route(#exchange{name = #resource{virtual_host = VHost,
+ name = RName} = XName} = X,
+ #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
+ case {registry_lookup(exchange_decorator_route), RName == <<"">>} of
+ {[], true} ->
+ %% Optimisation
+ [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+ {Decorators, _} ->
+ lists:usort(route1(Delivery, Decorators, {[X], XName, []}))
+ end.
-route1(_, {[], _, QNames}) ->
- lists:usort(QNames);
-route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
+route1(_, _, {[], _, QNames}) ->
+ QNames;
+route1(Delivery, Decorators,
+ {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ ExchangeDests = (type_to_module(Type)):route(X, Delivery),
+ DecorateDests = process_decorators(X, Decorators, Delivery),
+ AlternateDests = process_alternate(X, ExchangeDests),
+ route1(Delivery, Decorators,
lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
- DstNames)).
+ AlternateDests ++ DecorateDests ++ ExchangeDests)).
-process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+process_alternate(#exchange{arguments = []}, _Results) -> %% optimisation
+ [];
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
AName -> [AName]
end;
-process_alternate(_X, Results) ->
- Results.
+process_alternate(_X, _Results) ->
+ [].
+
+process_decorators(_, [], _) -> %% optimisation
+ [];
+process_decorators(X, Decorators, Delivery) ->
+ lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
process_route(#resource{kind = exchange} = XName,
{_WorkList, XName, _QNames} = Acc) ->
@@ -381,6 +405,10 @@ delete(XName, IfUnused) ->
end
end).
+validate_binding(X = #exchange{type = XType}, Binding) ->
+ Module = type_to_module(XType),
+ Module:validate_binding(X, Binding).
+
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
@@ -422,8 +450,7 @@ peek_serial(XName, LockType) ->
end.
invalid_module(T) ->
- rabbit_log:warning(
- "Could not find exchange type ~s.~n", [T]),
+ rabbit_log:warning("Could not find exchange type ~s.~n", [T]),
put({xtype_to_module, T}, rabbit_exchange_type_invalid),
rabbit_exchange_type_invalid.
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index befbc462..040b55db 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -21,9 +21,8 @@
%% 1) It applies to all exchanges as soon as it is installed, therefore
%% 2) It is not allowed to affect validation, so no validate/1 or
%% assert_args_equivalence/2
-%% 3) It also can't affect routing
%%
-%% It's possible in the future we might relax 3), or even make these
+%% It's possible in the future we might make decorators
%% able to manipulate messages as they are published.
-ifdef(use_specs).
@@ -46,6 +45,10 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -54,9 +57,10 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% called when the policy attached to this exchange changes.
--callback policy_changed (
- serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+%% Decorators can optionally implement route/2 which allows additional
+%% destinations to be added to the routing decision.
+%% -callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+%% [rabbit_amqqueue:name() | rabbit_exchange:name()].
-else.
@@ -64,7 +68,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1fbcb2d8..ebc59501 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -37,6 +37,10 @@
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
-callback validate(rabbit_types:exchange()) -> 'ok'.
+%% called BEFORE declaration, to check args etc
+-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) ->
+ rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
+
%% called after declaration and recovery
-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
@@ -44,6 +48,10 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -58,18 +66,15 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
-%% called when the policy attached to this exchange changes.
--callback policy_changed(serial(), rabbit_types:exchange(),
- rabbit_types:exchange()) -> 'ok'.
-
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
+ [{description, 0}, {serialise_events, 0}, {route, 2},
+ {validate, 1}, {validate_binding, 2}, {policy_changed, 2},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}, {policy_changed, 3}];
+ {assert_args_equivalence, 2}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index e54bd66e..10a79c55 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,8 +20,9 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
@@ -31,8 +32,7 @@
{enables, kernel_ready}]}).
description() ->
- [{name, <<"direct">>},
- {description, <<"AMQP direct exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
serialise_events() -> false.
@@ -41,9 +41,10 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 870b327a..3ebd8548 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -31,8 +32,7 @@
{enables, kernel_ready}]}).
description() ->
- [{name, <<"fanout">>},
- {description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
serialise_events() -> false.
@@ -40,9 +40,10 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index b185cc4a..cf2d3140 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -37,8 +38,7 @@
-endif.
description() ->
- [{name, <<"headers">>},
- {description, <<"AMQP headers exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP headers exchange, as per the AMQP specification">>}].
serialise_events() -> false.
@@ -51,14 +51,24 @@ route(#exchange{name = Name},
rabbit_router:match_bindings(
Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
-default_headers_match_kind() -> all.
+validate_binding(_X, #binding{args = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-match">>) of
+ {longstr, <<"all">>} -> ok;
+ {longstr, <<"any">>} -> ok;
+ {longstr, Other} -> {error,
+ {binding_invalid,
+ "Invalid x-match field value ~p; "
+ "expected all or any", [Other]}};
+ {Type, Other} -> {error,
+ {binding_invalid,
+ "Invalid x-match field type ~p (value ~p); "
+ "expected longstr", [Type, Other]}};
+ undefined -> {error,
+ {binding_invalid, "x-match field missing", []}}
+ end.
parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
+parse_x_match(<<"any">>) -> any.
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
@@ -69,17 +79,9 @@ parse_x_match(Other) ->
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
+headers_match(Args, Data) ->
+ {longstr, MK} = rabbit_misc:table_lookup(Args, <<"x-match">>),
+ headers_match(Args, Data, true, false, parse_x_match(MK)).
headers_match([], _Data, AllMatch, _AnyMatch, all) ->
AllMatch;
@@ -116,7 +118,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 4a48a458..07a8004a 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,12 +20,12 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
description() ->
- [{name, <<"invalid">>},
- {description,
+ [{description,
<<"Dummy exchange type, to be used when the intended one is not found.">>
}].
@@ -42,9 +42,10 @@ route(#exchange{name = Name, type = Type}, _) ->
[rabbit_misc:rs(Name), Type]).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 70e32eaa..ce76ccb0 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -34,8 +35,7 @@
%%----------------------------------------------------------------------------
description() ->
- [{name, <<"topic">>},
- {description, <<"AMQP topic exchange, as per the AMQP specification">>}].
+ [{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
serialise_events() -> false.
@@ -48,6 +48,7 @@ route(#exchange{name = X},
end || RKey <- Routes]).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
@@ -58,7 +59,7 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 74d36aba..d9f1170e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -14,42 +14,165 @@
%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
%%
+%% The purpose of the limiter is to stem the flow of messages from
+%% queues to channels, in order to act upon various protocol-level
+%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
+%% prefetch_count and channel.flow, and AMQP 1.0's link (aka consumer)
+%% credit mechanism.
+%%
+%% Each channel has an associated limiter process, created with
+%% start_link/1, which it passes to queues on consumer creation with
+%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4.
+%% The latter isn't strictly necessary, since basic.get is not
+%% subject to limiting, but it means that whenever a queue knows about
+%% a channel, it also knows about its limiter, which is less fiddly.
+%%
+%% The limiter process holds state that is, in effect, shared between
+%% the channel and all queues from which the channel is
+%% consuming. Essentially all these queues are competing for access to
+%% a single, limited resource - the ability to deliver messages via
+%% the channel - and it is the job of the limiter process to mediate
+%% that access.
+%%
+%% The limiter process is separate from the channel process for two
+%% reasons: separation of concerns, and efficiency. Channels can get
+%% very busy, particularly if they are also dealing with publishes.
+%% With a separate limiter process all the aforementioned access
+%% mediation can take place without touching the channel.
+%%
+%% For efficiency, both the channel and the queues keep some local
+%% state, initialised from the limiter pid with new/1 and client/1,
+%% respectively. In particular this allows them to avoid any
+%% interaction with the limiter process when it is 'inactive', i.e. no
+%% protocol-level flow control is taking place.
+%%
+%% This optimisation does come at the cost of some complexity though:
+%% when a limiter becomes active, the channel needs to inform all its
+%% consumer queues of this change in status. It does this by invoking
+%% rabbit_amqqueue:activate_limit_all/2. Note that there is no inverse
+%% transition, i.e. once a queue has been told about an active
+%% limiter, it is not subsequently told when that limiter becomes
+%% inactive. In practice it is rare for that to happen, though we
+%% could optimise this case in the future.
+%%
+%% In addition, the consumer credit bookkeeping is local to queues, so
+%% it is not necessary to store information about it in the limiter
+%% process. But for abstraction we hide it from the queue behind the
+%% limiter API, and it therefore becomes part of the queue local
+%% state.
+%%
+%% The interactions with the limiter are as follows:
+%%
+%% 1. Channels tell the limiter about basic.qos prefetch counts -
+%% that's what the limit_prefetch/3, unlimit_prefetch/1,
+%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
+%% about - and channel.flow blocking - that's what block/1,
+%% unblock/1 and is_blocked/1 are for. They also tell the limiter
+%% queue state (via the queue) about consumer credit changes -
+%% that's what credit/4 is for.
+%%
+%% 2. Queues also tell the limiter queue state about the queue
+%% becoming empty (via drained/1) and consumers leaving (via
+%% forget_consumer/2).
+%%
+%% 3. Queues register with the limiter - this happens as part of
+%% activate/1.
+%%
+%% 4. The limiter process maintains an internal counter of 'messages
+%% sent but not yet acknowledged', called the 'volume'.
+%%
+%% 5. Queues ask the limiter for permission (with can_send/3) whenever
+%% they want to deliver a message to a channel. The limiter checks
+%% whether a) the channel isn't blocked by channel.flow, b) the
+%% volume has not yet reached the prefetch limit, and c) whether
+%% the consumer has enough credit. If so it increments the volume
+%% and tells the queue to proceed. Otherwise it marks the queue as
+%% requiring notification (see below) and tells the queue not to
+%% proceed.
+%%
+%% 6. A queue that has been told to proceed (by the return value of
+%% can_send/3) sends the message to the channel. Conversely, a
+%% queue that has been told not to proceed, will not attempt to
+%% deliver that message, or any future messages, to the
+%% channel. This is accomplished by can_send/3 capturing the
+%% outcome in the local state, where it can be accessed with
+%% is_suspended/1.
+%%
+%% 7. When a channel receives an ack it tells the limiter (via ack/2)
+%% how many messages were ack'ed. The limiter process decrements
+%% the volume and if it falls below the prefetch_count then it
+%% notifies (through rabbit_amqqueue:resume/2) all the queues
+%% requiring notification, i.e. all those that had a can_send/3
+%% request denied.
+%%
+%% 8. Upon receipt of such a notification, queues resume delivery to
+%% the channel, i.e. they will once again start asking limiter, as
+%% described in (5).
+%%
+%% 9. When a queue has no more consumers associated with a particular
+%% channel, it deactivates use of the limiter with deactivate/1,
+%% which alters the local state such that no further interactions
+%% with the limiter process take place until a subsequent
+%% activate/1.
+
-module(rabbit_limiter).
-behaviour(gen_server2).
+-export([start_link/0]).
+%% channel API
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
+ is_prefetch_limited/1, is_blocked/1, is_active/1,
+ get_prefetch_limit/1, ack/2, pid/1]).
+%% queue API
+-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
+ is_suspended/1, is_consumer_blocked/2, credit/4, drained/1,
+ forget_consumer/2]).
+%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/4]).
--export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
- disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled}).
+-record(lstate, {pid, prefetch_limited, blocked}).
+-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
--export_type([token/0]).
-
--opaque(token() :: #token{}).
+-type(lstate() :: #lstate{pid :: pid(),
+ prefetch_limited :: boolean(),
+ blocked :: boolean()}).
+-type(qstate() :: #qstate{pid :: pid(),
+ state :: 'dormant' | 'active' | 'suspended'}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(make_token/0 :: () -> token()).
--spec(make_token/1 :: ('undefined' | pid()) -> token()).
--spec(is_enabled/1 :: (token()) -> boolean()).
--spec(enable/2 :: (token(), non_neg_integer()) -> token()).
--spec(disable/1 :: (token()) -> token()).
--spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
--spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
--spec(register/2 :: (token(), pid()) -> 'ok').
--spec(unregister/2 :: (token(), pid()) -> 'ok').
--spec(get_limit/1 :: (token()) -> non_neg_integer()).
--spec(block/1 :: (token()) -> 'ok').
--spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
--spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(new/1 :: (pid()) -> lstate()).
+
+-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
+ -> lstate()).
+-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
+-spec(block/1 :: (lstate()) -> lstate()).
+-spec(unblock/1 :: (lstate()) -> lstate()).
+-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
+-spec(is_blocked/1 :: (lstate()) -> boolean()).
+-spec(is_active/1 :: (lstate()) -> boolean()).
+-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
+-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
+-spec(pid/1 :: (lstate()) -> pid()).
+
+-spec(client/1 :: (pid()) -> qstate()).
+-spec(activate/1 :: (qstate()) -> qstate()).
+-spec(can_send/3 :: (qstate(), boolean(), rabbit_types:ctag()) ->
+ {'continue' | 'suspend', qstate()}).
+-spec(resume/1 :: (qstate()) -> qstate()).
+-spec(deactivate/1 :: (qstate()) -> qstate()).
+-spec(is_suspended/1 :: (qstate()) -> boolean()).
+-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
+-spec(credit/4 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean())
+ -> qstate()).
+-spec(drained/1 :: (qstate())
+ -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
+-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
-endif.
@@ -64,120 +187,181 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
+-record(credit, {credit = 0, drain = false}).
+
%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------
start_link() -> gen_server2:start_link(?MODULE, [], []).
-make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+new(Pid) ->
+ %% this a 'call' to ensure that it is invoked at most once.
+ ok = gen_server:call(Pid, {new, self()}),
+ #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
-is_enabled(#token{enabled = Enabled}) -> Enabled.
+limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
+ ok = gen_server:call(L#lstate.pid,
+ {limit_prefetch, PrefetchCount, UnackedCount}),
+ L#lstate{prefetch_limited = true}.
-enable(#token{pid = Pid} = Token, Volume) ->
- gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity).
+unlimit_prefetch(L) ->
+ ok = gen_server:call(L#lstate.pid, unlimit_prefetch),
+ L#lstate{prefetch_limited = false}.
-disable(#token{pid = Pid} = Token) ->
- gen_server2:call(Pid, {disable, Token}, infinity).
+block(L) ->
+ ok = gen_server:call(L#lstate.pid, block),
+ L#lstate{blocked = true}.
-limit(Limiter, PrefetchCount) ->
- maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok).
+unblock(L) ->
+ ok = gen_server:call(L#lstate.pid, unblock),
+ L#lstate{blocked = false}.
-%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit. Note that we don't use maybe_call here in order
-%% to avoid always going through with_exit_handler/2, even when the
-%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
- rabbit_misc:with_exit_handler(
- fun () -> true end,
- fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
- end);
-can_send(_, _, _) ->
- true.
+is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
+
+is_blocked(#lstate{blocked = Blocked}) -> Blocked.
+
+is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
+
+get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
+get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit).
-%% Let the limiter know that the channel has received some acks from a
-%% consumer
-ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
+ack(#lstate{prefetch_limited = false}, _AckCount) -> ok;
+ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
-register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
+pid(#lstate{pid = Pid}) -> Pid.
-unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}).
+client(Pid) -> #qstate{pid = Pid, state = dormant, credits = gb_trees:empty()}.
-get_limit(Limiter) ->
+activate(L = #qstate{state = dormant}) ->
+ ok = gen_server:cast(L#qstate.pid, {register, self()}),
+ L#qstate{state = active};
+activate(L) -> L.
+
+can_send(L = #qstate{pid = Pid, state = State, credits = Credits},
+ AckRequired, CTag) ->
+ case is_consumer_blocked(L, CTag) of
+ false -> case (State =/= active orelse
+ safe_call(Pid, {can_send, self(), AckRequired}, true)) of
+ true -> {continue, L#qstate{
+ credits = record_send_q(CTag, Credits)}};
+ false -> {suspend, L#qstate{state = suspended}}
+ end;
+ true -> {suspend, L}
+ end.
+
+safe_call(Pid, Msg, ExitValue) ->
rabbit_misc:with_exit_handler(
- fun () -> 0 end,
- fun () -> maybe_call(Limiter, get_limit, 0) end).
+ fun () -> ExitValue end,
+ fun () -> gen_server2:call(Pid, Msg, infinity) end).
+
+resume(L) -> L#qstate{state = active}.
-block(Limiter) ->
- maybe_call(Limiter, block, ok).
+deactivate(L = #qstate{state = dormant}) -> L;
+deactivate(L) ->
+ ok = gen_server:cast(L#qstate.pid, {unregister, self()}),
+ L#qstate{state = dormant}.
+
+is_suspended(#qstate{state = suspended}) -> true;
+is_suspended(#qstate{}) -> false.
+
+is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = C}} when C > 0 -> false;
+ {value, #credit{}} -> true;
+ none -> false
+ end.
-unblock(Limiter) ->
- maybe_call(Limiter, {unblock, Limiter}, ok).
+credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain) ->
+ Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
-is_blocked(Limiter) ->
- maybe_call(Limiter, is_blocked, false).
+drained(Limiter = #qstate{credits = Credits}) ->
+ {CTagCredits, Credits2} =
+ rabbit_misc:gb_trees_fold(
+ fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
+ {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
+ (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ {Acc, Creds0}
+ end, {[], Credits}, Credits),
+ {CTagCredits, Limiter#qstate{credits = Credits2}}.
+
+forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
+ Limiter#qstate{credits = gb_trees:delete_any(CTag, Credits)}.
+
+%%----------------------------------------------------------------------------
+%% Queue-local code
+%%----------------------------------------------------------------------------
+
+%% We want to do all the AMQP 1.0-ish link level credit calculations
+%% in the queue (to do them elsewhere introduces a ton of
+%% races). However, it's a big chunk of code that is conceptually very
+%% linked to the limiter concept. So we get the queue to hold a bit of
+%% state for us (#qstate.credits), and maintain a fiction that the
+%% limiter is making the decisions...
+
+record_send_q(CTag, Credits) ->
+ case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{credit = Credit, drain = Drain}} ->
+ update_credit(CTag, Credit - 1, Drain, Credits);
+ none ->
+ Credits
+ end.
+
+update_credit(CTag, Credit, Drain, Credits) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ Drain1 = Drain andalso Credit > 0,
+ gb_trees:enter(CTag, #credit{credit = Credit, drain = Drain1}, Credits).
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #lim{}}.
+init([]) -> {ok, #lim{}}.
+
+prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
+prioritise_call(_Msg, _From, _Len, _State) -> 0.
-prioritise_call(get_limit, _From, _Len, _State) -> 9;
-prioritise_call(_Msg, _From, _Len, _State) -> 0.
+handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) ->
+ {reply, ok, State#lim{ch_pid = ChPid}};
+
+handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) ->
+ %% assertion
+ true = State#lim.prefetch_count == 0 orelse
+ State#lim.volume == UnackedCount,
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount,
+ volume = UnackedCount})};
+
+handle_call(unlimit_prefetch, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
+ volume = 0})};
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State) ->
+ {reply, ok, maybe_notify(State, State#lim{blocked = false})};
+
+handle_call(get_prefetch_limit, _From,
+ State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State};
handle_call({can_send, QPid, _AckRequired}, _From,
State = #lim{blocked = true}) ->
{reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(State) of
+ case prefetch_limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end;
-
-handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State};
-
-handle_call({limit, PrefetchCount, Token}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call({unblock, Token}, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
- {cont, State1} ->
- {reply, ok, State1};
- {stop, State1} ->
- {reply, {disabled, Token#token{enabled = false}}, State1}
- end;
-
-handle_call(is_blocked, _From, State) ->
- {reply, blocked(State), State};
-
-handle_call({enable, Token, Channel, Volume}, _From, State) ->
- {reply, Token#token{enabled = true},
- State#lim{ch_pid = Channel, volume = Volume}};
-handle_call({disable, Token}, _From, State) ->
- {reply, Token#token{enabled = false}, State}.
+ end.
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
- {noreply, State1};
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
handle_cast({register, QPid}, State) ->
{noreply, remember_queue(QPid, State)};
@@ -199,27 +383,13 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
- true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
- end, NewState1};
- false -> {cont, NewState}
+ case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ true -> notify_queues(NewState);
+ false -> NewState
end.
-maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) ->
- gen_server2:call(Pid, Call, infinity);
-maybe_call(_, _Call, Default) ->
- Default.
-
-maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
- gen_server2:cast(Pid, Cast);
-maybe_cast(_, _Call) ->
- ok.
-
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
blocked(#lim{blocked = Blocked}) -> Blocked.
@@ -231,10 +401,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) ->
true -> State
end.
-forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+forget_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:find(QPid, Queues) of
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
State#lim{queues = orddict:erase(QPid, Queues)};
error -> State
end.
@@ -251,13 +420,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
- 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
+ 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
%% being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
- [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3]
+ [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3]
|| L3 <- [L2, L1]],
ok
end,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 05036d35..4fb1fc3b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -32,6 +32,8 @@
[policy_validator, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
{ok, SPid} when is_pid(SPid) ->
+ maybe_auto_sync(Q),
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
@@ -235,13 +238,13 @@ suggested_queue_nodes(Q) ->
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
suggested_queue_nodes(Q, PossibleNodes) ->
- {MNode0, SNodes} = actual_queue_nodes(Q),
+ {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, PossibleNodes).
+ {MNode, SNodes, SSNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,15 +252,20 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
- {MNode, Possible -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
+ {MNode, Poss -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is currently not in the nodes specified,
- %% act like it is for the purposes below - otherwise we will not
- %% return it in the results...
- Nodes = lists:usort([MNode | Nodes1]),
- Unavailable = Nodes -- Possible,
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
{MNode, []};
_ -> case lists:member(MNode, Available) of
true -> {MNode, Available -- [MNode]};
- false -> promote_slave(Available)
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
end
end;
%% When we need to add nodes, we randomise our candidate list as a
%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - but
-%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
-suggested_queue_nodes(_, _, {MNode, _}, _) ->
+suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
{MNode, []}.
shuffle(L) ->
@@ -288,11 +299,14 @@ shuffle(L) ->
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
-actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+actual_queue_nodes(#amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
- end, [node(Pid) || Pid <- SPids]}.
+ end, Nodes(SPids), Nodes(SSPids)}.
is_mirrored(Q) ->
case policy(<<"ha-mode">>, Q) of
@@ -302,6 +316,14 @@ is_mirrored(Q) ->
_ -> false
end.
+maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
+ case policy(<<"ha-sync-mode">>, Q) of
+ <<"automatic">> ->
+ spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end);
+ _ ->
+ ok
+ end.
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
update_mirrors0(OldQ = #amqqueue{name = QName},
NewQ = #amqqueue{name = QName}) ->
- All = fun ({A,B}) -> [A|B] end,
- OldNodes = All(actual_queue_nodes(OldQ)),
- NewNodes = All(suggested_queue_nodes(NewQ)),
- add_mirrors(QName, NewNodes -- OldNodes),
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+ OldNodes = [OldMNode | OldSNodes],
+ NewNodes = [NewMNode | NewSNodes],
+ add_mirrors (QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
+ maybe_auto_sync(NewQ),
ok.
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)).
+ case validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)) of
+ ok -> case proplists:get_value(
+ <<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end;
+ E -> E
+ end.
validate_policy(<<"all">>, none) ->
ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8d5159e6..22edfcb6 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) ->
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQ:set_ram_duration_target(DesiredDuration, BQS1).
+%% [1] - the arrival of this newly synced slave may cause the master to die if
+%% the admin has requested a migration-type change to policy.
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q = #amqqueue { sync_slave_pids = SSPids }] ->
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
- ok
- end
- end).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue { sync_slave_pids = SSPids }] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2),
+ {ok, Q1, Q2}
+ end
+ end) of
+ ok -> ok;
+ {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1]
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 010fd9ac..c39e898c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -677,6 +677,7 @@ remove_node_if_mnesia_running(Node) ->
%% change being propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
+ rabbit_amqqueue:forget_all_durable(Node),
rabbit_node_monitor:notify_left_cluster(Node),
ok;
{aborted, Reason} ->
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b8b03f56..b53c16bf 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, fast_close/1, sockname/1, peername/1, peercert/1,
- tune_buffer_size/1, connection_string/2, socket_ends/2]).
+ connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -69,7 +69,6 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
--spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-spec(socket_ends/2 ::
@@ -189,13 +188,6 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
-tune_buffer_size(Sock) ->
- case getopts(Sock, [sndbuf, recbuf, buffer]) of
- {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
- setopts(Sock, [{buffer, BufSz}]);
- Err -> Err
- end.
-
connection_string(Sock, Direction) ->
case socket_ends(Sock, Direction) of
{ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 71c2c80a..de53b7f0 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -24,7 +24,7 @@
write_cluster_status/1, read_cluster_status/0,
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
--export([partitions/0]).
+-export([partitions/0, subscribe/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
--record(state, {monitors, partitions}).
+-record(state, {monitors, partitions, subscribers}).
%%----------------------------------------------------------------------------
@@ -54,6 +54,7 @@
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
-spec(partitions/0 :: () -> {node(), [node()]}).
+-spec(subscribe/1 :: (pid()) -> 'ok').
-endif.
@@ -179,6 +180,9 @@ notify_left_cluster(Node) ->
partitions() ->
gen_server:call(?SERVER, partitions, infinity).
+subscribe(Pid) ->
+ gen_server:cast(?SERVER, {subscribe, Pid}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -190,8 +194,9 @@ init([]) ->
%% happen.
process_flag(trap_exit, true),
{ok, _} = mnesia:subscribe(system),
- {ok, #state{monitors = pmon:new(),
- partitions = []}}.
+ {ok, #state{monitors = pmon:new(),
+ subscribers = pmon:new(),
+ partitions = []}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -232,23 +237,40 @@ handle_cast({left_cluster, Node}, State) ->
write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
del_node(Node, RunningNodes)}),
{noreply, State};
+handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
+ {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
- State = #state{monitors = Monitors}) ->
+ State = #state{monitors = Monitors, subscribers = Subscribers}) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
+ [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
+ {noreply, handle_dead_rabbit_state(
+ State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state{subscribers = Subscribers}) ->
+ {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
- State = #state{partitions = Partitions}) ->
+ State = #state{partitions = Partitions,
+ monitors = Monitors}) ->
+ %% We will not get a node_up from this node - yet we should treat it as
+ %% up (mostly).
+ State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> State;
+ false -> State#state{
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ end,
+ ok = handle_live_rabbit(Node),
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State#state{partitions = Partitions1}};
+ {noreply, State1#state{partitions = Partitions1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -270,7 +292,65 @@ handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
- ok = rabbit_mnesia:on_node_down(Node).
+ ok = rabbit_mnesia:on_node_down(Node),
+ case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, pause_minority} ->
+ case majority() of
+ true -> ok;
+ false -> await_cluster_recovery()
+ end;
+ {ok, ignore} ->
+ ok;
+ {ok, Term} ->
+ rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
+ "assuming 'ignore'~n", [Term]),
+ ok
+ end,
+ ok.
+
+majority() ->
+ length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5.
+
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
+%% when partitioned.
+alive_nodes() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ [N || N <- Nodes, pong =:= net_adm:ping(N)].
+
+await_cluster_recovery() ->
+ rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
+ []),
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ spawn(fun () ->
+ %% If our group leader is inside an application we are about
+ %% to stop, application:stop/1 does not return.
+ group_leader(whereis(init), self()),
+ %% Ensure only one restarting process at a time, will
+ %% exit(badarg) (harmlessly) if one is already running
+ register(rabbit_restarting_process, self()),
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+
+wait_for_cluster_recovery(Nodes) ->
+ case majority() of
+ true -> rabbit:start();
+ false -> timer:sleep(1000),
+ wait_for_cluster_recovery(Nodes)
+ end.
+
+handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+ %% If we have been partitioned, and we are now in the only remaining
+ %% partition, we no longer care about partitions - forget them. Note
+ %% that we do not attempt to deal with individual (other) partitions
+ %% going away. It's only safe to forget anything about partitions when
+ %% there are no partitions.
+ Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of
+ [] -> [];
+ _ -> Partitions
+ end,
+ State#state{partitions = Partitions1}.
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index 39d0188c..a4bd5042 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -16,7 +16,7 @@
-module(rabbit_parameter_validation).
--export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]).
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3, enum/1]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -73,3 +73,15 @@ proplist(Name, Constraints, Term) when is_list(Term) ->
proplist(Name, _Constraints, Term) ->
{error, "~s not a list ~p", [Name, Term]}.
+
+enum(OptionsA) ->
+ Options = [list_to_binary(atom_to_list(O)) || O <- OptionsA],
+ fun (Name, Term) when is_binary(Term) ->
+ case lists:member(Term, Options) of
+ true -> ok;
+ false -> {error, "~s should be one of ~p, actually was ~p",
+ [Name, Options, Term]}
+ end;
+ (Name, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]}
+ end.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index e712078b..7398cd2d 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -26,7 +26,7 @@
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
@@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(_VHost, <<"policy">>, _Name) ->
- ok.
-
notify(VHost, <<"policy">>, _Name, _Term) ->
update_policies(VHost).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3841b680..ea70208f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -162,7 +162,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_msg_ids }).
+ max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -190,7 +190,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_msg_ids :: gb_set()
+ unconfirmed :: gb_set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -210,7 +210,7 @@
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
--spec(needs_sync/1 :: (qistate()) -> boolean()).
+-spec(needs_sync/1 :: (qistate()) -> 'confirms' | 'other' | 'false').
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
{[{rabbit_types:msg_id(), seq_id(),
@@ -269,13 +269,16 @@ delete_and_terminate(State) ->
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ State = #qistate { unconfirmed = Unconfirmed })
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- State #qistate {
- unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }),
+ case MsgProps#message_properties.needs_confirming of
+ true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
+ State #qistate { unconfirmed = Unconfirmed1 };
+ false -> State
+ end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -302,8 +305,14 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
needs_sync(#qistate { journal_handle = undefined }) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl }) ->
- file_handle_cache:needs_sync(JournalHdl).
+needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
+ case gb_sets:is_empty(UC) of
+ true -> case file_handle_cache:needs_sync(JournalHdl) of
+ true -> other;
+ false -> false
+ end;
+ false -> confirms
+ end.
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -398,7 +407,7 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_msg_ids = gb_sets:new() }.
+ unconfirmed = gb_sets:new() }.
clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -607,19 +616,21 @@ add_to_journal(RelSeq, Action,
end};
add_to_journal(RelSeq, Action, JEntries) ->
- Val = case array:get(RelSeq, JEntries) of
- undefined ->
- case Action of
- ?PUB -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end;
- ({Pub, no_del, no_ack}) when Action == del ->
- {Pub, del, no_ack};
- ({Pub, del, no_ack}) when Action == ack ->
- {Pub, del, ack}
- end,
- array:set(RelSeq, Val, JEntries).
+ case array:get(RelSeq, JEntries) of
+ undefined ->
+ array:set(RelSeq,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end, JEntries);
+ ({Pub, no_del, no_ack}) when Action == del ->
+ array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ ({no_pub, del, no_ack}) when Action == ack ->
+ array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ ({?PUB, del, no_ack}) when Action == ack ->
+ array:reset(RelSeq, JEntries)
+ end.
maybe_flush_journal(State = #qistate { dirty_count = DCount,
max_journal_entries = MaxJournal })
@@ -732,9 +743,12 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
- State #qistate { unsynced_msg_ids = gb_sets:new() }.
+notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
+ case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State #qistate { unconfirmed = gb_sets:new() }
+ end.
%%----------------------------------------------------------------------------
%% segment manipulation
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index af7aac6f..61fac0e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -186,32 +186,37 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+socket_error(Reason) ->
+ log(error, "error on AMQP connection ~p: ~p (~s)~n",
+ [self(), Reason, rabbit_misc:format_inet_error(Reason)]).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
- [self(), Reason]),
+ {error, Reason} -> socket_error(Reason),
%% NB: this is tcp socket, even in case of ssl
rabbit_net:fast_close(Sock),
exit(normal)
end.
-name(Sock) ->
- socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
-
-socket_ends(Sock) ->
- socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-
start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- Name = name(Sock),
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Str} -> Str;
+ {error, enotconn} -> rabbit_net:fast_close(Sock),
+ exit(normal);
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(Sock),
+ exit(normal)
+ end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
- {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -245,7 +250,6 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_by = none,
last_blocked_at = never}},
try
- ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -295,26 +299,37 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
- {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
- closed -> case State#v1.connection_state of
- closed -> State;
- _ -> throw(connection_closed_abruptly)
- end;
- {error, Reason} -> throw({inet_error, Reason});
- {other, Other} -> handle_other(Other, Deb, State)
+ {data, Data} ->
+ recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ ok;
+ closed ->
+ maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+ {error, Reason} ->
+ maybe_emit_stats(State),
+ throw({inet_error, Reason});
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, State);
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> ok;
+ NewState -> recvloop(Deb, NewState)
+ end
end.
-handle_other({conserve_resources, Conserve}, Deb,
+handle_other({conserve_resources, Conserve},
State = #v1{throttle = Throttle}) ->
Throttle1 = Throttle#throttle{conserve_resources = Conserve},
- recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
-handle_other({channel_closing, ChPid}, Deb, State) ->
+ control_throttle(State#v1{throttle = Throttle1});
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(control_throttle(State)));
-handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ maybe_close(control_throttle(State));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
@@ -325,59 +340,62 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% ordinary error case. However, since this termination is
%% initiated by our parent it is probably more important to exit
%% quickly.
+ maybe_emit_stats(State),
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, State) ->
+ maybe_emit_stats(State),
throw(E);
-handle_other({channel_exit, Channel, Reason}, Deb, State) ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
-handle_other(terminate_connection, _Deb, State) ->
- State;
-handle_other(handshake_timeout, Deb, State)
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, State) ->
+ maybe_emit_stats(State),
+ stop;
+handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
- mainloop(Deb, State);
-handle_other(handshake_timeout, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, State) ->
+ maybe_emit_stats(State),
throw({handshake_timeout, State#v1.callback});
-handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
- mainloop(Deb, State);
-handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout, State = #v1{connection_state = S}) ->
+ maybe_emit_stats(State),
throw({heartbeat_timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
+ force -> stop;
+ normal -> NewState
end;
-handle_other({'$gen_call', From, info}, Deb, State) ->
+handle_other({'$gen_call', From, info}, State) ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
-handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
- mainloop(Deb, State);
-handle_other(ensure_stats, Deb, State) ->
- mainloop(Deb, ensure_stats_timer(State));
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
-handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
-handle_other({bump_credit, Msg}, Deb, State) ->
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, _State) ->
+ control_throttle(State);
+handle_other(Other, State) ->
%% internal error -> something worth dying for
+ maybe_emit_stats(State),
exit({unexpected_message, Other}).
switch_callback(State, Callback, Length) ->
@@ -437,13 +455,13 @@ close_connection(State = #v1{queue_collector = Collector,
handle_dependent_exit(ChPid, Reason, State) ->
case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, uncontrolled} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- maybe_close(control_throttle(State));
- {Channel, uncontrolled} ->
- maybe_close(handle_exception(control_throttle(State),
- Channel, Reason))
+ {undefined, controlled} -> State;
+ {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_Channel, controlled} -> maybe_close(control_throttle(State));
+ {Channel, uncontrolled} -> State1 = handle_exception(
+ State, Channel, Reason),
+ maybe_close(control_throttle(State1))
end.
terminate_channels() ->
@@ -636,7 +654,10 @@ process_frame(Frame, Channel, State) ->
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
+ control_throttle(State);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
@@ -774,7 +795,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
Connection#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
- auth_mechanism = AuthMechanism,
+ auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
@@ -837,8 +858,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(State1, #v1.stats_timer,
- fun() -> emit_stats(State1) end),
+ maybe_emit_stats(State1),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
@@ -902,15 +922,14 @@ auth_mechanisms_binary(Sock) ->
auth_phase(Response,
State = #v1{connection = Connection =
#connection{protocol = Protocol,
- auth_mechanism = AuthMechanism,
+ auth_mechanism = {Name, AuthMechanism},
auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
rabbit_misc:protocol_error(
access_refused, "~s login refused: ~s",
- [proplists:get_value(name, AuthMechanism:description()),
- io_lib:format(Msg, Args)]);
+ [Name, io_lib:format(Msg, Args)]);
{protocol_error, Msg, Args} ->
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
@@ -970,10 +989,8 @@ ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
-ic(auth_mechanism, #connection{auth_mechanism = none}) ->
- none;
-ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
- proplists:get_value(name, Mechanism:description());
+ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
+ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
@@ -1000,6 +1017,10 @@ cert_info(F, #v1{sock = Sock}) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 60419856..acdc2cff 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -84,12 +84,34 @@ internal_binary_to_type(TypeBin) when is_binary(TypeBin) ->
internal_register(Class, TypeName, ModuleName)
when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) ->
ok = sanity_check_module(class_module(Class), ModuleName),
- true = ets:insert(?ETS_NAME,
- {{Class, internal_binary_to_type(TypeName)}, ModuleName}),
+ RegArg = {{Class, internal_binary_to_type(TypeName)}, ModuleName},
+ true = ets:insert(?ETS_NAME, RegArg),
+ conditional_register(RegArg),
ok.
internal_unregister(Class, TypeName) ->
- true = ets:delete(?ETS_NAME, {Class, internal_binary_to_type(TypeName)}),
+ UnregArg = {Class, internal_binary_to_type(TypeName)},
+ conditional_unregister(UnregArg),
+ true = ets:delete(?ETS_NAME, UnregArg),
+ ok.
+
+%% register exchange decorator route callback only when implemented,
+%% in order to avoid unnecessary decorator calls on the fast
+%% publishing path
+conditional_register({{exchange_decorator, Type}, ModuleName}) ->
+ case erlang:function_exported(ModuleName, route, 2) of
+ true -> true = ets:insert(?ETS_NAME,
+ {{exchange_decorator_route, Type},
+ ModuleName});
+ false -> ok
+ end;
+conditional_register(_) ->
+ ok.
+
+conditional_unregister({exchange_decorator, Type}) ->
+ true = ets:delete(?ETS_NAME, {exchange_decorator_route, Type}),
+ ok;
+conditional_unregister(_) ->
ok.
sanity_check_module(ClassModule, Module) ->
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index 8a237105..6b62c974 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -23,8 +23,6 @@
-callback validate(rabbit_types:vhost(), binary(), binary(),
term()) -> validate_results().
--callback validate_clear(rabbit_types:vhost(), binary(),
- binary()) -> validate_results().
-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
@@ -35,7 +33,6 @@
behaviour_info(callbacks) ->
[
{validate, 4},
- {validate_clear, 3},
{notify, 4},
{notify_clear, 3}
];
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 2615372c..05520170 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1,
- list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3,
+ list_component/1, list/2, list_formatted/1, lookup/3,
value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -40,12 +40,9 @@
-> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]).
--spec(list_strict/1 :: (binary() | '_')
- -> [rabbit_types:infos()] | 'not_found').
+-spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]).
-spec(list/2 :: (rabbit_types:vhost() | '_', binary() | '_')
-> [rabbit_types:infos()]).
--spec(list_strict/2 :: (rabbit_types:vhost() | '_', binary() | '_')
- -> [rabbit_types:infos()] | 'not_found').
-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary())
-> rabbit_types:infos() | 'not_found').
@@ -120,21 +117,13 @@ clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
clear_any(VHost, Component, Name) ->
- case clear_any0(VHost, Component, Name) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
-
-clear_any0(VHost, Component, Name) ->
- case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Name)) of
- ok -> mnesia_clear(VHost, Component, Name),
- Mod:notify_clear(VHost, Component, Name),
- ok;
- E -> E
- end;
- E -> E
+ case lookup(VHost, Component, Name) of
+ not_found -> {error_string, "Parameter does not exist"};
+ _ -> mnesia_clear(VHost, Component, Name),
+ case lookup_component(Component) of
+ {ok, Mod} -> Mod:notify_clear(VHost, Component, Name);
+ _ -> ok
+ end
end.
mnesia_clear(VHost, Component, Name) ->
@@ -147,21 +136,14 @@ list() ->
[p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>].
-list(VHost) -> list(VHost, '_', []).
-list_strict(Component) -> list('_', Component, not_found).
-list(VHost, Component) -> list(VHost, Component, []).
-list_strict(VHost, Component) -> list(VHost, Component, not_found).
-
-list(VHost, Component, Default) ->
- case component_good(Component) of
- true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
- _ = '_'},
- [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
- mnesia:dirty_match_object(?TABLE, Match),
- Comp =/= <<"policy">> orelse
- Component =:= <<"policy">>];
- _ -> Default
- end.
+list(VHost) -> list(VHost, '_').
+list_component(Component) -> list('_', Component).
+
+list(VHost, Component) ->
+ Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'},
+ [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <-
+ mnesia:dirty_match_object(?TABLE, Match),
+ Comp =/= <<"policy">> orelse Component =:= <<"policy">>].
list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
@@ -216,12 +198,6 @@ info_keys() -> [component, name, value].
%%---------------------------------------------------------------------------
-component_good('_') -> true;
-component_good(Component) -> case lookup_component(Component) of
- {ok, _} -> true;
- _ -> false
- end.
-
lookup_component(Component) ->
case rabbit_registry:lookup_module(
runtime_parameter, list_to_atom(binary_to_list(Component))) of
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index c27f1b4a..05c1dbc1 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -18,7 +18,7 @@
-behaviour(rabbit_runtime_parameter).
-behaviour(rabbit_policy_validator).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
@@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok;
validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(_, <<"test">>, <<"good">>) -> ok;
-validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
-validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f5ea4fba..e7b69879 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -912,10 +912,10 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
- Policy, Params, {OldM, OldSs}, All),
+ Policy, Params, CurrentState, All),
NewSs1 = lists:sort(NewSs0),
case dm_list_match(NewSs, NewSs1, ExtraSs) of
ok -> ok;
@@ -923,28 +923,36 @@ test_dynamic_mirroring() ->
end
end,
- Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
+
+ N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
%% Add a node
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- %% And once that's happened, still keep the master even when not listed
- Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
-
- Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed,
+ %% if nothing is synced
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
+ %% But if something is synced we can lose the master - but make
+ %% sure we pick the new master from the nodes which are synced!
+ Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
+ Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
passed.
@@ -1062,7 +1070,11 @@ test_runtime_parameters() ->
ok = control_action(clear_parameter, ["test", "maybe"]),
{error_string, _} =
control_action(clear_parameter, ["test", "neverexisted"]),
+
+ %% We can delete for a component that no longer exists
+ Good(["test", "good", "\"ignore\""]),
rabbit_runtime_parameters_test:unregister(),
+ ok = control_action(clear_parameter, ["test", "good"]),
passed.
test_policy_validation() ->
@@ -1082,25 +1094,20 @@ test_policy_validation() ->
{error_string, _} = SetPol("testpos", [-1, 0, 1]),
{error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+ ok = control_action(clear_policy, ["name"]),
rabbit_runtime_parameters_test:unregister_policy_validator(),
passed.
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun test_writer/0),
- {ok, Ch} = rabbit_channel:start_link(
- 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
- user(<<"user">>), <<"/">>, [], self(),
- rabbit_limiter:make_token(self())),
+ {_Writer, Limiter, Ch} = test_channel(),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
-
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, rabbit_limiter:make_token(),
- <<"ctag">>, true, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, none, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1178,8 +1185,6 @@ find_listener() ->
N =:= node()],
{H, P}.
-test_writer() -> test_writer(none).
-
test_writer(Pid) ->
receive
{'$gen_call', From, flush} -> gen_server:reply(From, ok),
@@ -1189,13 +1194,17 @@ test_writer(Pid) ->
shutdown -> ok
end.
-test_spawn() ->
+test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
+ {ok, Limiter} = rabbit_limiter:start_link(),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
- user(<<"guest">>), <<"/">>, [], Me,
- rabbit_limiter:make_token(self())),
+ user(<<"guest">>), <<"/">>, [], Me, Limiter),
+ {Writer, Limiter, Ch}.
+
+test_spawn() ->
+ {Writer, _Limiter, Ch} = test_channel(),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)
@@ -1567,7 +1576,7 @@ control_action(Command, Node, Args, Opts) ->
info_action(Command, Args, CheckVHost) ->
ok = control_action(Command, []),
- if CheckVHost -> ok = control_action(Command, []);
+ if CheckVHost -> ok = control_action(Command, [], ["-p", "/"]);
true -> ok
end,
ok = control_action(Command, lists:map(fun atom_to_list/1, Args)),
@@ -2709,12 +2718,13 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
+ {ok, Limiter} = rabbit_limiter:start_link(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false),
+ rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
@@ -2735,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
+ {ok, Limiter} = rabbit_limiter:start_link(),
+
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true),
+ rabbit_amqqueue:basic_get(Q, self(), true, Limiter),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 18cab48b..f7c6c729 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -262,8 +262,6 @@
durable,
transient_threshold,
- async_callback,
-
len,
persistent_count,
@@ -356,8 +354,6 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: rabbit_backing_queue:async_callback(),
-
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -426,7 +422,7 @@ init(Queue, Recover, AsyncCallback) ->
init(#amqqueue { name = QueueName, durable = IsDurable }, false,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
- init(IsDurable, IndexState, 0, [], AsyncCallback,
+ init(IsDurable, IndexState, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
@@ -454,7 +450,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
rabbit_msg_store:contains(MsgId, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1, AsyncCallback,
+ init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
terminate(_Reason, State) ->
@@ -772,24 +768,18 @@ ram_duration(State = #vqstate {
needs_timeout(State = #vqstate { index_state = IndexState,
target_ram_count = TargetRamCount }) ->
- case must_sync_index(State) of
- true -> timed;
- false ->
- case rabbit_queue_index:needs_sync(IndexState) of
- true -> idle;
- false -> case TargetRamCount of
- infinity -> false;
- _ -> case
- reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end
- end
- end
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false when TargetRamCount == infinity -> false;
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end.
timeout(State = #vqstate { index_state = IndexState }) ->
@@ -883,8 +873,7 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a msg_id to the unconfirmed set
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, IsDelivered, SeqId,
Msg = #basic_message {id = MsgId}, MsgProps) ->
@@ -1011,7 +1000,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
+init(IsDurable, IndexState, DeltaCount, Terms,
PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
@@ -1037,8 +1026,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
durable = IsDurable,
transient_threshold = NextSeqId,
- async_callback = AsyncCallback,
-
len = DeltaCount1,
persistent_count = DeltaCount1,
@@ -1338,21 +1325,6 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
confirmed = gb_sets:union(C, MsgIdSet) }.
-must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- %% If UC is empty then by definition, MIOD and MOD are also empty
- %% and there's nothing that can be pending a sync.
-
- %% If UC is not empty, then we want to find is_empty(UC - MIOD),
- %% but the subtraction can be expensive. Thus instead, we test to
- %% see if UC is a subset of MIOD. This can only be the case if
- %% MIOD == UC, which would indicate that every message in UC is
- %% also in MIOD and is thus _all_ pending on a msg_store sync, not
- %% on a qi sync. Thus the negation of this is sufficient. Because
- %% is_subset is short circuiting, this is more efficient than the
- %% subtraction.
- not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d2cbc41..2858cf58 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -70,6 +70,7 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
+ rabbit_event:notify(vhost_created, info(VHostPath)),
R.
delete(VHostPath) ->
@@ -87,6 +88,7 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
internal_delete(VHostPath) ->
@@ -95,9 +97,9 @@ internal_delete(VHostPath) ->
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
[ok = rabbit_runtime_parameters:clear(VHostPath,
proplists:get_value(component, Info),
- proplists:get_value(key, Info))
+ proplists:get_value(name, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info))
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 0248f878..2725be31 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,21 +55,30 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
inet_db:register_socket(Sock, Mod),
%% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain(),
+ case tune_buffer_size(Sock) of
+ ok -> file_handle_cache:transfer(
+ apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain();
+ {error, enotconn} -> catch port_close(Sock);
+ {error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock),
+ error_logger:error_msg(
+ "failed to tune buffer size of "
+ "connection accepted on ~s:~p - ~p (~s)~n",
+ [rabbit_misc:ntoab(IPAddress), Port,
+ Err, rabbit_misc:format_inet_error(Err)]),
+ catch port_close(Sock)
+ end,
%% accept more
accept(State);
-handle_info({inet_async, LSock, Ref, {error, closed}},
- State=#state{sock=LSock, ref=Ref}) ->
- %% It would be wrong to attempt to restart the acceptor when we
- %% know this will fail.
- {stop, normal, State};
-
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {stop, {accept_failed, Reason}, State};
+ case Reason of
+ closed -> {stop, normal, State}; %% listening socket closed
+ econnaborted -> accept(State); %% client sent RST before we accepted
+ _ -> {stop, {accept_failed, Reason}, State}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -87,3 +96,10 @@ accept(State = #state{sock=LSock}) ->
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
end.
+
+tune_buffer_size(Sock) ->
+ case inet:getopts(Sock, [sndbuf, recbuf, buffer]) of
+ {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+ inet:setopts(Sock, [{buffer, BufSz}]);
+ Error -> Error
+ end.