diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-03-21 10:42:41 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-03-21 10:42:41 +0000 |
commit | e222e21a4a18a674a82a90f8e23acbfb42d2fc02 (patch) | |
tree | 4e29c1d677aa3819d606c94072c2d363b9872ddb | |
parent | 3a9022f793632b7ab779cc0dedfe1083f6b85cd9 (diff) | |
parent | 904c58e4fcce4938c6a8725f484e2b78813c6986 (diff) | |
download | rabbitmq-server-bug23378.tar.gz |
Merge in defaultbug23378
32 files changed, 614 insertions, 170 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index ad961a44..339fa69e 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -44,6 +44,7 @@ {log_levels, [{connection, info}]}, {ssl_cert_login_from, distinguished_name}, {reverse_dns_lookups, false}, + {cluster_partition_handling, ignore}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/packaging/debs/apt-repository/distributions b/packaging/debs/apt-repository/distributions index 183eb034..75b9fe46 100644 --- a/packaging/debs/apt-repository/distributions +++ b/packaging/debs/apt-repository/distributions @@ -2,6 +2,6 @@ Origin: RabbitMQ Label: RabbitMQ Repository for Debian / Ubuntu etc Suite: testing Codename: kitten -Architectures: arm hppa ia64 mips mipsel s390 sparc i386 amd64 powerpc source +Architectures: AVR32 alpha amd64 arm armel armhf hppa hurd-i386 i386 ia64 kfreebsd-amd64 kfreebsd-i386 m32 m68k mips mipsel netbsd-alpha netbsd-i386 powerpc s390 s390x sh sparc source Components: main Description: RabbitMQ Repository for Debian / Ubuntu etc diff --git a/packaging/standalone/Makefile b/packaging/standalone/Makefile new file mode 100644 index 00000000..89ccde93 --- /dev/null +++ b/packaging/standalone/Makefile @@ -0,0 +1,82 @@ +VERSION=0.0.0 +SOURCE_DIR=rabbitmq-server-$(VERSION) +TARGET_DIR=rabbitmq_server-$(VERSION) +TARGET_TARBALL=rabbitmq-server-$(OS)-standalone-$(VERSION) +RLS_DIR=$(TARGET_DIR)/release/$(TARGET_DIR) + +ERTS_VSN=$(shell erl -noshell -eval 'io:format("~s", [erlang:system_info(version)]), halt().') +ERTS_ROOT_DIR=$(shell erl -noshell -eval 'io:format("~s", [code:root_dir()]), halt().') + +# used to generate the erlang release +RABBITMQ_HOME=$(TARGET_DIR) +RABBITMQ_EBIN_ROOT=$(RABBITMQ_HOME)/ebin +RABBITMQ_PLUGINS_DIR=$(RABBITMQ_HOME)/plugins +RABBITMQ_PLUGINS_EXPAND_DIR=$(RABBITMQ_PLUGINS_DIR)/expand + +RABBITMQ_DEFAULTS=$(TARGET_DIR)/sbin/rabbitmq-defaults +fix_defaults = sed -e $(1) $(RABBITMQ_DEFAULTS) > $(RABBITMQ_DEFAULTS).tmp \ + && mv $(RABBITMQ_DEFAULTS).tmp $(RABBITMQ_DEFAULTS) + +dist: + tar -zxf ../../dist/$(SOURCE_DIR).tar.gz + + $(MAKE) -C $(SOURCE_DIR) \ + TARGET_DIR=`pwd`/$(TARGET_DIR) \ + SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ + MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ + install + +## Here we set the RABBITMQ_HOME variable, +## then we make ERL_DIR point to our released erl +## and we add the paths to our released start_clean and start_sasl boot scripts + $(call fix_defaults,'s:^SYS_PREFIX=$$:SYS_PREFIX=\$${RABBITMQ_HOME}:') + $(call fix_defaults,'s:^ERL_DIR=$$:ERL_DIR=\$${RABBITMQ_HOME}/erts-$(ERTS_VSN)/bin/:') + $(call fix_defaults,'s:start_clean$$:"\$${SYS_PREFIX}/releases/$(VERSION)/start_clean":') + $(call fix_defaults,'s:start_sasl:"\$${SYS_PREFIX}/releases/$(VERSION)/start_sasl":') + + chmod 0755 $(RABBITMQ_DEFAULTS) + + mkdir -p $(TARGET_DIR)/etc/rabbitmq + + $(MAKE) generate_release + + mkdir -p $(RLS_DIR) + tar -C $(RLS_DIR) -xzf $(RABBITMQ_HOME)/rabbit.tar.gz + +# add minimal boot file + cp $(ERTS_ROOT_DIR)/bin/start_clean.boot $(RLS_DIR)/releases/$(VERSION) + cp $(ERTS_ROOT_DIR)/bin/start_sasl.boot $(RLS_DIR)/releases/$(VERSION) + +# move rabbitmq files to top level folder + mv $(RLS_DIR)/lib/rabbit-$(VERSION)/* $(RLS_DIR) + +# remove empty lib/rabbit-$(VERSION) folder + rm -rf $(RLS_DIR)/lib/rabbit-$(VERSION) + +# fix Erlang ROOTDIR + patch -o $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl $(RLS_DIR)/erts-$(ERTS_VSN)/bin/erl.src < erl.diff + + tar -zcf $(TARGET_TARBALL).tar.gz -C $(TARGET_DIR)/release $(TARGET_DIR) + rm -rf $(SOURCE_DIR) $(TARGET_DIR) + +clean: clean_partial + rm -f rabbitmq-server-$(OS)-standalone-*.tar.gz + +clean_partial: + rm -rf $(SOURCE_DIR) + rm -rf $(TARGET_DIR) + +.PHONY : generate_release +generate_release: + erlc \ + -I $(TARGET_DIR)/include/ -o src -Wall \ + -v +debug_info -Duse_specs -Duse_proper_qc \ + -pa $(TARGET_DIR)/ebin/ src/rabbit_release.erl + erl \ + -pa "$(RABBITMQ_EBIN_ROOT)" \ + -pa src \ + -noinput \ + -hidden \ + -s rabbit_release \ + -extra "$(RABBITMQ_PLUGINS_DIR)" "$(RABBITMQ_PLUGINS_EXPAND_DIR)" "$(RABBITMQ_HOME)" + rm src/rabbit_release.beam diff --git a/packaging/standalone/erl.diff b/packaging/standalone/erl.diff new file mode 100644 index 00000000..c51bfe22 --- /dev/null +++ b/packaging/standalone/erl.diff @@ -0,0 +1,5 @@ +20c20,21 +< ROOTDIR="%FINAL_ROOTDIR%" +--- +> realpath() { [[ $1 = /* ]] && echo "$1" || echo "$PWD/${1#./}" ; } +> ROOTDIR="$(dirname `realpath $0`)/../.." diff --git a/packaging/standalone/src/rabbit_release.erl b/packaging/standalone/src/rabbit_release.erl new file mode 100644 index 00000000..26f36d68 --- /dev/null +++ b/packaging/standalone/src/rabbit_release.erl @@ -0,0 +1,152 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% +-module(rabbit_release). + +-export([start/0]). + +-include("rabbit.hrl"). + +-define(BaseApps, [rabbit]). +-define(ERROR_CODE, 1). + +%% We need to calculate all the ERTS apps we need to ship with a +%% standalone rabbit. To acomplish that we need to unpack and load the plugins +%% apps that are shiped with rabbit. +%% Once we get that we generate an erlang release inside a tarball. +%% Our make file will work with that release to generate our final rabbitmq +%% package. +start() -> + %% Determine our various directories + [PluginsDistDir, UnpackedPluginDir, RabbitHome] = + init:get_plain_arguments(), + RootName = UnpackedPluginDir ++ "/rabbit", + + %% extract the plugins so we can load their apps later + prepare_plugins(PluginsDistDir, UnpackedPluginDir), + + %% add the plugin ebin folder to the code path. + add_plugins_to_path(UnpackedPluginDir), + + PluginAppNames = [P#plugin.name || + P <- rabbit_plugins:list(PluginsDistDir)], + + %% Build the entire set of dependencies - this will load the + %% applications along the way + AllApps = case catch sets:to_list(expand_dependencies(PluginAppNames)) of + {failed_to_load_app, App, Err} -> + terminate("failed to load application ~s:~n~p", + [App, Err]); + AppList -> + AppList + end, + + %% we need a list of ERTS apps we need to ship with rabbit + BaseApps = AllApps -- PluginAppNames, + + AppVersions = [determine_version(App) || App <- BaseApps], + RabbitVersion = proplists:get_value(rabbit, AppVersions), + + %% Build the overall release descriptor + RDesc = {release, + {"rabbit", RabbitVersion}, + {erts, erlang:system_info(version)}, + AppVersions}, + + %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel + rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), + + %% Compile the script + systools:make_script(RootName), + systools:script2boot(RootName), + %% Make release tarfile + make_tar(RootName, RabbitHome), + rabbit_misc:quit(0). + +make_tar(Release, RabbitHome) -> + systools:make_tar(Release, + [ + {dirs, [docs, etc, include, plugins, sbin, share]}, + {erts, code:root_dir()}, + {outdir, RabbitHome} + ]). + +determine_version(App) -> + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), + {App, Vsn}. + +delete_recursively(Fn) -> + case rabbit_file:recursive_delete([Fn]) of + ok -> ok; + {error, {Path, E}} -> {error, {cannot_delete, Path, E}}; + Error -> Error + end. + +prepare_plugins(PluginsDistDir, DestDir) -> + %% Eliminate the contents of the destination directory + case delete_recursively(DestDir) of + ok -> ok; + {error, E} -> terminate("Could not delete dir ~s (~p)", [DestDir, E]) + end, + case filelib:ensure_dir(DestDir ++ "/") of + ok -> ok; + {error, E2} -> terminate("Could not create dir ~s (~p)", [DestDir, E2]) + end, + + [prepare_plugin(Plugin, DestDir) || + Plugin <- rabbit_plugins:list(PluginsDistDir)]. + +prepare_plugin(#plugin{type = ez, location = Location}, PluginDestDir) -> + zip:unzip(Location, [{cwd, PluginDestDir}]); +prepare_plugin(#plugin{type = dir, name = Name, location = Location}, + PluginsDestDir) -> + rabbit_file:recursive_copy(Location, + filename:join([PluginsDestDir, Name])). + +expand_dependencies(Pending) -> + expand_dependencies(sets:new(), Pending). +expand_dependencies(Current, []) -> + Current; +expand_dependencies(Current, [Next|Rest]) -> + case sets:is_element(Next, Current) of + true -> + expand_dependencies(Current, Rest); + false -> + case application:load(Next) of + ok -> + ok; + {error, {already_loaded, _}} -> + ok; + {error, Reason} -> + throw({failed_to_load_app, Next, Reason}) + end, + {ok, Required} = application:get_key(Next, applications), + Unique = [A || A <- Required, not(sets:is_element(A, Current))], + expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) + end. + +add_plugins_to_path(PluginDir) -> + [add_plugin_to_path(PluginName) || + PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")]. + +add_plugin_to_path(PluginAppDescFn) -> + %% Add the plugin ebin directory to the load path + PluginEBinDirN = filename:dirname(PluginAppDescFn), + code:add_path(PluginEBinDirN). + +terminate(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + rabbit_misc:quit(?ERROR_CODE). diff --git a/scripts/rabbitmq-defaults b/scripts/rabbitmq-defaults index db1d4f2b..83c5639d 100644 --- a/scripts/rabbitmq-defaults +++ b/scripts/rabbitmq-defaults @@ -18,6 +18,12 @@ ### next line potentially updated in package install steps SYS_PREFIX= +### next line will be updated when generating a standalone release +ERL_DIR= + +CLEAN_BOOT_FILE=start_clean +SASL_BOOT_FILE=start_sasl + ## Set default values CONFIG_FILE=${SYS_PREFIX}/etc/rabbitmq/rabbitmq diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 43f450c0..c043c90a 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -26,11 +26,12 @@ ##--- End of overridden <var_name> variables -exec erl \ +exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ -sname rabbitmq-plugins$$ \ + -boot "${CLEAN_BOOT_FILE}" \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 184ae931..161ec2e6 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -82,7 +82,8 @@ case "$(uname -s)" in esac RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" -if ! erl -pa "$RABBITMQ_EBIN_ROOT" \ +if ! ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \ + -boot "${CLEAN_BOOT_FILE}" \ -noinput \ -hidden \ -s rabbit_prelaunch \ @@ -103,11 +104,11 @@ RABBITMQ_LISTEN_ARG= # there is no other way of preventing their expansion. set -f -exec erl \ +exec ${ERL_DIR}erl \ -pa ${RABBITMQ_EBIN_ROOT} \ ${RABBITMQ_START_RABBIT} \ -sname ${RABBITMQ_NODENAME} \ - -boot start_sasl \ + -boot "${SASL_BOOT_FILE}" \ ${RABBITMQ_CONFIG_ARG} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 00fffa9f..0368db3f 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -26,12 +26,13 @@ ##--- End of overridden <var_name> variables -exec erl \ +exec ${ERL_DIR}erl \ -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ + -boot "${CLEAN_BOOT_FILE}" \ -s rabbit_control_main \ -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/src/rabbit.erl b/src/rabbit.erl index f3ba022a..3cfa21ba 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -236,7 +236,7 @@ {memory, any()}]). -spec(is_running/0 :: () -> boolean()). -spec(is_running/1 :: (node()) -> boolean()). --spec(environment/0 :: () -> [{param() | term()}]). +-spec(environment/0 :: () -> [{param(), term()}]). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). -spec(force_event_refresh/0 :: () -> 'ok'). diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 362b11aa..6d24d130 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -208,11 +208,19 @@ internal_register(Pid, {M, F, A} = AlertMFA, State#alarms{alertees = NewAlertees}. handle_set_alarm({{resource_limit, Source, Node}, []}, State) -> - rabbit_log:warning("~s resource limit alarm set on node ~p~n", - [Source, Node]), + rabbit_log:warning( + "~s resource limit alarm set on node ~p.~n~n" + "**********************************************************~n" + "*** Publishers will be blocked until this alarm clears ***~n" + "**********************************************************~n", + [Source, Node]), {ok, maybe_alert(fun dict:append/3, Node, Source, State)}; handle_set_alarm({file_descriptor_limit, []}, State) -> - rabbit_log:warning("file descriptor limit alarm set~n"), + rabbit_log:warning( + "file descriptor limit alarm set.~n~n" + "********************************************************************~n" + "*** New connections will not be accepted until this alarm clears ***~n" + "********************************************************************~n"), {ok, State}; handle_set_alarm(Alarm, State) -> rabbit_log:warning("alarm '~p' set~n", [Alarm]), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ae7fe5c5..82ac74fa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -407,7 +407,8 @@ args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, - {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}]. + {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, + {<<"x-max-length">>, fun check_max_length_arg/2}]. check_string_arg({longstr, _}, _Args) -> ok; check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. @@ -418,6 +419,13 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_max_length_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_negative, Val}}; + Error -> Error + end. + check_expires_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1d332f67..0d5fd304 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,6 +55,7 @@ queue_monitors, dlx, dlx_routing_key, + max_length, status }). @@ -244,7 +245,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-max-length">>, fun init_max_length/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -256,6 +258,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -559,27 +563,50 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - IsEmpty = BQ:is_empty(BQS), BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - State3 = State2#q{backing_queue_state = BQS1}, + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so - %% we only do that IFF the new message ends up at the head - %% of the queue (because the queue was empty) and has an - %% expiry. Only then may it need expiring straight away, - %% or, if expiry is not due yet, the expiry timer may need - %% (re)scheduling. - case {IsEmpty, Props#message_properties.expiry} of - {false, _} -> State3; - {true, undefined} -> State3; - {true, _} -> drop_expired_msgs(State3) + %% we only do that if a new message that might have an + %% expiry ends up at the head of the queue. If the head + %% remains unchanged, or if the newly published message + %% has no expiry and becomes the head of the queue then + %% the call is unnecessary. + case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. +maybe_drop_head(State = #q{max_length = undefined}) -> + {0, State}; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) - MaxLen of + Excess when Excess > 0 -> + {Excess, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, + fun () -> + {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> + BQ:drop(false, BQS0) + end, {ok, BQS}, + lists:seq(1, Excess)), + State#q{backing_queue_state = BQS1} + end)}; + _ -> {0, State} + end. + requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})). + {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), + run_message_queue(drop_expired_msgs(State1)). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -763,6 +790,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. +dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> + {ok, State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS) -> + lists:foldl(fun (_, {ok, Acc0, BQS0}) -> + {{Msg, _, AckTag}, BQS1} = + BQ:fetch(true, BQS0), + {ok, DLFun(Msg, AckTag, Acc0), BQS1} + end, {ok, Acc, BQS}, lists:seq(1, Excess)) + end, maxlen, X, State), + State1. + dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, unconfirmed = UC0, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 6096e07b..cb86e5ae 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -40,8 +40,11 @@ [{'not_found', (rabbit_types:binding_source() | rabbit_types:binding_destination())} | {'absent', rabbit_types:amqqueue()}]})). + -type(bind_ok_or_error() :: 'ok' | bind_errors() | - rabbit_types:error('binding_not_found')). + rabbit_types:error( + 'binding_not_found' | + {'binding_invalid', string(), [any()]})). -type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())). -type(inner_fun() :: fun((rabbit_types:exchange(), @@ -157,15 +160,22 @@ add(Binding, InnerFun) -> binding_action( Binding, fun (Src, Dst, B) -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - case InnerFun(Src, Dst) of - ok -> case mnesia:read({rabbit_route, B}) of - [] -> add(Src, Dst, B); - [_] -> fun rabbit_misc:const_ok/0 - end; - {error, _} = Err -> rabbit_misc:const(Err) + case rabbit_exchange:validate_binding(Src, B) of + ok -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + case InnerFun(Src, Dst) of + ok -> + case mnesia:read({rabbit_route, B}) of + [] -> add(Src, Dst, B); + [_] -> fun rabbit_misc:const_ok/0 + end; + {error, _} = Err -> + rabbit_misc:const(Err) + end; + {error, _} = Err -> + rabbit_misc:const(Err) end end). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7b185568..bf3a9fd7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1196,6 +1196,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(DestinationName)]); + {error, {binding_invalid, Fmt, Args}} -> + rabbit_misc:protocol_error(precondition_failed, Fmt, Args); {error, #amqp_error{} = Error} -> rabbit_misc:protocol_error(Error); ok -> return_ok(State, NoWait, ReturnMethod) diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index b396b289..3bb163a1 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -31,6 +31,7 @@ -record(state, {dir, limit, + actual, timeout, timer, alarmed @@ -106,8 +107,8 @@ handle_call({set_check_interval, Timeout}, _From, State) -> {ok, cancel} = timer:cancel(State#state.timer), {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; -handle_call(get_disk_free, _From, State = #state { dir = Dir }) -> - {reply, get_disk_free(Dir), State}; +handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> + {reply, Actual, State}; handle_call(_Request, _From, State) -> {noreply, State}. @@ -156,7 +157,7 @@ internal_update(State = #state { limit = Limit, _ -> ok end, - State #state {alarmed = NewAlarmed}. + State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}. get_disk_free(Dir) -> get_disk_free(Dir, os:type()). diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index 3efc9c0c..eb6247e0 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -76,6 +76,9 @@ init_file(File, PrevHandler) -> Error -> Error end. +%% filter out "application: foo; exited: stopped; type: temporary" +handle_event({info_report, _, {_, std_info, _}}, State) -> + {ok, State}; handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 88033f77..5f4fb9ec 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -22,7 +22,7 @@ assert_equivalence/6, assert_args_equivalence/2, check_type/1, lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3, info_keys/0, info/1, info/2, info_all/1, info_all/2, - route/2, delete/2]). + route/2, delete/2, validate_binding/2]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]). @@ -83,6 +83,9 @@ (name(), boolean())-> 'ok' | rabbit_types:error('not_found') | rabbit_types:error('in_use')). +-spec(validate_binding/2 :: + (rabbit_types:exchange(), rabbit_types:binding()) + -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})). -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). @@ -117,14 +120,18 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) -> is_atom(Serial0) -> fun (_Bool) -> Serial0 end end, [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) || - M <- decorators()], + M <- registry_lookup(exchange_decorator)], Module = type_to_module(XType), apply(Module, Fun, [Serial(Module:serialise_events()) | Args]). -policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]). +policy_changed(X = #exchange{type = XType}, X1) -> + [ok = M:policy_changed(X, X1) || + M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]], + ok. serialise_events(X = #exchange{type = Type}) -> - lists:any(fun (M) -> M:serialise_events(X) end, decorators()) + lists:any(fun (M) -> M:serialise_events(X) end, + registry_lookup(exchange_decorator)) orelse (type_to_module(Type)):serialise_events(). serial(#exchange{name = XName} = X) -> @@ -136,8 +143,15 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. -decorators() -> - [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)]. +registry_lookup(exchange_decorator_route = Class) -> + case get(exchange_decorator_route_modules) of + undefined -> Mods = [M || {_, M} <- rabbit_registry:lookup_all(Class)], + put(exchange_decorator_route_modules, Mods), + Mods; + Mods -> Mods + end; +registry_lookup(Class) -> + [M || {_, M} <- rabbit_registry:lookup_all(Class)]. declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = rabbit_policy:set(#exchange{name = XName, @@ -304,16 +318,26 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -%% Optimisation -route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, - #delivery{message = #basic_message{routing_keys = RKs}}) -> - [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; +route(#exchange{name = #resource{virtual_host = VHost, + name = RName} = XName} = X, + #delivery{message = #basic_message{routing_keys = RKs}} = Delivery) -> + case {registry_lookup(exchange_decorator_route), RName == <<"">>} of + {[], true} -> + %% Optimisation + [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + {Decorators, _} -> + QNames = route1(Delivery, {[X], XName, []}), + lists:usort(decorate_route(Decorators, X, Delivery, QNames)) + end. -route(X = #exchange{name = XName}, Delivery) -> - route1(Delivery, {[X], XName, []}). +decorate_route([], _X, _Delivery, QNames) -> + QNames; +decorate_route(Decorators, X, Delivery, QNames) -> + QNames ++ + lists:append([Decorator:route(X, Delivery) || Decorator <- Decorators]). route1(_, {[], _, QNames}) -> - lists:usort(QNames); + QNames; route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) -> DstNames = process_alternate( X, ((type_to_module(Type)):route(X, Delivery))), @@ -381,6 +405,10 @@ delete(XName, IfUnused) -> end end). +validate_binding(X = #exchange{type = XType}, Binding) -> + Module = type_to_module(XType), + Module:validate_binding(X, Binding). + maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; maybe_auto_delete(#exchange{auto_delete = true} = X) -> @@ -422,8 +450,7 @@ peek_serial(XName, LockType) -> end. invalid_module(T) -> - rabbit_log:warning( - "Could not find exchange type ~s.~n", [T]), + rabbit_log:warning("Could not find exchange type ~s.~n", [T]), put({xtype_to_module, T}, rabbit_exchange_type_invalid), rabbit_exchange_type_invalid. diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index befbc462..8f17adfc 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -21,9 +21,8 @@ %% 1) It applies to all exchanges as soon as it is installed, therefore %% 2) It is not allowed to affect validation, so no validate/1 or %% assert_args_equivalence/2 -%% 3) It also can't affect routing %% -%% It's possible in the future we might relax 3), or even make these +%% It's possible in the future we might make decorators %% able to manipulate messages as they are published. -ifdef(use_specs). @@ -46,6 +45,10 @@ -callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. +%% called when the policy attached to this exchange changes. +-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> + 'ok'. + %% called after a binding has been added or recovered -callback add_binding(serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'. @@ -54,9 +57,12 @@ -callback remove_bindings(serial(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. -%% called when the policy attached to this exchange changes. --callback policy_changed ( - serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'. +%% called after exchange routing +%% return value is a list of queues to be added to the list of +%% destination queues. decorators must register separately for +%% this callback using exchange_decorator_route. +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + [rabbit_amqqueue:name()]. -else. @@ -64,7 +70,7 @@ behaviour_info(callbacks) -> [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3}, - {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}]; + {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}, {route, 2}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 1fbcb2d8..ebc59501 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -37,6 +37,10 @@ %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} -callback validate(rabbit_types:exchange()) -> 'ok'. +%% called BEFORE declaration, to check args etc +-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) -> + rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}). + %% called after declaration and recovery -callback create(tx(), rabbit_types:exchange()) -> 'ok'. @@ -44,6 +48,10 @@ -callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'. +%% called when the policy attached to this exchange changes. +-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) -> + 'ok'. + %% called after a binding has been added or recovered -callback add_binding(serial(), rabbit_types:exchange(), rabbit_types:binding()) -> 'ok'. @@ -58,18 +66,15 @@ rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit(). -%% called when the policy attached to this exchange changes. --callback policy_changed(serial(), rabbit_types:exchange(), - rabbit_types:exchange()) -> 'ok'. - -else. -export([behaviour_info/1]). behaviour_info(callbacks) -> - [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1}, + [{description, 0}, {serialise_events, 0}, {route, 2}, + {validate, 1}, {validate_binding, 2}, {policy_changed, 2}, {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3}, - {assert_args_equivalence, 2}, {policy_changed, 3}]; + {assert_args_equivalence, 2}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 213b24c4..10a79c55 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,8 +20,9 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, - add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, @@ -40,9 +41,10 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 5b17ed56..3ebd8548 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -39,9 +40,10 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 75899160..cf2d3140 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -50,14 +51,24 @@ route(#exchange{name = Name}, rabbit_router:match_bindings( Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end). -default_headers_match_kind() -> all. +validate_binding(_X, #binding{args = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-match">>) of + {longstr, <<"all">>} -> ok; + {longstr, <<"any">>} -> ok; + {longstr, Other} -> {error, + {binding_invalid, + "Invalid x-match field value ~p; " + "expected all or any", [Other]}}; + {Type, Other} -> {error, + {binding_invalid, + "Invalid x-match field type ~p (value ~p); " + "expected longstr", [Type, Other]}}; + undefined -> {error, + {binding_invalid, "x-match field missing", []}} + end. parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). +parse_x_match(<<"any">>) -> any. %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort @@ -68,17 +79,9 @@ parse_x_match(Other) -> %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). +headers_match(Args, Data) -> + {longstr, MK} = rabbit_misc:table_lookup(Args, <<"x-match">>), + headers_match(Args, Data, true, false, parse_x_match(MK)). headers_match([], _Data, AllMatch, _AnyMatch, all) -> AllMatch; @@ -115,7 +118,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], validate(_X) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 6b07351a..07a8004a 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -20,8 +20,9 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, - add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, + remove_bindings/3, assert_args_equivalence/2]). description() -> [{description, @@ -41,9 +42,10 @@ route(#exchange{name = Name, type = Type}, _) -> [rabbit_misc:rs(Name), Type]). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index bd8ad1ac..ce76ccb0 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3, +-export([validate/1, validate_binding/2, + create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). -rabbit_boot_step({?MODULE, @@ -47,6 +48,7 @@ route(#exchange{name = X}, end || RKey <- Routes]). validate(_X) -> ok. +validate_binding(_X, _B) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> @@ -57,7 +59,7 @@ delete(transaction, #exchange{name = X}, _Bs) -> delete(none, _Exchange, _Bs) -> ok. -policy_changed(_Tx, _X1, _X2) -> ok. +policy_changed(_X1, _X2) -> ok. add_binding(transaction, _Exchange, Binding) -> internal_add_binding(Binding); diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index b8b03f56..b53c16bf 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -20,7 +20,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1, peercert/1, - tune_buffer_size/1, connection_string/2, socket_ends/2]). + connection_string/2, socket_ends/2]). %%--------------------------------------------------------------------------- @@ -69,7 +69,6 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). --spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()). -spec(connection_string/2 :: (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). -spec(socket_ends/2 :: @@ -189,13 +188,6 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. -tune_buffer_size(Sock) -> - case getopts(Sock, [sndbuf, recbuf, buffer]) of - {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), - setopts(Sock, [{buffer, BufSz}]); - Err -> Err - end. - connection_string(Sock, Direction) -> case socket_ends(Sock, Direction) of {ok, {FromAddress, FromPort, ToAddress, ToPort}} -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 71c2c80a..de53b7f0 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -24,7 +24,7 @@ write_cluster_status/1, read_cluster_status/0, update_cluster_status/0, reset_cluster_status/0]). -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). --export([partitions/0]). +-export([partitions/0, subscribe/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). --record(state, {monitors, partitions}). +-record(state, {monitors, partitions, subscribers}). %%---------------------------------------------------------------------------- @@ -54,6 +54,7 @@ -spec(notify_left_cluster/1 :: (node()) -> 'ok'). -spec(partitions/0 :: () -> {node(), [node()]}). +-spec(subscribe/1 :: (pid()) -> 'ok'). -endif. @@ -179,6 +180,9 @@ notify_left_cluster(Node) -> partitions() -> gen_server:call(?SERVER, partitions, infinity). +subscribe(Pid) -> + gen_server:cast(?SERVER, {subscribe, Pid}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -190,8 +194,9 @@ init([]) -> %% happen. process_flag(trap_exit, true), {ok, _} = mnesia:subscribe(system), - {ok, #state{monitors = pmon:new(), - partitions = []}}. + {ok, #state{monitors = pmon:new(), + subscribers = pmon:new(), + partitions = []}}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> {reply, {node(), Partitions}, State}; @@ -232,23 +237,40 @@ handle_cast({left_cluster, Node}, State) -> write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes), del_node(Node, RunningNodes)}), {noreply, State}; +handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) -> + {noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}}; handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, - State = #state{monitors = Monitors}) -> + State = #state{monitors = Monitors, subscribers = Subscribers}) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}}; + [P ! {node_down, Node} || P <- pmon:monitored(Subscribers)], + {noreply, handle_dead_rabbit_state( + State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})}; + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state{subscribers = Subscribers}) -> + {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; handle_info({mnesia_system_event, {inconsistent_database, running_partitioned_network, Node}}, - State = #state{partitions = Partitions}) -> + State = #state{partitions = Partitions, + monitors = Monitors}) -> + %% We will not get a node_up from this node - yet we should treat it as + %% up (mostly). + State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of + true -> State; + false -> State#state{ + monitors = pmon:monitor({rabbit, Node}, Monitors)} + end, + ok = handle_live_rabbit(Node), Partitions1 = ordsets:to_list( ordsets:add_element(Node, ordsets:from_list(Partitions))), - {noreply, State#state{partitions = Partitions1}}; + {noreply, State1#state{partitions = Partitions1}}; handle_info(_Info, State) -> {noreply, State}. @@ -270,7 +292,65 @@ handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), ok = rabbit_alarm:on_node_down(Node), - ok = rabbit_mnesia:on_node_down(Node). + ok = rabbit_mnesia:on_node_down(Node), + case application:get_env(rabbit, cluster_partition_handling) of + {ok, pause_minority} -> + case majority() of + true -> ok; + false -> await_cluster_recovery() + end; + {ok, ignore} -> + ok; + {ok, Term} -> + rabbit_log:warning("cluster_partition_handling ~p unrecognised, " + "assuming 'ignore'~n", [Term]), + ok + end, + ok. + +majority() -> + length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5. + +%% mnesia:system_info(db_nodes) (and hence +%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results +%% when partitioned. +alive_nodes() -> + Nodes = rabbit_mnesia:cluster_nodes(all), + [N || N <- Nodes, pong =:= net_adm:ping(N)]. + +await_cluster_recovery() -> + rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", + []), + Nodes = rabbit_mnesia:cluster_nodes(all), + spawn(fun () -> + %% If our group leader is inside an application we are about + %% to stop, application:stop/1 does not return. + group_leader(whereis(init), self()), + %% Ensure only one restarting process at a time, will + %% exit(badarg) (harmlessly) if one is already running + register(rabbit_restarting_process, self()), + rabbit:stop(), + wait_for_cluster_recovery(Nodes) + end). + +wait_for_cluster_recovery(Nodes) -> + case majority() of + true -> rabbit:start(); + false -> timer:sleep(1000), + wait_for_cluster_recovery(Nodes) + end. + +handle_dead_rabbit_state(State = #state{partitions = Partitions}) -> + %% If we have been partitioned, and we are now in the only remaining + %% partition, we no longer care about partitions - forget them. Note + %% that we do not attempt to deal with individual (other) partitions + %% going away. It's only safe to forget anything about partitions when + %% there are no partitions. + Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of + [] -> []; + _ -> Partitions + end, + State#state{partitions = Partitions1}. handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ab952cd8..61fac0e2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -186,32 +186,37 @@ server_capabilities(_) -> log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args). +socket_error(Reason) -> + log(error, "error on AMQP connection ~p: ~p (~s)~n", + [self(), Reason, rabbit_misc:format_inet_error(Reason)]). + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> case Fun(Sock) of {ok, Res} -> Res; - {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n", - [self(), Reason]), + {error, Reason} -> socket_error(Reason), %% NB: this is tcp socket, even in case of ssl rabbit_net:fast_close(Sock), exit(normal) end. -name(Sock) -> - socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end). - -socket_ends(Sock) -> - socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end). - start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - Name = name(Sock), + Name = case rabbit_net:connection_string(Sock, inbound) of + {ok, Str} -> Str; + {error, enotconn} -> rabbit_net:fast_close(Sock), + exit(normal); + {error, Reason} -> socket_error(Reason), + rabbit_net:fast_close(Sock), + exit(normal) + end, log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]), ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), - {PeerHost, PeerPort, Host, Port} = socket_ends(Sock), + {PeerHost, PeerPort, Host, Port} = + socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), State = #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -245,7 +250,6 @@ start_connection(Parent, ConnSupPid, Collector, StartHeartbeatFun, Deb, last_blocked_by = none, last_blocked_at = never}}, try - ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end), run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -791,7 +795,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, Connection#connection{ client_properties = ClientProperties, capabilities = Capabilities, - auth_mechanism = AuthMechanism, + auth_mechanism = {Mechanism, AuthMechanism}, auth_state = AuthMechanism:init(Sock)}}, auth_phase(Response, State); @@ -918,15 +922,14 @@ auth_mechanisms_binary(Sock) -> auth_phase(Response, State = #v1{connection = Connection = #connection{protocol = Protocol, - auth_mechanism = AuthMechanism, + auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Msg, Args} -> rabbit_misc:protocol_error( access_refused, "~s login refused: ~s", - [proplists:get_value(name, AuthMechanism:description()), - io_lib:format(Msg, Args)]); + [Name, io_lib:format(Msg, Args)]); {protocol_error, Msg, Args} -> rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> @@ -986,10 +989,8 @@ ic(vhost, #connection{vhost = VHost}) -> VHost; ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout; ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax; ic(client_properties, #connection{client_properties = CP}) -> CP; -ic(auth_mechanism, #connection{auth_mechanism = none}) -> - none; -ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) -> - proplists:get_value(name, Mechanism:description()); +ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; +ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 60419856..3514e780 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -104,11 +104,12 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(policy_validator) -> rabbit_policy_validator. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(exchange_decorator_route) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index b1100b65..05520170 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -19,7 +19,7 @@ -include("rabbit.hrl"). -export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1, - list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3, + list_component/1, list/2, list_formatted/1, lookup/3, value/3, value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -40,12 +40,9 @@ -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]). --spec(list_strict/1 :: (binary() | '_') - -> [rabbit_types:infos()] | 'not_found'). +-spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]). -spec(list/2 :: (rabbit_types:vhost() | '_', binary() | '_') -> [rabbit_types:infos()]). --spec(list_strict/2 :: (rabbit_types:vhost() | '_', binary() | '_') - -> [rabbit_types:infos()] | 'not_found'). -spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary()) -> rabbit_types:infos() | 'not_found'). @@ -139,21 +136,14 @@ list() -> [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>]. -list(VHost) -> list(VHost, '_', []). -list_strict(Component) -> list('_', Component, not_found). -list(VHost, Component) -> list(VHost, Component, []). -list_strict(VHost, Component) -> list(VHost, Component, not_found). - -list(VHost, Component, Default) -> - case component_good(Component) of - true -> Match = #runtime_parameters{key = {VHost, Component, '_'}, - _ = '_'}, - [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- - mnesia:dirty_match_object(?TABLE, Match), - Comp =/= <<"policy">> orelse - Component =:= <<"policy">>]; - _ -> Default - end. +list(VHost) -> list(VHost, '_'). +list_component(Component) -> list('_', Component). + +list(VHost, Component) -> + Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'}, + [p(P) || #runtime_parameters{key = {_VHost, Comp, _Name}} = P <- + mnesia:dirty_match_object(?TABLE, Match), + Comp =/= <<"policy">> orelse Component =:= <<"policy">>]. list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. @@ -208,12 +198,6 @@ info_keys() -> [component, name, value]. %%--------------------------------------------------------------------------- -component_good('_') -> true; -component_good(Component) -> case lookup_component(Component) of - {ok, _} -> true; - _ -> false - end. - lookup_component(Component) -> case rabbit_registry:lookup_module( runtime_parameter, list_to_atom(binary_to_list(Component))) of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 27807b62..1188c554 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1094,6 +1094,7 @@ test_policy_validation() -> {error_string, _} = SetPol("testpos", [-1, 0, 1]), {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + ok = control_action(clear_policy, ["name"]), rabbit_runtime_parameters_test:unregister_policy_validator(), passed. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 0248f878..2725be31 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -55,21 +55,30 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, inet_db:register_socket(Sock, Mod), %% handle - file_handle_cache:transfer(apply(M, F, A ++ [Sock])), - ok = file_handle_cache:obtain(), + case tune_buffer_size(Sock) of + ok -> file_handle_cache:transfer( + apply(M, F, A ++ [Sock])), + ok = file_handle_cache:obtain(); + {error, enotconn} -> catch port_close(Sock); + {error, Err} -> {ok, {IPAddress, Port}} = inet:sockname(LSock), + error_logger:error_msg( + "failed to tune buffer size of " + "connection accepted on ~s:~p - ~p (~s)~n", + [rabbit_misc:ntoab(IPAddress), Port, + Err, rabbit_misc:format_inet_error(Err)]), + catch port_close(Sock) + end, %% accept more accept(State); -handle_info({inet_async, LSock, Ref, {error, closed}}, - State=#state{sock=LSock, ref=Ref}) -> - %% It would be wrong to attempt to restart the acceptor when we - %% know this will fail. - {stop, normal, State}; - handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {stop, {accept_failed, Reason}, State}; + case Reason of + closed -> {stop, normal, State}; %% listening socket closed + econnaborted -> accept(State); %% client sent RST before we accepted + _ -> {stop, {accept_failed, Reason}, State} + end; handle_info(_Info, State) -> {noreply, State}. @@ -87,3 +96,10 @@ accept(State = #state{sock=LSock}) -> {ok, Ref} -> {noreply, State#state{ref=Ref}}; Error -> {stop, {cannot_accept, Error}, State} end. + +tune_buffer_size(Sock) -> + case inet:getopts(Sock, [sndbuf, recbuf, buffer]) of + {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), + inet:setopts(Sock, [{buffer, BufSz}]); + Error -> Error + end. |