summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-21 10:42:41 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-21 10:42:41 +0000
commite222e21a4a18a674a82a90f8e23acbfb42d2fc02 (patch)
tree4e29c1d677aa3819d606c94072c2d363b9872ddb
parent3a9022f793632b7ab779cc0dedfe1083f6b85cd9 (diff)
parent904c58e4fcce4938c6a8725f484e2b78813c6986 (diff)
downloadrabbitmq-server-bug23378.tar.gz
Merge in defaultbug23378
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--packaging/debs/apt-repository/distributions2
-rw-r--r--packaging/standalone/Makefile82
-rw-r--r--packaging/standalone/erl.diff5
-rw-r--r--packaging/standalone/src/rabbit_release.erl152
-rw-r--r--scripts/rabbitmq-defaults6
-rwxr-xr-xscripts/rabbitmq-plugins3
-rwxr-xr-xscripts/rabbitmq-server7
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_alarm.erl14
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_binding.erl30
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_disk_monitor.erl7
-rw-r--r--src/rabbit_error_logger_file_h.erl3
-rw-r--r--src/rabbit_exchange.erl57
-rw-r--r--src/rabbit_exchange_decorator.erl18
-rw-r--r--src/rabbit_exchange_type.erl17
-rw-r--r--src/rabbit_exchange_type_direct.erl8
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl41
-rw-r--r--src/rabbit_exchange_type_invalid.erl8
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_node_monitor.erl98
-rw-r--r--src/rabbit_reader.erl39
-rw-r--r--src/rabbit_registry.erl11
-rw-r--r--src/rabbit_runtime_parameters.erl36
-rw-r--r--src/rabbit_tests.erl1
-rw-r--r--src/tcp_acceptor.erl34
32 files changed, 614 insertions, 170 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index ad961a44..339fa69e 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -44,6 +44,7 @@
{log_levels, [{connection, info}]},
{ssl_cert_login_from, distinguished_name},
{reverse_dns_lookups, false},
+ {cluster_partition_handling, ignore},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions
index 183eb034..75b9fe46 100644
--- a/packaging/debs/apt-repository/distributions
+++ b/packaging/debs/apt-repository/distributions
@@ -2,6 +2,6 @@ Origin: RabbitMQ
Label: RabbitMQ Repository for Debian / Ubuntu etc
Suite: testing
Codename: kitten
-Architectures: arm hppa ia64 mips mipsel s390 sparc i386 amd64 powerpc source
+Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source
Components: main
Description: RabbitMQ Repository for Debian / Ubuntu etc
diff --git a/packaging/standalone/Makefile b/packaging/standalone/Makefile
new file mode 100644
index 00000000..89ccde93
--- /dev/null
+++ b/packaging/standalone/Makefile
@@ -0,0 +1,82 @@
+VERSION=0.0.0
+SOURCE_DIR=rabbitmq-server-$(VERSION)
+TARGET_DIR=rabbitmq_server-$(VERSION)
+TARGET_TARBALL=rabbitmq-server-$(OS)-standalone-$(VERSION)
+RLS_DIR=$(TARGET_DIR)/release/$(TARGET_DIR)
+
+ERTS_VSN=$(shell erl -noshell -eval 'io:format("~s", [erlang:system_info(version)]), halt().')
+ERTS_ROOT_DIR=$(shell erl -noshell -eval 'io:format("~s", [code:root_dir()]), halt().')
+
+# used to generate the erlang release
+RABBITMQ_HOME=$(TARGET_DIR)
+RABBITMQ_EBIN_ROOT=$(RABBITMQ_HOME)/ebin
+RABBITMQ_PLUGINS_DIR=$(RABBITMQ_HOME)/plugins
+RABBITMQ_PLUGINS_EXPAND_DIR=$(RABBITMQ_PLUGINS_DIR)/expand
+
+RABBITMQ_DEFAULTS=$(TARGET_DIR)/sbin/rabbitmq-defaults
+fix_defaults = sed -e $(1) $(RABBITMQ_DEFAULTS) > $(RABBITMQ_DEFAULTS).tmp \
+ && mv $(RABBITMQ_DEFAULTS).tmp $(RABBITMQ_DEFAULTS)
+
+dist:
+ tar -zxf ../../dist/$(SOURCE_DIR).tar.gz
+
+ $(MAKE) -C $(SOURCE_DIR) \
+ TARGET_DIR=`pwd`/$(TARGET_DIR) \
+ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \
+ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \
+ install
+
+## Here we set the RABBITMQ_HOME variable,
+## then we make ERL_DIR point to our released erl
+## and we add the paths to our released start_clean and start_sasl boot scripts
+ $(call fix_defaults,'s:^SYS_PREFIX=$$:SYS_PREFIX=\$${RABBITMQ_HOME}:')
+ $(call fix_defaults,'s:^ERL_DIR=$$:ERL_DIR=\$${RABBITMQ_HOME}/erts-$(ERTS_VSN)/bin/:')
+ $(call fix_defaults,'s:start_clean$$:"\$${SYS_PREFIX}/releases/$(VERSION)/start_clean":')
+ $(call fix_defaults,'s:start_sasl:"\$${SYS_PREFIX}/releases/$(VERSION)/start_sasl":')
+
+ chmod 0755 $(RABBITMQ_DEFAULTS)
+
+ mkdir -p $(TARGET_DIR)/etc/rabbitmq
+
+ $(MAKE) generate_release
+
+ mkdir -p $(RLS_DIR)
+ tar -C $(RLS_DIR) -xzf $(RABBITMQ_HOME)/rabbit.tar.gz
+
+# add minimal boot file
+ cp $(ERTS_ROOT_DIR)/bin/start_clean.boot $(RLS_DIR)/releases/$(VERSION)
+ cp $(ERTS_ROOT_DIR)/bin/start_sasl.boot $(RLS_DIR)/releases/$(VERSION)
+
+# move rabbitmq files to top level folder
+ mv $(RLS_DIR)/lib/rabbit-$(VERSION)/* $(RLS_DIR)
+
+# remove empty lib/rabbit-$(VERSION) folder
+ rm -rf $(RLS_DIR)/lib/rabbit-$(VERSION)
+
+# fix Erlang ROOTDIR
+ patch -o $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl.src < erl.diff
+
+ tar -zcf $(TARGET_TARBALL).tar.gz -C $(TARGET_DIR)/release $(TARGET_DIR)
+ rm -rf $(SOURCE_DIR) $(TARGET_DIR)
+
+clean: clean_partial
+ rm -f rabbitmq-server-$(OS)-standalone-*.tar.gz
+
+clean_partial:
+ rm -rf $(SOURCE_DIR)
+ rm -rf $(TARGET_DIR)
+
+.PHONY : generate_release
+generate_release:
+ erlc \
+ -I $(TARGET_DIR)/include/ -o src -Wall \
+ -v +debug_info -Duse_specs -Duse_proper_qc \
+ -pa $(TARGET_DIR)/ebin/ src/rabbit_release.erl
+ erl \
+ -pa "$(RABBITMQ_EBIN_ROOT)" \
+ -pa src \
+ -noinput \
+ -hidden \
+ -s rabbit_release \
+ -extra "$(RABBITMQ_PLUGINS_DIR)" "$(RABBITMQ_PLUGINS_EXPAND_DIR)" "$(RABBITMQ_HOME)"
+ rm src/rabbit_release.beam
diff --git a/packaging/standalone/erl.diff b/packaging/standalone/erl.diff
new file mode 100644
index 00000000..c51bfe22
--- /dev/null
+++ b/packaging/standalone/erl.diff
@@ -0,0 +1,5 @@
+20c20,21
+< ROOTDIR="%FINAL_ROOTDIR%"
+---
+> realpath() { [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" ; }
+> ROOTDIR="$(dirname `realpath $0`)/../.."
diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl
new file mode 100644
index 00000000..26f36d68
--- /dev/null
+++ b/packaging/standalone/src/rabbit_release.erl
@@ -0,0 +1,152 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+-module(rabbit_release).
+
+-export([start/0]).
+
+-include("rabbit.hrl").
+
+-define(BaseApps, [rabbit]).
+-define(ERROR_CODE, 1).
+
+%% We need to calculate all the ERTS apps we need to ship with a
+%% standalone rabbit. To acomplish that we need to unpack and load the plugins
+%% apps that are shiped with rabbit.
+%% Once we get that we generate an erlang release inside a tarball.
+%% Our make file will work with that release to generate our final rabbitmq
+%% package.
+start() ->
+ %% Determine our various directories
+ [PluginsDistDir, UnpackedPluginDir, RabbitHome] =
+ init:get_plain_arguments(),
+ RootName = UnpackedPluginDir ++ "/rabbit",
+
+ %% extract the plugins so we can load their apps later
+ prepare_plugins(PluginsDistDir, UnpackedPluginDir),
+
+ %% add the plugin ebin folder to the code path.
+ add_plugins_to_path(UnpackedPluginDir),
+
+ PluginAppNames = [P#plugin.name ||
+ P <- rabbit_plugins:list(PluginsDistDir)],
+
+ %% Build the entire set of dependencies - this will load the
+ %% applications along the way
+ AllApps = case catch sets:to_list(expand_dependencies(PluginAppNames)) of
+ {failed_to_load_app, App, Err} ->
+ terminate("failed to load application ~s:~n~p",
+ [App, Err]);
+ AppList ->
+ AppList
+ end,
+
+ %% we need a list of ERTS apps we need to ship with rabbit
+ BaseApps = AllApps -- PluginAppNames,
+
+ AppVersions = [determine_version(App) || App <- BaseApps],
+ RabbitVersion = proplists:get_value(rabbit, AppVersions),
+
+ %% Build the overall release descriptor
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
+ {erts, erlang:system_info(version)},
+ AppVersions},
+
+ %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
+ rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
+
+ %% Compile the script
+ systools:make_script(RootName),
+ systools:script2boot(RootName),
+ %% Make release tarfile
+ make_tar(RootName, RabbitHome),
+ rabbit_misc:quit(0).
+
+make_tar(Release, RabbitHome) ->
+ systools:make_tar(Release,
+ [
+ {dirs, [docs, etc, include, plugins, sbin, share]},
+ {erts, code:root_dir()},
+ {outdir, RabbitHome}
+ ]).
+
+determine_version(App) ->
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
+ {App, Vsn}.
+
+delete_recursively(Fn) ->
+ case rabbit_file:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
+ end.
+
+prepare_plugins(PluginsDistDir, DestDir) ->
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(DestDir) of
+ ok -> ok;
+ {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E])
+ end,
+ case filelib:ensure_dir(DestDir ++ "/") of
+ ok -> ok;
+ {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2])
+ end,
+
+ [prepare_plugin(Plugin, DestDir) ||
+ Plugin <- rabbit_plugins:list(PluginsDistDir)].
+
+prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) ->
+ zip:unzip(Location, [{cwd, PluginDestDir}]);
+prepare_plugin(#plugin{type = dir, name = Name, location = Location},
+ PluginsDestDir) ->
+ rabbit_file:recursive_copy(Location,
+ filename:join([PluginsDestDir, Name])).
+
+expand_dependencies(Pending) ->
+ expand_dependencies(sets:new(), Pending).
+expand_dependencies(Current, []) ->
+ Current;
+expand_dependencies(Current, [Next|Rest]) ->
+ case sets:is_element(Next, Current) of
+ true ->
+ expand_dependencies(Current, Rest);
+ false ->
+ case application:load(Next) of
+ ok ->
+ ok;
+ {error, {already_loaded, _}} ->
+ ok;
+ {error, Reason} ->
+ throw({failed_to_load_app, Next, Reason})
+ end,
+ {ok, Required} = application:get_key(Next, applications),
+ Unique = [A || A <- Required, not(sets:is_element(A, Current))],
+ expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
+ end.
+
+add_plugins_to_path(PluginDir) ->
+ [add_plugin_to_path(PluginName) ||
+ PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
+
+add_plugin_to_path(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN).
+
+terminate(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ rabbit_misc:quit(?ERROR_CODE).
diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults
index db1d4f2b..83c5639d 100644
--- a/scripts/rabbitmq-defaults
+++ b/scripts/rabbitmq-defaults
@@ -18,6 +18,12 @@
### next line potentially updated in package install steps
SYS_PREFIX=
+### next line will be updated when generating a standalone release
+ERL_DIR=
+
+CLEAN_BOOT_FILE=start_clean
+SASL_BOOT_FILE=start_sasl
+
## Set default values
CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 43f450c0..c043c90a 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -26,11 +26,12 @@
##--- End of overridden <var_name> variables
-exec erl \
+exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
-sname rabbitmq-plugins$$ \
+ -boot "${CLEAN_BOOT_FILE}" \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 184ae931..161ec2e6 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -82,7 +82,8 @@ case "$(uname -s)" in
esac
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
-if ! erl -pa "$RABBITMQ_EBIN_ROOT" \
+if ! ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
+ -boot "${CLEAN_BOOT_FILE}" \
-noinput \
-hidden \
-s rabbit_prelaunch \
@@ -103,11 +104,11 @@ RABBITMQ_LISTEN_ARG=
# there is no other way of preventing their expansion.
set -f
-exec erl \
+exec ${ERL_DIR}erl \
-pa ${RABBITMQ_EBIN_ROOT} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot start_sasl \
+ -boot "${SASL_BOOT_FILE}" \
${RABBITMQ_CONFIG_ARG} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 00fffa9f..0368db3f 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -26,12 +26,13 @@
##--- End of overridden <var_name> variables
-exec erl \
+exec ${ERL_DIR}erl \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
+ -boot "${CLEAN_BOOT_FILE}" \
-s rabbit_control_main \
-nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f3ba022a..3cfa21ba 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -236,7 +236,7 @@
{memory, any()}]).
-spec(is_running/0 :: () -> boolean()).
-spec(is_running/1 :: (node()) -> boolean()).
--spec(environment/0 :: () -> [{param() | term()}]).
+-spec(environment/0 :: () -> [{param(), term()}]).
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
-spec(force_event_refresh/0 :: () -> 'ok').
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 362b11aa..6d24d130 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -208,11 +208,19 @@ internal_register(Pid, {M, F, A} = AlertMFA,
State#alarms{alertees = NewAlertees}.
handle_set_alarm({{resource_limit, Source, Node}, []}, State) ->
- rabbit_log:warning("~s resource limit alarm set on node ~p~n",
- [Source, Node]),
+ rabbit_log:warning(
+ "~s resource limit alarm set on node ~p.~n~n"
+ "**********************************************************~n"
+ "*** Publishers will be blocked until this alarm clears ***~n"
+ "**********************************************************~n",
+ [Source, Node]),
{ok, maybe_alert(fun dict:append/3, Node, Source, State)};
handle_set_alarm({file_descriptor_limit, []}, State) ->
- rabbit_log:warning("file descriptor limit alarm set~n"),
+ rabbit_log:warning(
+ "file descriptor limit alarm set.~n~n"
+ "********************************************************************~n"
+ "*** New connections will not be accepted until this alarm clears ***~n"
+ "********************************************************************~n"),
{ok, State};
handle_set_alarm(Alarm, State) ->
rabbit_log:warning("alarm '~p' set~n", [Alarm]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ae7fe5c5..82ac74fa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -407,7 +407,8 @@ args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_max_length_arg/2}].
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -418,6 +419,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_max_length_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1d332f67..0d5fd304 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,6 +55,7 @@
queue_monitors,
dlx,
dlx_routing_key,
+ max_length,
status
}).
@@ -244,7 +245,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -256,6 +258,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -559,27 +563,50 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- State3 = State2#q{backing_queue_state = BQS1},
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
- %% we only do that IFF the new message ends up at the head
- %% of the queue (because the queue was empty) and has an
- %% expiry. Only then may it need expiring straight away,
- %% or, if expiry is not due yet, the expiry timer may need
- %% (re)scheduling.
- case {IsEmpty, Props#message_properties.expiry} of
- {false, _} -> State3;
- {true, undefined} -> State3;
- {true, _} -> drop_expired_msgs(State3)
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ {0, State};
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
+ end.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(drop_expired_msgs(State1)).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -763,6 +790,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ end, maxlen, X, State),
+ State1.
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6096e07b..cb86e5ae 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -40,8 +40,11 @@
[{'not_found', (rabbit_types:binding_source() |
rabbit_types:binding_destination())} |
{'absent', rabbit_types:amqqueue()}]})).
+
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
- rabbit_types:error('binding_not_found')).
+ rabbit_types:error(
+ 'binding_not_found' |
+ {'binding_invalid', string(), [any()]})).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
@@ -157,15 +160,22 @@ add(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(Src, Dst) of
- ok -> case mnesia:read({rabbit_route, B}) of
- [] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/0
- end;
- {error, _} = Err -> rabbit_misc:const(Err)
+ case rabbit_exchange:validate_binding(Src, B) of
+ ok ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(Src, Dst) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/0
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
end
end).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7b185568..bf3a9fd7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1196,6 +1196,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(DestinationName)]);
+ {error, {binding_invalid, Fmt, Args}} ->
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b396b289..3bb163a1 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -31,6 +31,7 @@
-record(state, {dir,
limit,
+ actual,
timeout,
timer,
alarmed
@@ -106,8 +107,8 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
- {reply, get_disk_free(Dir), State};
+handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
+ {reply, Actual, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -156,7 +157,7 @@ internal_update(State = #state { limit = Limit,
_ ->
ok
end,
- State #state {alarmed = NewAlarmed}.
+ State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 3efc9c0c..eb6247e0 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -76,6 +76,9 @@ init_file(File, PrevHandler) ->
Error -> Error
end.
+%% filter out "application: foo; exited: stopped; type: temporary"
+handle_event({info_report, _, {_, std_info, _}}, State) ->
+ {ok, State};
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 88033f77..5f4fb9ec 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -22,7 +22,7 @@
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
- route/2, delete/2]).
+ route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
@@ -83,6 +83,9 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
+-spec(validate_binding/2 ::
+ (rabbit_types:exchange(), rabbit_types:binding())
+ -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
@@ -117,14 +120,18 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
[ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
- M <- decorators()],
+ M <- registry_lookup(exchange_decorator)],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
-policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+policy_changed(X = #exchange{type = XType}, X1) ->
+ [ok = M:policy_changed(X, X1) ||
+ M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
+ ok.
serialise_events(X = #exchange{type = Type}) ->
- lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ lists:any(fun (M) -> M:serialise_events(X) end,
+ registry_lookup(exchange_decorator))
orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
@@ -136,8 +143,15 @@ serial(#exchange{name = XName} = X) ->
(false) -> none
end.
-decorators() ->
- [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
+registry_lookup(exchange_decorator_route = Class) ->
+ case get(exchange_decorator_route_modules) of
+ undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)],
+ put(exchange_decorator_route_modules, Mods),
+ Mods;
+ Mods -> Mods
+ end;
+registry_lookup(Class) ->
+ [M || {_, M} <- rabbit_registry:lookup_all(Class)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = rabbit_policy:set(#exchange{name = XName,
@@ -304,16 +318,26 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-%% Optimisation
-route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
- #delivery{message = #basic_message{routing_keys = RKs}}) ->
- [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+route(#exchange{name = #resource{virtual_host = VHost,
+ name = RName} = XName} = X,
+ #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) ->
+ case {registry_lookup(exchange_decorator_route), RName == <<"">>} of
+ {[], true} ->
+ %% Optimisation
+ [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+ {Decorators, _} ->
+ QNames = route1(Delivery, {[X], XName, []}),
+ lists:usort(decorate_route(Decorators, X, Delivery, QNames))
+ end.
-route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {[X], XName, []}).
+decorate_route([], _X, _Delivery, QNames) ->
+ QNames;
+decorate_route(Decorators, X, Delivery, QNames) ->
+ QNames ++
+ lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]).
route1(_, {[], _, QNames}) ->
- lists:usort(QNames);
+ QNames;
route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
DstNames = process_alternate(
X, ((type_to_module(Type)):route(X, Delivery))),
@@ -381,6 +405,10 @@ delete(XName, IfUnused) ->
end
end).
+validate_binding(X = #exchange{type = XType}, Binding) ->
+ Module = type_to_module(XType),
+ Module:validate_binding(X, Binding).
+
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
@@ -422,8 +450,7 @@ peek_serial(XName, LockType) ->
end.
invalid_module(T) ->
- rabbit_log:warning(
- "Could not find exchange type ~s.~n", [T]),
+ rabbit_log:warning("Could not find exchange type ~s.~n", [T]),
put({xtype_to_module, T}, rabbit_exchange_type_invalid),
rabbit_exchange_type_invalid.
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index befbc462..8f17adfc 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -21,9 +21,8 @@
%% 1) It applies to all exchanges as soon as it is installed, therefore
%% 2) It is not allowed to affect validation, so no validate/1 or
%% assert_args_equivalence/2
-%% 3) It also can't affect routing
%%
-%% It's possible in the future we might relax 3), or even make these
+%% It's possible in the future we might make decorators
%% able to manipulate messages as they are published.
-ifdef(use_specs).
@@ -46,6 +45,10 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -54,9 +57,12 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% called when the policy attached to this exchange changes.
--callback policy_changed (
- serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+%% called after exchange routing
+%% return value is a list of queues to be added to the list of
+%% destination queues. decorators must register separately for
+%% this callback using exchange_decorator_route.
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ [rabbit_amqqueue:name()].
-else.
@@ -64,7 +70,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1fbcb2d8..ebc59501 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -37,6 +37,10 @@
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
-callback validate(rabbit_types:exchange()) -> 'ok'.
+%% called BEFORE declaration, to check args etc
+-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) ->
+ rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
+
%% called after declaration and recovery
-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
@@ -44,6 +48,10 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -58,18 +66,15 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
-%% called when the policy attached to this exchange changes.
--callback policy_changed(serial(), rabbit_types:exchange(),
- rabbit_types:exchange()) -> 'ok'.
-
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
+ [{description, 0}, {serialise_events, 0}, {route, 2},
+ {validate, 1}, {validate_binding, 2}, {policy_changed, 2},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}, {policy_changed, 3}];
+ {assert_args_equivalence, 2}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 213b24c4..10a79c55 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,8 +20,9 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
@@ -40,9 +41,10 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 5b17ed56..3ebd8548 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -39,9 +40,10 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 75899160..cf2d3140 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -50,14 +51,24 @@ route(#exchange{name = Name},
rabbit_router:match_bindings(
Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
-default_headers_match_kind() -> all.
+validate_binding(_X, #binding{args = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-match">>) of
+ {longstr, <<"all">>} -> ok;
+ {longstr, <<"any">>} -> ok;
+ {longstr, Other} -> {error,
+ {binding_invalid,
+ "Invalid x-match field value ~p; "
+ "expected all or any", [Other]}};
+ {Type, Other} -> {error,
+ {binding_invalid,
+ "Invalid x-match field type ~p (value ~p); "
+ "expected longstr", [Type, Other]}};
+ undefined -> {error,
+ {binding_invalid, "x-match field missing", []}}
+ end.
parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
+parse_x_match(<<"any">>) -> any.
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
@@ -68,17 +79,9 @@ parse_x_match(Other) ->
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
+headers_match(Args, Data) ->
+ {longstr, MK} = rabbit_misc:table_lookup(Args, <<"x-match">>),
+ headers_match(Args, Data, true, false, parse_x_match(MK)).
headers_match([], _Data, AllMatch, _AnyMatch, all) ->
AllMatch;
@@ -115,7 +118,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 6b07351a..07a8004a 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,8 +20,9 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
description() ->
[{description,
@@ -41,9 +42,10 @@ route(#exchange{name = Name, type = Type}, _) ->
[rabbit_misc:rs(Name), Type]).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index bd8ad1ac..ce76ccb0 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -47,6 +48,7 @@ route(#exchange{name = X},
end || RKey <- Routes]).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
@@ -57,7 +59,7 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b8b03f56..b53c16bf 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, fast_close/1, sockname/1, peername/1, peercert/1,
- tune_buffer_size/1, connection_string/2, socket_ends/2]).
+ connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -69,7 +69,6 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
--spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-spec(socket_ends/2 ::
@@ -189,13 +188,6 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
-tune_buffer_size(Sock) ->
- case getopts(Sock, [sndbuf, recbuf, buffer]) of
- {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
- setopts(Sock, [{buffer, BufSz}]);
- Err -> Err
- end.
-
connection_string(Sock, Direction) ->
case socket_ends(Sock, Direction) of
{ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 71c2c80a..de53b7f0 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -24,7 +24,7 @@
write_cluster_status/1, read_cluster_status/0,
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
--export([partitions/0]).
+-export([partitions/0, subscribe/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
--record(state, {monitors, partitions}).
+-record(state, {monitors, partitions, subscribers}).
%%----------------------------------------------------------------------------
@@ -54,6 +54,7 @@
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
-spec(partitions/0 :: () -> {node(), [node()]}).
+-spec(subscribe/1 :: (pid()) -> 'ok').
-endif.
@@ -179,6 +180,9 @@ notify_left_cluster(Node) ->
partitions() ->
gen_server:call(?SERVER, partitions, infinity).
+subscribe(Pid) ->
+ gen_server:cast(?SERVER, {subscribe, Pid}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -190,8 +194,9 @@ init([]) ->
%% happen.
process_flag(trap_exit, true),
{ok, _} = mnesia:subscribe(system),
- {ok, #state{monitors = pmon:new(),
- partitions = []}}.
+ {ok, #state{monitors = pmon:new(),
+ subscribers = pmon:new(),
+ partitions = []}}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
@@ -232,23 +237,40 @@ handle_cast({left_cluster, Node}, State) ->
write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
del_node(Node, RunningNodes)}),
{noreply, State};
+handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
+ {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
- State = #state{monitors = Monitors}) ->
+ State = #state{monitors = Monitors, subscribers = Subscribers}) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
+ [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
+ {noreply, handle_dead_rabbit_state(
+ State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
+
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state{subscribers = Subscribers}) ->
+ {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
- State = #state{partitions = Partitions}) ->
+ State = #state{partitions = Partitions,
+ monitors = Monitors}) ->
+ %% We will not get a node_up from this node - yet we should treat it as
+ %% up (mostly).
+ State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> State;
+ false -> State#state{
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ end,
+ ok = handle_live_rabbit(Node),
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State#state{partitions = Partitions1}};
+ {noreply, State1#state{partitions = Partitions1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -270,7 +292,65 @@ handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
- ok = rabbit_mnesia:on_node_down(Node).
+ ok = rabbit_mnesia:on_node_down(Node),
+ case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, pause_minority} ->
+ case majority() of
+ true -> ok;
+ false -> await_cluster_recovery()
+ end;
+ {ok, ignore} ->
+ ok;
+ {ok, Term} ->
+ rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
+ "assuming 'ignore'~n", [Term]),
+ ok
+ end,
+ ok.
+
+majority() ->
+ length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5.
+
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
+%% when partitioned.
+alive_nodes() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ [N || N <- Nodes, pong =:= net_adm:ping(N)].
+
+await_cluster_recovery() ->
+ rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
+ []),
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ spawn(fun () ->
+ %% If our group leader is inside an application we are about
+ %% to stop, application:stop/1 does not return.
+ group_leader(whereis(init), self()),
+ %% Ensure only one restarting process at a time, will
+ %% exit(badarg) (harmlessly) if one is already running
+ register(rabbit_restarting_process, self()),
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+
+wait_for_cluster_recovery(Nodes) ->
+ case majority() of
+ true -> rabbit:start();
+ false -> timer:sleep(1000),
+ wait_for_cluster_recovery(Nodes)
+ end.
+
+handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+ %% If we have been partitioned, and we are now in the only remaining
+ %% partition, we no longer care about partitions - forget them. Note
+ %% that we do not attempt to deal with individual (other) partitions
+ %% going away. It's only safe to forget anything about partitions when
+ %% there are no partitions.
+ Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of
+ [] -> [];
+ _ -> Partitions
+ end,
+ State#state{partitions = Partitions1}.
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ab952cd8..61fac0e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -186,32 +186,37 @@ server_capabilities(_) ->
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+socket_error(Reason) ->
+ log(error, "error on AMQP connection ~p: ~p (~s)~n",
+ [self(), Reason, rabbit_misc:format_inet_error(Reason)]).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
- [self(), Reason]),
+ {error, Reason} -> socket_error(Reason),
%% NB: this is tcp socket, even in case of ssl
rabbit_net:fast_close(Sock),
exit(normal)
end.
-name(Sock) ->
- socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
-
-socket_ends(Sock) ->
- socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
-
start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- Name = name(Sock),
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Str} -> Str;
+ {error, enotconn} -> rabbit_net:fast_close(Sock),
+ exit(normal);
+ {error, Reason} -> socket_error(Reason),
+ rabbit_net:fast_close(Sock),
+ exit(normal)
+ end,
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
- {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -245,7 +250,6 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb,
last_blocked_by = none,
last_blocked_at = never}},
try
- ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -791,7 +795,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
Connection#connection{
client_properties = ClientProperties,
capabilities = Capabilities,
- auth_mechanism = AuthMechanism,
+ auth_mechanism = {Mechanism, AuthMechanism},
auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
@@ -918,15 +922,14 @@ auth_mechanisms_binary(Sock) ->
auth_phase(Response,
State = #v1{connection = Connection =
#connection{protocol = Protocol,
- auth_mechanism = AuthMechanism,
+ auth_mechanism = {Name, AuthMechanism},
auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
rabbit_misc:protocol_error(
access_refused, "~s login refused: ~s",
- [proplists:get_value(name, AuthMechanism:description()),
- io_lib:format(Msg, Args)]);
+ [Name, io_lib:format(Msg, Args)]);
{protocol_error, Msg, Args} ->
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
@@ -986,10 +989,8 @@ ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
-ic(auth_mechanism, #connection{auth_mechanism = none}) ->
- none;
-ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
- proplists:get_value(name, Mechanism:description());
+ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
+ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 60419856..3514e780 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -104,11 +104,12 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(policy_validator) -> rabbit_policy_validator.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(exchange_decorator_route) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index b1100b65..05520170 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1,
- list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3,
+ list_component/1, list/2, list_formatted/1, lookup/3,
value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -40,12 +40,9 @@
-> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]).
--spec(list_strict/1 :: (binary() | '_')
- -> [rabbit_types:infos()] | 'not_found').
+-spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]).
-spec(list/2 :: (rabbit_types:vhost() | '_', binary() | '_')
-> [rabbit_types:infos()]).
--spec(list_strict/2 :: (rabbit_types:vhost() | '_', binary() | '_')
- -> [rabbit_types:infos()] | 'not_found').
-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary())
-> rabbit_types:infos() | 'not_found').
@@ -139,21 +136,14 @@ list() ->
[p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>].
-list(VHost) -> list(VHost, '_', []).
-list_strict(Component) -> list('_', Component, not_found).
-list(VHost, Component) -> list(VHost, Component, []).
-list_strict(VHost, Component) -> list(VHost, Component, not_found).
-
-list(VHost, Component, Default) ->
- case component_good(Component) of
- true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
- _ = '_'},
- [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
- mnesia:dirty_match_object(?TABLE, Match),
- Comp =/= <<"policy">> orelse
- Component =:= <<"policy">>];
- _ -> Default
- end.
+list(VHost) -> list(VHost, '_').
+list_component(Component) -> list('_', Component).
+
+list(VHost, Component) ->
+ Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'},
+ [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <-
+ mnesia:dirty_match_object(?TABLE, Match),
+ Comp =/= <<"policy">> orelse Component =:= <<"policy">>].
list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
@@ -208,12 +198,6 @@ info_keys() -> [component, name, value].
%%---------------------------------------------------------------------------
-component_good('_') -> true;
-component_good(Component) -> case lookup_component(Component) of
- {ok, _} -> true;
- _ -> false
- end.
-
lookup_component(Component) ->
case rabbit_registry:lookup_module(
runtime_parameter, list_to_atom(binary_to_list(Component))) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 27807b62..1188c554 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1094,6 +1094,7 @@ test_policy_validation() ->
{error_string, _} = SetPol("testpos", [-1, 0, 1]),
{error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+ ok = control_action(clear_policy, ["name"]),
rabbit_runtime_parameters_test:unregister_policy_validator(),
passed.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 0248f878..2725be31 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -55,21 +55,30 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
inet_db:register_socket(Sock, Mod),
%% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain(),
+ case tune_buffer_size(Sock) of
+ ok -> file_handle_cache:transfer(
+ apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain();
+ {error, enotconn} -> catch port_close(Sock);
+ {error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock),
+ error_logger:error_msg(
+ "failed to tune buffer size of "
+ "connection accepted on ~s:~p - ~p (~s)~n",
+ [rabbit_misc:ntoab(IPAddress), Port,
+ Err, rabbit_misc:format_inet_error(Err)]),
+ catch port_close(Sock)
+ end,
%% accept more
accept(State);
-handle_info({inet_async, LSock, Ref, {error, closed}},
- State=#state{sock=LSock, ref=Ref}) ->
- %% It would be wrong to attempt to restart the acceptor when we
- %% know this will fail.
- {stop, normal, State};
-
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {stop, {accept_failed, Reason}, State};
+ case Reason of
+ closed -> {stop, normal, State}; %% listening socket closed
+ econnaborted -> accept(State); %% client sent RST before we accepted
+ _ -> {stop, {accept_failed, Reason}, State}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -87,3 +96,10 @@ accept(State = #state{sock=LSock}) ->
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
end.
+
+tune_buffer_size(Sock) ->
+ case inet:getopts(Sock, [sndbuf, recbuf, buffer]) of
+ {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
+ inet:setopts(Sock, [{buffer, BufSz}]);
+ Error -> Error
+ end.