diff options
36 files changed, 2233 insertions, 502 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ee000215..4d3065b7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -163,20 +163,28 @@ </varlistentry> <varlistentry> - <term><cmdsynopsis><command>wait</command></cmdsynopsis></term> + <term><cmdsynopsis><command>wait</command> <arg choice="req"><replaceable>pid_file</replaceable></arg></cmdsynopsis></term> <listitem> <para> Wait for the RabbitMQ application to start. </para> <para> This command will wait for the RabbitMQ application to - start at the node. As long as the Erlang node is up but - the RabbitMQ application is down it will wait - indefinitely. If the node itself goes down, or takes - more than five seconds to come up, it will fail. + start at the node. It will wait for the pid file to + be created, then for a process with a pid specified in the + pid file to start, and then for the RabbitMQ application + to start in that process. It will fail if the process + terminates without starting the RabbitMQ application. + </para> + <para> + A suitable pid file is created by + the <command>rabbitmq-server</command> script. By + default this is located in the Mnesia directory. Modify + the <command>RABBITMQ_PID_FILE</command> environment + variable to change the location. </para> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl wait</screen> + <screen role="example">rabbitmqctl wait /var/run/rabbitmq/pid</screen> <para role="example"> This command will return when the RabbitMQ node has started up. 
@@ -860,6 +868,17 @@ <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> </varlistentry> + <varlistentry> + <term>slave_pids</term> + <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves.</para></listitem> + </varlistentry> + <varlistentry> + <term>synchronised_slave_pids</term> + <listitem><para>If the queue is mirrored, this gives the IDs of + the current slaves which are synchronised with the master - + i.e. those which could take over from the master without + message loss.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>queueinfoitem</command>s are specified then queue name and depth are diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index ffc826eb..bdd6c4a1 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -50,6 +50,7 @@ make install TARGET_DIR=%{_maindir} \ mkdir -p %{buildroot}%{_localstatedir}/lib/rabbitmq/mnesia mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq +mkdir -p %{buildroot}%{_localstatedir}/run/rabbitmq #Copy all necessary lib files etc. 
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server @@ -111,6 +112,7 @@ done %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq +%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/run/rabbitmq %dir %{_sysconfdir}/rabbitmq %{_initrddir}/rabbitmq-server %config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server @@ -120,6 +122,9 @@ done rm -rf %{buildroot} %changelog +* Mon Jun 27 2011 simon@rabbitmq.com 2.5.1-1 +- New Upstream Release + * Thu Jun 9 2011 jerryk@vmware.com 2.5.0-1 - New Upstream Release diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index d8a7a94d..e2815f04 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -24,6 +24,7 @@ DESC=rabbitmq-server USER=rabbitmq ROTATE_SUFFIX= INIT_LOG_DIR=/var/log/rabbitmq +PID_FILE=/var/run/rabbitmq/pid LOCK_FILE= # This is filled in when building packages @@ -40,9 +41,9 @@ start_rabbitmq () { else RETVAL=0 set +e - setsid sh -c "$DAEMON > ${INIT_LOG_DIR}/startup_log \ - 2> ${INIT_LOG_DIR}/startup_err" & - $CONTROL wait >/dev/null 2>&1 + setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \ + ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" & + $CONTROL wait $PID_FILE >/dev/null 2>&1 RETVAL=$? set -e case "$RETVAL" in @@ -53,6 +54,7 @@ start_rabbitmq () { fi ;; *) + rm -f $PID_FILE echo FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\} RETVAL=1 ;; @@ -68,6 +70,7 @@ stop_rabbitmq () { RETVAL=$? 
set -e if [ $RETVAL = 0 ] ; then + rm -f $PID_FILE if [ -n "$LOCK_FILE" ] ; then rm -f $LOCK_FILE fi diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index d58c48ed..51e16517 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -29,6 +29,7 @@ ## OCF_RESKEY_log_base ## OCF_RESKEY_mnesia_base ## OCF_RESKEY_server_start_args +## OCF_RESKEY_pid_file ####################################################################### # Initialization: @@ -42,10 +43,12 @@ OCF_RESKEY_server_default="/usr/sbin/rabbitmq-server" OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" OCF_RESKEY_nodename_default="rabbit@localhost" OCF_RESKEY_log_base_default="/var/log/rabbitmq" +OCF_RESKEY_pid_file_default="/var/lib/rabbitmq/pid" : ${OCF_RESKEY_server=${OCF_RESKEY_server_default}} : ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} : ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} : ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} +: ${OCF_RESKEY_pid_file=${OCF_RESKEY_pid_file_default}} meta_data() { cat <<END @@ -133,6 +136,14 @@ Additional arguments provided to the server on startup <content type="string" default="" /> </parameter> +<parameter name="pid_file" unique="0" required="0"> +<longdesc lang="en"> +Location of the file in which the pid will be stored +</longdesc> +<shortdesc lang="en">Pid file path</shortdesc> +<content type="string" default="${OCF_RESKEY_pid_file_default}" /> +</parameter> + </parameters> <actions> @@ -164,6 +175,7 @@ RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args +RABBITMQ_PID_FILE=$OCF_RESKEY_pid_file [ ! -z $RABBITMQ_NODENAME ] && NODENAME_ARG="-n $RABBITMQ_NODENAME" [ ! -z $RABBITMQ_NODENAME ] && export RABBITMQ_NODENAME @@ -174,6 +186,7 @@ export_vars() { [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE [ ! 
-z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE [ ! -z $RABBITMQ_SERVER_START_ARGS ] && export RABBITMQ_SERVER_START_ARGS + [ ! -z $RABBITMQ_PID_FILE ] && export RABBITMQ_PID_FILE } rabbit_validate_partial() { @@ -214,13 +227,13 @@ rabbit_status() { } rabbit_wait() { - rabbitmqctl_action "wait" + rabbitmqctl_action "wait" $1 } rabbitmqctl_action() { local rc local action - action=$1 + action=$@ $RABBITMQ_CTL $NODENAME_ARG $action > /dev/null 2> /dev/null rc=$? case "$rc" in @@ -252,9 +265,10 @@ rabbit_start() { # Wait for the server to come up. # Let the CRM/LRM time us out if required - rabbit_wait + rabbit_wait $RABBITMQ_PID_FILE rc=$? if [ "$rc" != $OCF_SUCCESS ]; then + rm -f $RABBITMQ_PID_FILE ocf_log info "rabbitmq-server start failed: $rc" exit $OCF_ERR_GENERIC fi @@ -285,6 +299,7 @@ rabbit_stop() { rabbit_status rc=$? if [ "$rc" = $OCF_NOT_RUNNING ]; then + rm -f $RABBITMQ_PID_FILE stop_wait=0 break elif [ "$rc" != $OCF_SUCCESS ]; then diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 1cab4235..9063a6ed 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.5.1-1) lucid; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Mon, 27 Jun 2011 11:21:49 +0100 + rabbitmq-server (2.5.0-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/dirs b/packaging/debs/Debian/debian/dirs index 625b7d41..5cf167d5 100644 --- a/packaging/debs/Debian/debian/dirs +++ b/packaging/debs/Debian/debian/dirs @@ -4,6 +4,7 @@ usr/sbin usr/share/man var/lib/rabbitmq/mnesia var/log/rabbitmq +var/run/rabbitmq etc/logrotate.d etc/rabbitmq diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index b11340ef..ca531f14 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -32,6 +32,7 @@ fi chown -R rabbitmq:rabbitmq 
/var/lib/rabbitmq chown -R rabbitmq:rabbitmq /var/log/rabbitmq +chown -R rabbitmq:rabbitmq /var/run/rabbitmq case "$1" in configure) diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 2f80eb96..7176d801 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -47,6 +47,7 @@ fi [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} +[ "x" = "x$RABBITMQ_PID_FILE" ] && RABBITMQ_PID_FILE=${RABBITMQ_MNESIA_DIR}.pid [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${PLUGINS_EXPAND_DIR} [ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}-plugins-expand @@ -67,6 +68,9 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' +mkdir -p $(dirname ${RABBITMQ_PID_FILE}) +echo $$ > ${RABBITMQ_PID_FILE} + RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then if erl \ diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 9f6b2317..776ac43a 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -44,7 +44,6 @@ %% 4) You can find out what your 'real' offset is, and what your %% 'virtual' offset is (i.e. where the hdl really is, and where it %% would be after the write buffer is written out). -%% 5) You can find out what the offset was when you last sync'd. %% %% There is also a server component which serves to limit the number %% of open file descriptors. This is a hard limit: the server @@ -144,8 +143,8 @@ -export([register_callback/3]). -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, - flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). 
+ current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, + set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -172,7 +171,6 @@ -record(handle, { hdl, offset, - trusted_offset, is_dirty, write_buffer_size, write_buffer_size_limit, @@ -240,7 +238,6 @@ -spec(sync/1 :: (ref()) -> ok_or_error()). -spec(position/2 :: (ref(), position()) -> val_or_error(offset())). -spec(truncate/1 :: (ref()) -> ok_or_error()). --spec(last_sync_offset/1 :: (ref()) -> val_or_error(offset())). -spec(current_virtual_offset/1 :: (ref()) -> val_or_error(offset())). -spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())). -spec(flush/1 :: (ref()) -> ok_or_error()). @@ -365,11 +362,10 @@ sync(Ref) -> [Ref], fun ([#handle { is_dirty = false, write_buffer = [] }]) -> ok; - ([Handle = #handle { hdl = Hdl, offset = Offset, + ([Handle = #handle { hdl = Hdl, is_dirty = true, write_buffer = [] }]) -> case file:sync(Hdl) of - ok -> {ok, [Handle #handle { trusted_offset = Offset, - is_dirty = false }]}; + ok -> {ok, [Handle #handle { is_dirty = false }]}; Error -> {Error, [Handle]} end end). @@ -384,21 +380,13 @@ position(Ref, NewOffset) -> truncate(Ref) -> with_flushed_handles( [Ref], - fun ([Handle1 = #handle { hdl = Hdl, offset = Offset, - trusted_offset = TOffset }]) -> + fun ([Handle1 = #handle { hdl = Hdl }]) -> case file:truncate(Hdl) of - ok -> TOffset1 = lists:min([Offset, TOffset]), - {ok, [Handle1 #handle { trusted_offset = TOffset1, - at_eof = true }]}; + ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end end). -last_sync_offset(Ref) -> - with_handles([Ref], fun ([#handle { trusted_offset = TOffset }]) -> - {ok, TOffset} - end). 
- current_virtual_offset(Ref) -> with_handles([Ref], fun ([#handle { at_eof = true, is_write = true, offset = Offset, @@ -456,8 +444,7 @@ clear(Ref) -> write_buffer_size = 0 }) of {{ok, 0}, Handle1 = #handle { hdl = Hdl }} -> case file:truncate(Hdl) of - ok -> {ok, [Handle1 #handle {trusted_offset = 0, - at_eof = true }]}; + ok -> {ok, [Handle1 #handle { at_eof = true }]}; Error -> {Error, [Handle1]} end; {{error, _} = Error, Handle1} -> @@ -585,14 +572,13 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, end) of {ok, Hdl} -> Now = now(), - {{ok, Offset1}, Handle1} = + {{ok, _Offset}, Handle1} = maybe_seek(Offset, Handle #handle { hdl = Hdl, offset = 0, last_used_at = Now }), - Handle2 = Handle1 #handle { trusted_offset = Offset1 }, - put({Ref, fhc_handle}, Handle2), + put({Ref, fhc_handle}, Handle1), reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), - [{Ref, Handle2} | RefHdls]); + [{Ref, Handle1} | RefHdls]); Error -> %% NB: none of the handles in ToOpen are in the age tree Oldest = oldest(Tree, fun () -> undefined end), @@ -677,7 +663,6 @@ new_closed_handle(Path, Mode, Options) -> Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = closed, offset = 0, - trusted_offset = 0, is_dirty = false, write_buffer_size = 0, write_buffer_size_limit = WriteBufferSize, @@ -705,7 +690,6 @@ soft_close(Handle = #handle { hdl = closed }) -> soft_close(Handle) -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, - offset = Offset, is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of @@ -715,7 +699,6 @@ soft_close(Handle) -> ok = file:close(Hdl), age_tree_delete(Then), {ok, Handle1 #handle { hdl = closed, - trusted_offset = Offset, is_dirty = false, last_used_at = undefined }}; {_Error, _Handle} = Result -> diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl new file mode 100644 index 00000000..8dfe39f8 --- /dev/null +++ b/src/mirrored_supervisor.erl @@ -0,0 +1,542 @@ +%% The contents of this file are 
subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011 VMware, Inc. All rights reserved. +%% + +-module(mirrored_supervisor). + +%% Mirrored Supervisor +%% =================== +%% +%% This module implements a new type of supervisor. It acts like a +%% normal supervisor, but at creation time you also provide the name +%% of a process group to join. All the supervisors within the +%% process group act like a single large distributed supervisor: +%% +%% * A process with a given child_id will only exist on one +%% supervisor within the group. +%% +%% * If one supervisor fails, children may migrate to surviving +%% supervisors within the group. +%% +%% In almost all cases you will want to use the module name for the +%% process group. Using multiple process groups with the same module +%% name is supported. Having multiple module names for the same +%% process group will lead to undefined behaviour. +%% +%% Motivation +%% ---------- +%% +%% Sometimes you have processes which: +%% +%% * Only need to exist once per cluster. +%% +%% * Do not contain much state (or can reconstruct their state easily). +%% +%% * Need to be restarted elsewhere should they be running on a node +%% which fails. +%% +%% By creating a mirrored supervisor group with one supervisor on +%% each node, that's what you get. 
+%% +%% +%% API use +%% ------- +%% +%% This is basically the same as for supervisor, except that: +%% +%% 1) start_link(Module, Args) becomes +%% start_link(Group, Module, Args). +%% +%% 2) start_link({local, Name}, Module, Args) becomes +%% start_link({local, Name}, Group, Module, Args). +%% +%% 3) start_link({global, Name}, Module, Args) is not available. +%% +%% 4) The restart strategy simple_one_for_one is not available. +%% +%% 5) Mnesia is used to hold global state. At some point your +%% application should invoke create_tables() (or table_definitions() +%% if it wants to manage table creation itself). +%% +%% Internals +%% --------- +%% +%% Each mirrored_supervisor consists of three processes - the overall +%% supervisor, the delegate supervisor and the mirroring server. The +%% overall supervisor supervises the other two processes. Its pid is +%% the one returned from start_link; the pids of the other two +%% processes are effectively hidden in the API. +%% +%% The delegate supervisor is in charge of supervising all the child +%% processes that are added to the supervisor as usual. +%% +%% The mirroring server intercepts calls to the supervisor API +%% (directed at the overall supervisor), does any special handling, +%% and forwards everything to the delegate supervisor. +%% +%% This module implements all three, hence init/1 is somewhat overloaded. +%% +%% The mirroring server creates and joins a process group on +%% startup. It monitors all the existing members of this group, and +%% broadcasts a "hello" message to them so that they can monitor it in +%% turn. When it receives a 'DOWN' message, it checks to see if it's +%% the "first" server in the group and restarts all the child +%% processes from the dead supervisor if so. +%% +%% In the future we might load balance this. +%% +%% Startup is slightly fiddly. The mirroring server needs to know the +%% Pid of the overall supervisor, but we don't have that until it has +%% started. 
Therefore we set this after the fact. We also start any +%% children we found in Module:init() at this point, since starting +%% children requires knowing the overall supervisor pid. + +-define(SUPERVISOR, supervisor2). +-define(GEN_SERVER, gen_server2). +-define(PG2, pg2_fixed). + +-define(TABLE, mirrored_sup_childspec). +-define(TABLE_DEF, + {?TABLE, + [{record_name, mirrored_sup_childspec}, + {type, ordered_set}, + {attributes, record_info(fields, mirrored_sup_childspec)}]}). +-define(TABLE_MATCH, {match, #mirrored_sup_childspec{ _ = '_' }}). + +-export([start_link/3, start_link/4, + start_child/2, restart_child/2, + delete_child/2, terminate_child/2, + which_children/1, count_children/1, check_childspecs/1]). + +-export([behaviour_info/1]). + +-behaviour(?GEN_SERVER). +-behaviour(?SUPERVISOR). + +-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, + handle_cast/2]). + +-export([start_internal/2]). +-export([create_tables/0, table_definitions/0]). + +-record(mirrored_sup_childspec, {key, mirroring_pid, childspec}). + +-record(state, {overall, + delegate, + group, + initial_childspecs}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type child() :: pid() | 'undefined'. +-type child_id() :: term(). +-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}. +-type modules() :: [module()] | 'dynamic'. +-type restart() :: 'permanent' | 'transient' | 'temporary'. +-type shutdown() :: 'brutal_kill' | timeout(). +-type worker() :: 'worker' | 'supervisor'. +-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. +-type sup_ref() :: (Name :: atom()) + | {Name :: atom(), Node :: node()} + | {'global', Name :: atom()} + | pid(). +-type child_spec() :: {Id :: child_id(), + StartFunc :: mfargs(), + Restart :: restart(), + Shutdown :: shutdown(), + Type :: worker(), + Modules :: modules()}. 
+ +-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). +-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. + +-type startchild_err() :: 'already_present' + | {'already_started', Child :: child()} | term(). +-type startchild_ret() :: {'ok', Child :: child()} + | {'ok', Child :: child(), Info :: term()} + | {'error', startchild_err()}. + +-type group_name() :: any(). + +-spec start_link(GroupName, Module, Args) -> startlink_ret() when + GroupName :: group_name(), + Module :: module(), + Args :: term(). + +-spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when + SupName :: sup_name(), + GroupName :: group_name(), + Module :: module(), + Args :: term(). + +-spec start_child(SupRef, ChildSpec) -> startchild_ret() when + SupRef :: sup_ref(), + ChildSpec :: child_spec() | (List :: [term()]). + +-spec restart_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: child_id(), + Result :: {'ok', Child :: child()} + | {'ok', Child :: child(), Info :: term()} + | {'error', Error}, + Error :: 'running' | 'not_found' | 'simple_one_for_one' | term(). + +-spec delete_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: child_id(), + Result :: 'ok' | {'error', Error}, + Error :: 'running' | 'not_found' | 'simple_one_for_one'. + +-spec terminate_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: pid() | child_id(), + Result :: 'ok' | {'error', Error}, + Error :: 'not_found' | 'simple_one_for_one'. + +-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when + SupRef :: sup_ref(), + Id :: child_id() | 'undefined', + Child :: child(), + Type :: worker(), + Modules :: modules(). + +-spec check_childspecs(ChildSpecs) -> Result when + ChildSpecs :: [child_spec()], + Result :: 'ok' | {'error', Error :: term()}. + +-spec start_internal(Group, ChildSpecs) -> Result when + Group :: group_name(), + ChildSpecs :: [child_spec()], + Result :: startlink_ret(). 
+ +-spec create_tables() -> Result when + Result :: 'ok'. + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Group, Mod, Args) -> + start_link0([], Group, init(Mod, Args)). + +start_link({local, SupName}, Group, Mod, Args) -> + start_link0([{local, SupName}], Group, init(Mod, Args)); + +start_link({global, _SupName}, _Group, _Mod, _Args) -> + erlang:error(badarg). + +start_link0(Prefix, Group, Init) -> + case apply(?SUPERVISOR, start_link, + Prefix ++ [?MODULE, {overall, Group, Init}]) of + {ok, Pid} -> call(Pid, {init, Pid}), + {ok, Pid}; + Other -> Other + end. + +init(Mod, Args) -> + case Mod:init(Args) of + {ok, {{Bad, _, _}, _ChildSpecs}} when + Bad =:= simple_one_for_one orelse + Bad =:= simple_one_for_one_terminate -> erlang:error(badarg); + Init -> Init + end. + +start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). +delete_child(Sup, Id) -> find_call(Sup, Id, {delete_child, Id}). +restart_child(Sup, Id) -> find_call(Sup, Id, {msg, restart_child, [Id]}). +terminate_child(Sup, Id) -> find_call(Sup, Id, {msg, terminate_child, [Id]}). +which_children(Sup) -> fold(which_children, Sup, fun lists:append/2). +count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). +check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). + +behaviour_info(callbacks) -> [{init,1}]; +behaviour_info(_Other) -> undefined. + +call(Sup, Msg) -> + ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). + +find_call(Sup, Id, Msg) -> + Group = call(Sup, group), + MatchHead = #mirrored_sup_childspec{mirroring_pid = '$1', + key = {Group, Id}, + _ = '_'}, + %% If we did this inside a tx we could still have failover + %% immediately after the tx - we can't be 100% here. So we may as + %% well dirty_select. + case mnesia:dirty_select(?TABLE, [{MatchHead, [], ['$1']}]) of + [Mirror] -> ?GEN_SERVER:call(Mirror, Msg, infinity); + [] -> {error, not_found} + end. 
+ +fold(FunAtom, Sup, AggFun) -> + Group = call(Sup, group), + lists:foldl(AggFun, [], + [apply(?SUPERVISOR, FunAtom, [D]) || + M <- ?PG2:get_members(Group), + D <- [?GEN_SERVER:call(M, delegate_supervisor, infinity)]]). + +child(Sup, Id) -> + [Pid] = [Pid || {Id1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup), + Id1 =:= Id], + Pid. + +%%---------------------------------------------------------------------------- + +start_internal(Group, ChildSpecs) -> + ?GEN_SERVER:start_link(?MODULE, {mirroring, Group, ChildSpecs}, + [{timeout, infinity}]). + +%%---------------------------------------------------------------------------- + +init({overall, Group, Init}) -> + case Init of + {ok, {Restart, ChildSpecs}} -> + Delegate = {delegate, {?SUPERVISOR, start_link, + [?MODULE, {delegate, Restart}]}, + temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, + Mirroring = {mirroring, {?MODULE, start_internal, + [Group, ChildSpecs]}, + permanent, 16#ffffffff, worker, [?MODULE]}, + %% Important: Delegate MUST start before Mirroring so that + %% when we shut down from above it shuts down last, so + %% Mirroring does not see it die. + %% + %% See comment in handle_info('DOWN', ...) below + {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; + ignore -> + ignore + end; + +init({delegate, Restart}) -> + {ok, {Restart, []}}; + +init({mirroring, Group, ChildSpecs}) -> + {ok, #state{group = Group, initial_childspecs = ChildSpecs}}. 
+ +handle_call({init, Overall}, _From, + State = #state{overall = undefined, + delegate = undefined, + group = Group, + initial_childspecs = ChildSpecs}) -> + process_flag(trap_exit, true), + ?PG2:create(Group), + ok = ?PG2:join(Group, self()), + Rest = ?PG2:get_members(Group) -- [self()], + case Rest of + [] -> {atomic, _} = mnesia:transaction(fun() -> delete_all(Group) end); + _ -> ok + end, + [begin + ?GEN_SERVER:cast(Pid, {ensure_monitoring, self()}), + erlang:monitor(process, Pid) + end || Pid <- Rest], + Delegate = child(Overall, delegate), + erlang:monitor(process, Delegate), + [maybe_start(Group, Delegate, S) || S <- ChildSpecs], + {reply, ok, State#state{overall = Overall, delegate = Delegate}}; + +handle_call({start_child, ChildSpec}, _From, + State = #state{delegate = Delegate, + group = Group}) -> + {reply, maybe_start(Group, Delegate, ChildSpec), State}; + +handle_call({delete_child, Id}, _From, State = #state{delegate = Delegate, + group = Group}) -> + {reply, stop(Group, Delegate, Id), State}; + +handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> + {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; + +handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) -> + {reply, Delegate, State}; + +handle_call(group, _From, State = #state{group = Group}) -> + {reply, Group, State}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({ensure_monitoring, Pid}, State) -> + erlang:monitor(process, Pid), + {noreply, State}; + +handle_cast({die, Reason}, State = #state{group = Group}) -> + tell_all_peers_to_die(Group, Reason), + {stop, Reason, State}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info({'DOWN', _Ref, process, Pid, Reason}, + State = #state{delegate = Pid, group = Group}) -> + %% Since the delegate is temporary, its death won't cause us to + %% die. 
Since the overall supervisor kills processes in reverse + %% order when shutting down "from above" and we started after the + %% delegate, if we see the delegate die then that means it died + %% "from below" i.e. due to the behaviour of its children, not + %% because the whole app was being torn down. + %% + %% Therefore if we get here we know we need to cause the entire + %% mirrored sup to shut down, not just fail over. + tell_all_peers_to_die(Group, Reason), + {stop, Reason, State}; + +handle_info({'DOWN', _Ref, process, Pid, _Reason}, + State = #state{delegate = Delegate, group = Group}) -> + %% TODO load balance this + %% No guarantee pg2 will have received the DOWN before us. + Self = self(), + case lists:sort(?PG2:get_members(Group)) -- [Pid] of + [Self | _] -> {atomic, ChildSpecs} = + mnesia:transaction(fun() -> update_all(Pid) end), + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> ok + end, + {noreply, State}; + +handle_info(Info, State) -> + {stop, {unexpected_info, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +tell_all_peers_to_die(Group, Reason) -> + [?GEN_SERVER:cast(P, {die, Reason}) || + P <- ?PG2:get_members(Group) -- [self()]]. + +maybe_start(Group, Delegate, ChildSpec) -> + case mnesia:transaction(fun() -> + check_start(Group, Delegate, ChildSpec) + end) of + {atomic, start} -> start(Delegate, ChildSpec); + {atomic, undefined} -> {error, already_present}; + {atomic, Pid} -> {error, {already_started, Pid}}; + %% If we are torn down while in the transaction... + {aborted, E} -> {error, E} + end. 
+ +check_start(Group, Delegate, ChildSpec) -> + case mnesia:wread({?TABLE, {Group, id(ChildSpec)}}) of + [] -> write(Group, ChildSpec), + start; + [S] -> #mirrored_sup_childspec{key = {Group, Id}, + mirroring_pid = Pid} = S, + case self() of + Pid -> child(Delegate, Id); + _ -> case supervisor(Pid) of + dead -> write(Group, ChildSpec), + start; + Delegate0 -> child(Delegate0, Id) + end + end + end. + +supervisor(Pid) -> + with_exit_handler( + fun() -> dead end, + fun() -> gen_server:call(Pid, delegate_supervisor, infinity) end). + +write(Group, ChildSpec) -> + ok = mnesia:write( + #mirrored_sup_childspec{key = {Group, id(ChildSpec)}, + mirroring_pid = self(), + childspec = ChildSpec}), + ChildSpec. + +delete(Group, Id) -> + ok = mnesia:delete({?TABLE, {Group, Id}}). + +start(Delegate, ChildSpec) -> + apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). + +stop(Group, Delegate, Id) -> + case mnesia:transaction(fun() -> check_stop(Group, Delegate, Id) end) of + {atomic, deleted} -> apply(?SUPERVISOR, delete_child, [Delegate, Id]); + {atomic, running} -> {error, running}; + {aborted, E} -> {error, E} + end. + +check_stop(Group, Delegate, Id) -> + case child(Delegate, Id) of + undefined -> delete(Group, Id), + deleted; + _ -> running + end. + +id({Id, _, _, _, _, _}) -> Id. + +update_all(OldPid) -> + MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid, + key = '$1', + childspec = '$2', + _ = '_'}, + [write(Group, C) || + [{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])]. + +delete_all(Group) -> + MatchHead = #mirrored_sup_childspec{key = {Group, '_'}, + childspec = '$1', + _ = '_'}, + [delete(Group, id(C)) || + C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. + +%%---------------------------------------------------------------------------- + +create_tables() -> + create_tables([?TABLE_DEF]). 
+ +create_tables([]) -> + ok; +create_tables([{Table, Attributes} | Ts]) -> + case mnesia:create_table(Table, Attributes) of + {atomic, ok} -> create_tables(Ts); + {aborted, {already_exists, ?TABLE}} -> create_tables(Ts); + Err -> Err + end. + +table_definitions() -> + {Name, Attributes} = ?TABLE_DEF, + [{Name, [?TABLE_MATCH | Attributes]}]. + +%%---------------------------------------------------------------------------- + +with_exit_handler(Handler, Thunk) -> + try + Thunk() + catch + exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> + Handler(); + exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown -> + Handler() + end. + +add_proplists(P1, P2) -> + add_proplists(lists:keysort(1, P1), lists:keysort(1, P2), []). +add_proplists([], P2, Acc) -> P2 ++ Acc; +add_proplists(P1, [], Acc) -> P1 ++ Acc; +add_proplists([{K, V1} | P1], [{K, V2} | P2], Acc) -> + add_proplists(P1, P2, [{K, V1 + V2} | Acc]); +add_proplists([{K1, _} = KV | P1], [{K2, _} | _] = P2, Acc) when K1 < K2 -> + add_proplists(P1, P2, [KV | Acc]); +add_proplists(P1, [KV | P2], Acc) -> + add_proplists(P1, P2, [KV | Acc]). diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl new file mode 100644 index 00000000..ee9c7593 --- /dev/null +++ b/src/mirrored_supervisor_tests.erl @@ -0,0 +1,309 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011 VMware, Inc. All rights reserved. 
+%% + +-module(mirrored_supervisor_tests). + +-compile([export_all]). + +-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, + handle_cast/2]). + +-behaviour(gen_server). +-behaviour(mirrored_supervisor). + +-define(MS, mirrored_supervisor). + +%% --------------------------------------------------------------------------- +%% Functional tests +%% --------------------------------------------------------------------------- + +all_tests() -> + passed = test_migrate(), + passed = test_migrate_twice(), + passed = test_already_there(), + passed = test_delete_restart(), + passed = test_which_children(), + passed = test_large_group(), + passed = test_childspecs_at_init(), + passed = test_anonymous_supervisors(), + passed = test_no_migration_on_shutdown(), + passed = test_start_idempotence(), + passed = test_unsupported(), + passed = test_ignore(), + passed. + +%% Simplest test +test_migrate() -> + with_sups(fun([A, _]) -> + ?MS:start_child(a, childspec(worker)), + Pid1 = pid_of(worker), + kill(A, Pid1), + Pid2 = pid_of(worker), + false = (Pid1 =:= Pid2) + end, [a, b]). + +%% Is migration transitive? +test_migrate_twice() -> + with_sups(fun([A, B]) -> + ?MS:start_child(a, childspec(worker)), + Pid1 = pid_of(worker), + kill(A, Pid1), + {ok, C} = start_sup(c), + Pid2 = pid_of(worker), + kill(B, Pid2), + Pid3 = pid_of(worker), + false = (Pid1 =:= Pid3), + kill(C) + end, [a, b]). + +%% Can't start the same child twice +test_already_there() -> + with_sups(fun([_, _]) -> + S = childspec(worker), + {ok, Pid} = ?MS:start_child(a, S), + {error, {already_started, Pid}} = ?MS:start_child(b, S) + end, [a, b]). 
+ +%% Deleting and restarting should work as per a normal supervisor +test_delete_restart() -> + with_sups(fun([_, _]) -> + S = childspec(worker), + {ok, Pid1} = ?MS:start_child(a, S), + {error, running} = ?MS:delete_child(a, worker), + ok = ?MS:terminate_child(a, worker), + ok = ?MS:delete_child(a, worker), + {ok, Pid2} = ?MS:start_child(b, S), + false = (Pid1 =:= Pid2), + ok = ?MS:terminate_child(b, worker), + {ok, Pid3} = ?MS:restart_child(b, worker), + Pid3 = pid_of(worker), + false = (Pid2 =:= Pid3), + %% Not the same supervisor as the worker is on + ok = ?MS:terminate_child(a, worker), + ok = ?MS:delete_child(a, worker), + {ok, Pid4} = ?MS:start_child(a, S), + false = (Pid3 =:= Pid4) + end, [a, b]). + +test_which_children() -> + with_sups( + fun([A, B] = Both) -> + ?MS:start_child(A, childspec(worker)), + assert_wc(Both, fun ([C]) -> true = is_pid(wc_pid(C)) end), + ok = ?MS:terminate_child(a, worker), + assert_wc(Both, fun ([C]) -> undefined = wc_pid(C) end), + {ok, _} = ?MS:restart_child(a, worker), + assert_wc(Both, fun ([C]) -> true = is_pid(wc_pid(C)) end), + ?MS:start_child(B, childspec(worker2)), + assert_wc(Both, fun (C) -> 2 = length(C) end) + end, [a, b]). + +assert_wc(Sups, Fun) -> + [Fun(?MS:which_children(Sup)) || Sup <- Sups]. + +wc_pid(Child) -> + {worker, Pid, worker, [mirrored_supervisor_tests]} = Child, + Pid. + +%% Not all the members of the group should actually do the failover +test_large_group() -> + with_sups(fun([A, _, _, _]) -> + ?MS:start_child(a, childspec(worker)), + Pid1 = pid_of(worker), + kill(A, Pid1), + Pid2 = pid_of(worker), + false = (Pid1 =:= Pid2) + end, [a, b, c, d]). + +%% Do childspecs work when returned from init? +test_childspecs_at_init() -> + S = childspec(worker), + with_sups(fun([A, _]) -> + Pid1 = pid_of(worker), + kill(A, Pid1), + Pid2 = pid_of(worker), + false = (Pid1 =:= Pid2) + end, [{a, [S]}, {b, [S]}]). 
+ +test_anonymous_supervisors() -> + with_sups(fun([A, _B]) -> + ?MS:start_child(A, childspec(worker)), + Pid1 = pid_of(worker), + kill(A, Pid1), + Pid2 = pid_of(worker), + false = (Pid1 =:= Pid2) + end, [anon, anon]). + +%% When a mirrored_supervisor terminates, we should not migrate, but +%% the whole supervisor group should shut down. To test this we set up +%% a situation where the gen_server will only fail if it's running +%% under the supervisor called 'evil'. It should not migrate to +%% 'good' and survive, rather the whole group should go away. +test_no_migration_on_shutdown() -> + with_sups(fun([Evil, _]) -> + ?MS:start_child(Evil, childspec(worker)), + try + call(worker, ping), + exit(worker_should_not_have_migrated) + catch exit:{timeout_waiting_for_server, _} -> + ok + end + end, [evil, good]). + +test_start_idempotence() -> + with_sups(fun([_]) -> + CS = childspec(worker), + {ok, Pid} = ?MS:start_child(a, CS), + {error, {already_started, Pid}} = ?MS:start_child(a, CS), + ?MS:terminate_child(a, worker), + {error, already_present} = ?MS:start_child(a, CS) + end, [a]). + +test_unsupported() -> + try + ?MS:start_link({global, foo}, get_group(group), ?MODULE, + {sup, one_for_one, []}), + exit(no_global) + catch error:badarg -> + ok + end, + try + ?MS:start_link({local, foo}, get_group(group), ?MODULE, + {sup, simple_one_for_one, []}), + exit(no_sofo) + catch error:badarg -> + ok + end, + passed. + +%% Just test we don't blow up +test_ignore() -> + ?MS:start_link({local, foo}, get_group(group), ?MODULE, + {sup, fake_strategy_for_ignore, []}), + passed. + +%% --------------------------------------------------------------------------- + +with_sups(Fun, Sups) -> + inc_group(), + Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups], + Fun(Pids), + [kill(Pid) || Pid <- Pids, is_process_alive(Pid)], + passed. + +start_sup(Spec) -> + start_sup(Spec, group). 
+ +start_sup({Name, ChildSpecs}, Group) -> + {ok, Pid} = start_sup0(Name, get_group(Group), ChildSpecs), + %% We are not a supervisor, when we kill the supervisor we do not + %% want to die! + unlink(Pid), + {ok, Pid}; + +start_sup(Name, Group) -> + start_sup({Name, []}, Group). + +start_sup0(anon, Group, ChildSpecs) -> + ?MS:start_link(Group, ?MODULE, {sup, one_for_one, ChildSpecs}); + +start_sup0(Name, Group, ChildSpecs) -> + ?MS:start_link({local, Name}, Group, ?MODULE, + {sup, one_for_one, ChildSpecs}). + +childspec(Id) -> + {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. + +start_gs(Id) -> + gen_server:start_link({local, Id}, ?MODULE, server, []). + +pid_of(Id) -> + {received, Pid, ping} = call(Id, ping), + Pid. + +inc_group() -> + Count = case get(counter) of + undefined -> 0; + C -> C + end + 1, + put(counter, Count). + +get_group(Group) -> + {Group, get(counter)}. + +call(Id, Msg) -> call(Id, Msg, 100, 10). + +call(Id, Msg, 0, _Decr) -> + exit({timeout_waiting_for_server, {Id, Msg}}); + +call(Id, Msg, MaxDelay, Decr) -> + try + gen_server:call(Id, Msg, infinity) + catch exit:_ -> timer:sleep(Decr), + call(Id, Msg, MaxDelay - Decr, Decr) + end. + +kill(Pid) -> kill(Pid, []). +kill(Pid, Wait) when is_pid(Wait) -> kill(Pid, [Wait]); +kill(Pid, Waits) -> + erlang:monitor(process, Pid), + [erlang:monitor(process, P) || P <- Waits], + exit(Pid, kill), + kill_wait(Pid), + [kill_wait(P) || P <- Waits]. + +kill_wait(Pid) -> + receive + {'DOWN', _Ref, process, Pid, _Reason} -> + ok + end. + +%% --------------------------------------------------------------------------- +%% Dumb gen_server we can supervise +%% --------------------------------------------------------------------------- + +init({sup, fake_strategy_for_ignore, _ChildSpecs}) -> + ignore; + +init({sup, Strategy, ChildSpecs}) -> + {ok, {{Strategy, 0, 1}, ChildSpecs}}; + +init(server) -> + {ok, state}. 
+ +handle_call(Msg, _From, State) -> + die_if_my_supervisor_is_evil(), + {reply, {received, self(), Msg}, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +die_if_my_supervisor_is_evil() -> + try lists:keysearch(self(), 2, ?MS:which_children(evil)) of + false -> ok; + _ -> exit(doooom) + catch + exit:{noproc, _} -> ok + end. diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl new file mode 100644 index 00000000..8926b83b --- /dev/null +++ b/src/pg2_fixed.erl @@ -0,0 +1,400 @@ +%% This is the version of pg2 from R14B02, which contains the fix +%% described at +%% http://erlang.2086793.n4.nabble.com/pg2-still-busted-in-R13B04-td2230601.html. +%% The changes are a search-and-replace to rename the module and avoid +%% clashes with other versions of pg2, and also a simple rewrite of +%% "andalso" and "orelse" expressions to case statements where the second +%% operand is not a boolean since R12B does not allow this. + +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2010. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(pg2_fixed). + +-export([create/1, delete/1, join/2, leave/2]). +-export([get_members/1, get_local_members/1]). +-export([get_closest_pid/1, which_groups/0]). 
+-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, + terminate/2]). + +%%% As of R13B03 monitors are used instead of links. + +%%% +%%% Exported functions +%%% + +-spec start_link() -> {'ok', pid()} | {'error', term()}. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +-spec start() -> {'ok', pid()} | {'error', term()}. + +start() -> + ensure_started(). + +-spec create(term()) -> 'ok'. + +create(Name) -> + ensure_started(), + case ets:member(pg2_fixed_table, {group, Name}) of + false -> + global:trans({{?MODULE, Name}, self()}, + fun() -> + gen_server:multi_call(?MODULE, {create, Name}) + end), + ok; + true -> + ok + end. + +-type name() :: term(). + +-spec delete(name()) -> 'ok'. + +delete(Name) -> + ensure_started(), + global:trans({{?MODULE, Name}, self()}, + fun() -> + gen_server:multi_call(?MODULE, {delete, Name}) + end), + ok. + +-spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}. + +join(Name, Pid) when is_pid(Pid) -> + ensure_started(), + case ets:member(pg2_fixed_table, {group, Name}) of + false -> + {error, {no_such_group, Name}}; + true -> + global:trans({{?MODULE, Name}, self()}, + fun() -> + gen_server:multi_call(?MODULE, + {join, Name, Pid}) + end), + ok + end. + +-spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}. + +leave(Name, Pid) when is_pid(Pid) -> + ensure_started(), + case ets:member(pg2_fixed_table, {group, Name}) of + false -> + {error, {no_such_group, Name}}; + true -> + global:trans({{?MODULE, Name}, self()}, + fun() -> + gen_server:multi_call(?MODULE, + {leave, Name, Pid}) + end), + ok + end. + +-type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}. + +-spec get_members(name()) -> get_members_ret(). + +get_members(Name) -> + ensure_started(), + case ets:member(pg2_fixed_table, {group, Name}) of + true -> + group_members(Name); + false -> + {error, {no_such_group, Name}} + end. 
+ +-spec get_local_members(name()) -> get_members_ret(). + +get_local_members(Name) -> + ensure_started(), + case ets:member(pg2_fixed_table, {group, Name}) of + true -> + local_group_members(Name); + false -> + {error, {no_such_group, Name}} + end. + +-spec which_groups() -> [name()]. + +which_groups() -> + ensure_started(), + all_groups(). + +-type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}. + +-spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}. + +get_closest_pid(Name) -> + case get_local_members(Name) of + [Pid] -> + Pid; + [] -> + {_,_,X} = erlang:now(), + case get_members(Name) of + [] -> {error, {no_process, Name}}; + Members -> + lists:nth((X rem length(Members))+1, Members) + end; + Members when is_list(Members) -> + {_,_,X} = erlang:now(), + lists:nth((X rem length(Members))+1, Members); + Else -> + Else + end. + +%%% +%%% Callback functions from gen_server +%%% + +-record(state, {}). + +-spec init([]) -> {'ok', #state{}}. + +init([]) -> + Ns = nodes(), + net_kernel:monitor_nodes(true), + lists:foreach(fun(N) -> + {?MODULE, N} ! {new_pg2_fixed, node()}, + self() ! {nodeup, N} + end, Ns), + pg2_fixed_table = ets:new(pg2_fixed_table, [ordered_set, protected, named_table]), + {ok, #state{}}. + +-type call() :: {'create', name()} + | {'delete', name()} + | {'join', name(), pid()} + | {'leave', name(), pid()}. + +-spec handle_call(call(), _, #state{}) -> + {'reply', 'ok', #state{}}. 
+ +handle_call({create, Name}, _From, S) -> + assure_group(Name), + {reply, ok, S}; +handle_call({join, Name, Pid}, _From, S) -> + case ets:member(pg2_fixed_table, {group, Name}) of + true -> join_group(Name, Pid); + _ -> ok + end, + {reply, ok, S}; +handle_call({leave, Name, Pid}, _From, S) -> + case ets:member(pg2_fixed_table, {group, Name}) of + true -> leave_group(Name, Pid); + _ -> ok + end, + {reply, ok, S}; +handle_call({delete, Name}, _From, S) -> + delete_group(Name), + {reply, ok, S}; +handle_call(Request, From, S) -> + error_logger:warning_msg("The pg2_fixed server received an unexpected message:\n" + "handle_call(~p, ~p, _)\n", + [Request, From]), + {noreply, S}. + +-type all_members() :: [[name(),...]]. +-type cast() :: {'exchange', node(), all_members()} + | {'del_member', name(), pid()}. + +-spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}. + +handle_cast({exchange, _Node, List}, S) -> + store(List), + {noreply, S}; +handle_cast(_, S) -> + %% Ignore {del_member, Name, Pid}. + {noreply, S}. + +-spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}. + +handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> + member_died(MonitorRef), + {noreply, S}; +handle_info({nodeup, Node}, S) -> + gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), + {noreply, S}; +handle_info({new_pg2_fixed, Node}, S) -> + gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), + {noreply, S}; +handle_info(_, S) -> + {noreply, S}. + +-spec terminate(term(), #state{}) -> 'ok'. + +terminate(_Reason, _S) -> + true = ets:delete(pg2_fixed_table), + ok. + +%%% +%%% Local functions +%%% + +%%% One ETS table, pg2_fixed_table, is used for bookkeeping. The type of the +%%% table is ordered_set, and the fast matching of partially +%%% instantiated keys is used extensively. +%%% +%%% {{group, Name}} +%%% Process group Name. +%%% {{ref, Pid}, RPid, MonitorRef, Counter} +%%% {{ref, MonitorRef}, Pid} +%%% Each process has one monitor. 
Sometimes a process is spawned to +%%% monitor the pid (RPid). Counter is incremented when the Pid joins +%%% some group. +%%% {{member, Name, Pid}, GroupCounter} +%%% {{local_member, Name, Pid}} +%%% Pid is a member of group Name, GroupCounter is incremented when the +%%% Pid joins the group Name. +%%% {{pid, Pid, Name}} +%%% Pid is a member of group Name. + +store(List) -> + _ = [case assure_group(Name) of + true -> + [join_group(Name, P) || P <- Members -- group_members(Name)]; + _ -> + ok + end || [Name, Members] <- List], + ok. + +assure_group(Name) -> + Key = {group, Name}, + ets:member(pg2_fixed_table, Key) orelse true =:= ets:insert(pg2_fixed_table, {Key}). + +delete_group(Name) -> + _ = [leave_group(Name, Pid) || Pid <- group_members(Name)], + true = ets:delete(pg2_fixed_table, {group, Name}), + ok. + +member_died(Ref) -> + [{{ref, Ref}, Pid}] = ets:lookup(pg2_fixed_table, {ref, Ref}), + Names = member_groups(Pid), + _ = [leave_group(Name, P) || + Name <- Names, + P <- member_in_group(Pid, Name)], + %% Kept for backward compatibility with links. Can be removed, eventually. + _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) || + Name <- Names], + ok. + +join_group(Name, Pid) -> + Ref_Pid = {ref, Pid}, + try _ = ets:update_counter(pg2_fixed_table, Ref_Pid, {4, +1}) + catch _:_ -> + {RPid, Ref} = do_monitor(Pid), + true = ets:insert(pg2_fixed_table, {Ref_Pid, RPid, Ref, 1}), + true = ets:insert(pg2_fixed_table, {{ref, Ref}, Pid}) + end, + Member_Name_Pid = {member, Name, Pid}, + try _ = ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, +1, 1, 1}) + catch _:_ -> + true = ets:insert(pg2_fixed_table, {Member_Name_Pid, 1}), + _ = [ets:insert(pg2_fixed_table, {{local_member, Name, Pid}}) || + node(Pid) =:= node()], + true = ets:insert(pg2_fixed_table, {{pid, Pid, Name}}) + end. 
+ +leave_group(Name, Pid) -> + Member_Name_Pid = {member, Name, Pid}, + try ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, -1, 0, 0}) of + N -> + if + N =:= 0 -> + true = ets:delete(pg2_fixed_table, {pid, Pid, Name}), + _ = [ets:delete(pg2_fixed_table, {local_member, Name, Pid}) || + node(Pid) =:= node()], + true = ets:delete(pg2_fixed_table, Member_Name_Pid); + true -> + ok + end, + Ref_Pid = {ref, Pid}, + case ets:update_counter(pg2_fixed_table, Ref_Pid, {4, -1}) of + 0 -> + [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_fixed_table, Ref_Pid), + true = ets:delete(pg2_fixed_table, {ref, Ref}), + true = ets:delete(pg2_fixed_table, Ref_Pid), + true = erlang:demonitor(Ref, [flush]), + kill_monitor_proc(RPid, Pid); + _ -> + ok + end + catch _:_ -> + ok + end. + +all_members() -> + [[G, group_members(G)] || G <- all_groups()]. + +group_members(Name) -> + [P || + [P, N] <- ets:match(pg2_fixed_table, {{member, Name, '$1'},'$2'}), + _ <- lists:seq(1, N)]. + +local_group_members(Name) -> + [P || + [Pid] <- ets:match(pg2_fixed_table, {{local_member, Name, '$1'}}), + P <- member_in_group(Pid, Name)]. + +member_in_group(Pid, Name) -> + case ets:lookup(pg2_fixed_table, {member, Name, Pid}) of + [] -> []; + [{{member, Name, Pid}, N}] -> + lists:duplicate(N, Pid) + end. + +member_groups(Pid) -> + [Name || [Name] <- ets:match(pg2_fixed_table, {{pid, Pid, '$1'}})]. + +all_groups() -> + [N || [N] <- ets:match(pg2_fixed_table, {{group,'$1'}})]. + +ensure_started() -> + case whereis(?MODULE) of + undefined -> + C = {pg2_fixed, {?MODULE, start_link, []}, permanent, + 1000, worker, [?MODULE]}, + supervisor:start_child(kernel_safe_sup, C); + Pg2_FixedPid -> + {ok, Pg2_FixedPid} + end. + + +kill_monitor_proc(RPid, Pid) -> + case RPid of + Pid -> ok; + _ -> exit(RPid, kill) + end. + +%% When/if erlang:monitor() returns before trying to connect to the +%% other node this function can be removed. 
+do_monitor(Pid) -> + case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of + true -> + %% Assume the node is still up + {Pid, erlang:monitor(process, Pid)}; + false -> + F = fun() -> + Ref = erlang:monitor(process, Pid), + receive + {'DOWN', Ref, process, Pid, _Info} -> + exit(normal) + end + end, + erlang:spawn_monitor(F) + end. diff --git a/src/rabbit.erl b/src/rabbit.erl index b1a8dc46..20b3e275 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -19,7 +19,7 @@ -behaviour(application). -export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, environment/0, - rotate_logs/1]). + rotate_logs/1, force_event_refresh/0]). -export([start/2, stop/1]). @@ -187,8 +187,9 @@ -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(stop_and_halt/0 :: () -> 'ok'). +-spec(stop_and_halt/0 :: () -> no_return()). -spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). +-spec(force_event_refresh/0 :: () -> 'ok'). -spec(status/0 :: () -> [{pid, integer()} | {running_applications, [{atom(), string(), string()}]} | @@ -520,6 +521,12 @@ log_rotation_result(ok, {error, SaslLogError}) -> log_rotation_result(ok, ok) -> ok. +force_event_refresh() -> + rabbit_direct:force_event_refresh(), + rabbit_networking:force_connection_event_refresh(), + rabbit_channel:force_event_refresh(), + rabbit_amqqueue:force_event_refresh(). + %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0d8b9a82..b3e92b69 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -21,7 +21,8 @@ -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/3, reject/4]). --export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). 
+-export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). @@ -82,6 +83,7 @@ -> 'ok' | rabbit_types:channel_exit()). -spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()). +-spec(list/0 :: () -> [rabbit_types:amqqueue()]). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()). @@ -91,6 +93,7 @@ -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]). +-spec(force_event_refresh/0 :: () -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean()}]). @@ -119,12 +122,13 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) -> + ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined', - rabbit_types:ctag(), boolean(), any()) + (rabbit_types:amqqueue(), boolean(), pid(), + rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). 
@@ -317,7 +321,7 @@ check_declare_arguments(QueueName, Args) -> ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, - "invalid arg '~s' for ~s: ~w", + "invalid arg '~s' for ~s: ~255p", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- [{<<"x-expires">>, fun check_integer_argument/2}, @@ -360,6 +364,9 @@ check_ha_policy_argument({longstr, Policy}, _Args) -> check_ha_policy_argument({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. +list() -> + mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -382,6 +389,10 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +force_event_refresh() -> + [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()], + ok. + consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). @@ -431,19 +442,17 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). -limit_all(QPids, ChPid, LimiterPid) -> +limit_all(QPids, ChPid, Limiter) -> delegate:invoke_no_result( - QPids, fun (QPid) -> - gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) - end). + QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate_call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). 
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5279c07e..734b2291 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -58,7 +58,7 @@ %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, - limiter_pid, + limiter, monitor_ref, acktags, is_limit_active, @@ -88,8 +88,8 @@ messages, consumers, memory, - backing_queue_status, - slave_pids + slave_pids, + backing_queue_status ]). -define(CREATION_EVENT_KEYS, @@ -99,10 +99,12 @@ auto_delete, arguments, owner_pid, - mirror_nodes + slave_pids, + synchronised_slave_pids ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, + ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]). %%---------------------------------------------------------------------------- @@ -164,11 +166,13 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); -terminate(Reason, State = #q{backing_queue = BQ}) -> +terminate(Reason, State = #q{q = #amqqueue{name = QName}, + backing_queue = BQ}) -> %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> rabbit_event:notify( - queue_deleted, [{pid, self()}]), + queue_deleted, [{pid, self()}, + {name, QName}]), BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. @@ -337,6 +341,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), is_limit_active = false, + limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, put(Key, C), C; @@ -357,9 +362,9 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, end. 
erase_ch_record(#cr{ch_pid = ChPid, - limiter_pid = LimiterPid, + limiter = Limiter, monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(LimiterPid, self()), + ok = rabbit_limiter:unregister(Limiter, self()), erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. @@ -384,12 +389,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, ActiveConsumersTail} -> - C = #cr{limiter_pid = LimiterPid, + C = #cr{limiter = Limiter, unsent_message_count = Count, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send( LimiterPid, self(), AckRequired )) of + rabbit_limiter:can_send(Limiter, self(), AckRequired)) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -718,7 +723,40 @@ ensure_ttl_timer(State) -> now_micros() -> timer:now_diff(now(), {0,0,0}). -infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. +infos(Items, State) -> + {Prefix, Items1} = + case lists:member(synchronised_slave_pids, Items) of + true -> Prefix1 = slaves_status(State), + case lists:member(slave_pids, Items) of + true -> {Prefix1, Items -- [slave_pids]}; + false -> {proplists:delete(slave_pids, Prefix1), Items} + end; + false -> {[], Items} + end, + Prefix ++ [{Item, i(Item, State)} + || Item <- (Items1 -- [synchronised_slave_pids])]. 
+ +slaves_status(#q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + case MNodes of + undefined -> + [{slave_pids, ''}, {synchronised_slave_pids, ''}]; + _ -> + {Results, _Bad} = + delegate:invoke( + SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + {SPids1, SSPids} = + lists:foldl( + fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> + {[Pid | SPidsN], + case proplists:get_bool(is_synchronised, Infos) of + true -> [Pid | SSPidsN]; + false -> SSPidsN + end} + end, {[], []}, Results), + [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}] + end. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; @@ -750,14 +788,15 @@ i(consumers, State) -> i(memory, _) -> {memory, M} = process_info(self(), memory), M; +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes, + slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), + case MNodes of + undefined -> []; + _ -> SPids + end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); -i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - SPids; -i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name), - MNodes; i(Item, _) -> throw({bad_argument, Item}). @@ -793,10 +832,13 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {basic_consume, _, _, _, _, _, _} -> 7; + {basic_cancel, _, _, _} -> 7; + stat -> 7; + _ -> 0 end. 
prioritise_cast(Msg, _State) -> @@ -915,7 +957,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, Remaining, Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, LimiterPid, +handle_call({basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{exclusive_consumer = ExistingHolder}) -> case check_exclusive_access(ExistingHolder, ExclusiveConsume, @@ -926,10 +968,11 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record( + C#cr{consumer_count = ConsumerCount +1, + limiter = Limiter}), ok = case ConsumerCount of - 0 -> rabbit_limiter:register(LimiterPid, self()); + 0 -> rabbit_limiter:register(Limiter, self()); _ -> ok end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -962,12 +1005,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); C = #cr{consumer_count = ConsumerCount, - limiter_pid = LimiterPid} -> + limiter = Limiter} -> C1 = C#cr{consumer_count = ConsumerCount -1}, maybe_store_ch_record( case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), - C1#cr{limiter_pid = undefined}; + 1 -> ok = rabbit_limiter:unregister(Limiter, self()), + C1#cr{limiter = rabbit_limiter:make_token()}; _ -> C1 end), emit_consumer_deleted(ChPid, ConsumerTag), @@ -1073,20 +1116,20 @@ handle_cast({notify_sent, ChPid}, State) -> C#cr{unsent_message_count = Count - 1} end)); -handle_cast({limit, ChPid, LimiterPid}, State) -> +handle_cast({limit, ChPid, Limiter}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter_pid = OldLimiterPid, - is_limit_active = Limited}) -> - if ConsumerCount =/= 0 andalso 
OldLimiterPid == undefined -> - ok = rabbit_limiter:register(LimiterPid, self()); - true -> - ok + fun (C = #cr{consumer_count = ConsumerCount, + limiter = OldLimiter, + is_limit_active = OldLimited}) -> + case (ConsumerCount =/= 0 andalso + not rabbit_limiter:is_enabled(OldLimiter)) of + true -> ok = rabbit_limiter:register(Limiter, self()); + false -> ok end, - NewLimited = Limited andalso LimiterPid =/= undefined, - C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), + C#cr{limiter = Limiter, is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> @@ -1100,6 +1143,16 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State); + +handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), + emit_consumer_created(Ch, CTag, true, AckRequired) + end, noreply(State). handle_info(maybe_expire, State) -> diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index d358a041..22691ef9 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -106,7 +106,7 @@ qc_publish(#state{bqstate = BQ}) -> [qc_message(), #message_properties{needs_confirming = frequency([{1, true}, {20, false}]), - expiry = choose(0, 10)}, + expiry = oneof([undefined | lists:seq(1, 10)])}, self(), BQ]}. qc_publish_multiple(#state{bqstate = BQ}) -> @@ -375,7 +375,7 @@ rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)]. dropfun(Props) -> Expiry = eval({call, erlang, element, [?RECORD_INDEX(expiry, message_properties), Props]}), - Expiry =/= 0. 
+ Expiry =/= 1. drop_messages(Messages) -> case queue:out(Messages) of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 45f0032d..dfe84644 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -23,14 +23,17 @@ -export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([refresh_config_all/0, ready_for_close/1]). +-export([refresh_config_local/0, ready_for_close/1]). +-export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2, format_message_queue/2]). +%% Internal +-export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter_pid, start_limiter_fun, tx_status, next_tag, + limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_ack_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, @@ -71,8 +74,7 @@ -spec(start_link/10 :: (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> - rabbit_types:ok_pid_or_error()). + pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -85,23 +87,25 @@ -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). +-spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). 
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). --spec(refresh_config_all/0 :: () -> 'ok'). +-spec(refresh_config_local/0 :: () -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). +-spec(force_event_refresh/0 :: () -> 'ok'). -endif. %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun) -> + Capabilities, CollectorPid, Limiter) -> gen_server2:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, - VHost, Capabilities, CollectorPid, StartLimiterFun], []). + VHost, Capabilities, CollectorPid, Limiter], []). do(Pid, Method) -> do(Pid, Method, none). @@ -128,6 +132,10 @@ confirm(Pid, MsgSeqNos) -> gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). list() -> + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_channel, list_local, []). + +list_local() -> pg_local:get_members(rabbit_channels). info_keys() -> ?INFO_KEYS. @@ -147,18 +155,22 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). -refresh_config_all() -> +refresh_config_local() -> rabbit_misc:upmap( - fun (C) -> gen_server2:call(C, refresh_config) end, list()), + fun (C) -> gen_server2:call(C, refresh_config) end, list_local()), ok. ready_for_close(Pid) -> gen_server2:cast(Pid, ready_for_close). +force_event_refresh() -> + [gen_server2:cast(C, force_event_refresh) || C <- list()], + ok. 
+ %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun]) -> + Capabilities, CollectorPid, Limiter]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), @@ -168,8 +180,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, reader_pid = ReaderPid, writer_pid = WriterPid, conn_pid = ConnPid, - limiter_pid = undefined, - start_limiter_fun = StartLimiterFun, + limiter = Limiter, tx_status = none, next_tag = 1, unacked_message_q = queue:new(), @@ -296,6 +307,10 @@ handle_cast({deliver, ConsumerTag, AckRequired, rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); + +handle_cast(force_event_refresh, State) -> + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), + noreply(State); handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). 
@@ -704,7 +719,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid, - limiter_pid = LimiterPid, + limiter = Limiter, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -723,7 +738,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), LimiterPid, + Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -797,22 +812,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> undefined; - {undefined, _} -> start_limiter(State); - {_, _} -> LimiterPid - end, - LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of - ok -> LimiterPid1; - stopped -> unlimit_queues(State) - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, + State = #ch{limiter = Limiter}) -> + Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of + {false, 0} -> Limiter; + {false, _} -> enable_limiter(State); + {_, _} -> Limiter + end, + Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of + ok -> Limiter1; + {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), + Limiter2 + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, - limiter_pid = LimiterPid}) -> + limiter = Limiter}) -> OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -826,7 +842,7 @@ 
handle_method(#'basic.recover_async'{requeue = true}, QPid, lists:reverse(MsgIds), self()) end) end, ok, UAMQ), - ok = notify_limiter(LimiterPid, UAMQ), + ok = notify_limiter(Limiter, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1073,23 +1089,23 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> NoWait, #'confirm.select_ok'{}); handle_method(#'channel.flow'{active = true}, _, - State = #ch{limiter_pid = LimiterPid}) -> - LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of - ok -> LimiterPid; - stopped -> unlimit_queues(State) - end, - {reply, #'channel.flow_ok'{active = true}, - State#ch{limiter_pid = LimiterPid1}}; + State = #ch{limiter = Limiter}) -> + Limiter2 = case rabbit_limiter:unblock(Limiter) of + ok -> Limiter; + {disabled, Limiter1} -> ok = limit_queues(Limiter1, State), + Limiter1 + end, + {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}}; handle_method(#'channel.flow'{active = false}, _, - State = #ch{limiter_pid = LimiterPid, - consumer_mapping = Consumers}) -> - LimiterPid1 = case LimiterPid of - undefined -> start_limiter(State); - Other -> Other - end, - State1 = State#ch{limiter_pid = LimiterPid1}, - ok = rabbit_limiter:block(LimiterPid1), + State = #ch{consumer_mapping = Consumers, + limiter = Limiter}) -> + Limiter1 = case rabbit_limiter:is_enabled(Limiter) of + true -> Limiter; + false -> enable_limiter(State) + end, + State1 = State#ch{limiter = Limiter1}, + ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} || @@ -1219,7 +1235,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter_pid, Acked), 
+ ok = notify_limiter(State#ch.limiter, Acked), {noreply, State#ch{unacked_message_q = Remaining}}. ack_record(DeliveryTag, ConsumerTag, @@ -1256,7 +1272,7 @@ ack(Acked, State) -> [{QPid, length(MsgIds)} | L] end, [], Acked), maybe_incr_stats(QIncs, ack, State), - ok = notify_limiter(State#ch.limiter_pid, Acked), + ok = notify_limiter(State#ch.limiter, Acked), State. new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), @@ -1280,17 +1296,14 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) -> - {ok, LPid} = SLF(queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid. - -unlimit_queues(State) -> - ok = limit_queues(undefined, State), - undefined. +enable_limiter(State = #ch{unacked_message_q = UAMQ, + limiter = Limiter}) -> + Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)), + ok = limit_queues(Limiter1, State), + Limiter1. -limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). +limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter). 
consumer_queues(Consumers) -> lists:usort([QPid || @@ -1301,14 +1314,15 @@ consumer_queues(Consumers) -> %% for messages delivered to subscribed consumers, but not acks for %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) -notify_limiter(undefined, _Acked) -> - ok; -notify_limiter(LimiterPid, Acked) -> - case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of - 0 -> ok; - Count -> rabbit_limiter:ack(LimiterPid, Count) +notify_limiter(Limiter, Acked) -> + case rabbit_limiter:is_enabled(Limiter) of + false -> ok; + true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of + 0 -> ok; + Count -> rabbit_limiter:ack(Limiter, Count) + end end. deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ @@ -1446,10 +1460,10 @@ i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> queue:len(TAQ); -i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> - rabbit_limiter:get_limit(LimiterPid); -i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) -> - rabbit_limiter:is_blocked(LimiterPid); +i(prefetch_count, #ch{limiter = Limiter}) -> + rabbit_limiter:get_limit(Limiter); +i(client_flow_blocked, #ch{limiter = Limiter}) -> + rabbit_limiter:is_blocked(Limiter); i(Item, _) -> throw({bad_argument, Item}). 
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 65ccca02..a19b6bfd 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -47,47 +47,44 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, []), - {ok, WriterPid} = - supervisor2:start_child( - SupPid, - {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), + {ok, SupPid} = supervisor2:start_link(?MODULE, + {tcp, Sock, Channel, FrameMax, + ReaderPid, Protocol}), + [LimiterPid] = supervisor2:find_child(SupPid, limiter), + [WriterPid] = supervisor2:find_child(SupPid, writer), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + rabbit_limiter:make_token(LimiterPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}) -> - {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, SupPid} = supervisor2:start_link(?MODULE, direct), + [LimiterPid] = supervisor2:find_child(SupPid, limiter), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + rabbit_limiter:make_token(LimiterPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- -init([]) -> - {ok, {{one_for_all, 0, 1}, []}}. 
- -start_limiter_fun(SupPid) -> - fun (UnackedCount) -> - Me = self(), - {ok, _Pid} = - supervisor2:start_child( - SupPid, - {limiter, {rabbit_limiter, start_link, [Me, UnackedCount]}, - transient, ?MAX_WAIT, worker, [rabbit_limiter]}) - end. +init(Type) -> + {ok, {{one_for_all, 0, 1}, child_specs(Type)}}. + +child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) -> + [{writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)]; +child_specs(direct) -> + [{limiter, {rabbit_limiter, start_link, []}, + transient, ?MAX_WAIT, worker, [rabbit_limiter]}]. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index e8afed0c..b9e550c9 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,7 +20,6 @@ -export([start/0, stop/0, action/5, diagnostics/1, log_action/3]). -define(RPC_TIMEOUT, infinity). --define(WAIT_FOR_VM_ATTEMPTS, 5). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -193,9 +192,9 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); -action(wait, Node, [], _Opts, Inform) -> +action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), - wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS); + wait_for_application(Node, PidFile, Inform); action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), @@ -356,23 +355,69 @@ action(report, Node, _Args, _Opts, Inform) -> %%---------------------------------------------------------------------------- -wait_for_application(Node, Attempts) -> +wait_for_application(Node, PidFile, Inform) -> + Pid = wait_and_read_pid_file(PidFile), + Inform("pid is ~s", [Pid]), + wait_for_application(Node, Pid). 
+ +wait_for_application(Node, Pid) -> + case process_up(Pid) of + true -> case node_up(Node) of + true -> ok; + false -> timer:sleep(1000), + wait_for_application(Node, Pid) + end; + false -> {error, process_not_running} + end. + +wait_and_read_pid_file(PidFile) -> + case file:read_file(PidFile) of + {ok, Bin} -> string:strip(binary_to_list(Bin), right, $\n); + {error, enoent} -> timer:sleep(500), + wait_and_read_pid_file(PidFile); + {error, _} = E -> exit({error, {could_not_read_pid, E}}) + end. + +node_up(Node) -> case rpc_call(Node, application, which_applications, [infinity]) of - {badrpc, _} = E -> case Attempts of - 0 -> E; - _ -> wait_for_application0(Node, Attempts - 1) - end; - Apps -> case proplists:is_defined(rabbit, Apps) of - %% We've seen the node up; if it goes down - %% die immediately. - true -> ok; - false -> wait_for_application0(Node, 0) - end + {badrpc, _} -> false; + Apps -> proplists:is_defined(rabbit, Apps) end. -wait_for_application0(Node, Attempts) -> - timer:sleep(1000), - wait_for_application(Node, Attempts). +% Test using some OS clunkiness since we shouldn't trust +% rpc:call(os, getpid, []) at this point +process_up(Pid) -> + with_os([{unix, fun () -> + system("ps -p " ++ Pid + ++ " >/dev/null 2>&1") =:= 0 + end}, + {win32, fun () -> + Res = os:cmd("tasklist /nh /fi \"pid eq " ++ + Pid ++ "\" 2>&1"), + case re:run(Res, "erl\\.exe", [{capture, none}]) of + match -> true; + _ -> false + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +% Like system(3) +system(Cmd) -> + ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", + Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), + receive {Port, {exit_status, Status}} -> Status end. 
+ +% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" +escape_quotes(Cmd) -> + lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). + +%%---------------------------------------------------------------------------- default_if_empty(List, Default) when is_list(List) -> if List == [] -> Default; diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 7ff534ee..68afaf5d 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,10 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/8, disconnect/1]). +-export([boot/0, force_event_refresh/0, list/0, connect/5, + start_channel/8, disconnect/2]). +%% Internal +-export([list_local/0]). -include("rabbit.hrl"). @@ -25,8 +28,12 @@ -ifdef(use_specs). -spec(boot/0 :: () -> 'ok'). --spec(connect/4 :: (rabbit_types:username(), rabbit_types:vhost(), - rabbit_types:protocol(), rabbit_event:event_props()) -> +-spec(force_event_refresh/0 :: () -> 'ok'). +-spec(list/0 :: () -> [pid()]). +-spec(list_local/0 :: () -> [pid()]). +-spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(), + rabbit_types:protocol(), pid(), + rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). -spec(start_channel/8 :: @@ -34,7 +41,7 @@ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). --spec(disconnect/1 :: (rabbit_event:event_props()) -> 'ok'). +-spec(disconnect/2 :: (pid(), rabbit_event:event_props()) -> 'ok'). -endif. @@ -51,15 +58,27 @@ boot() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. +force_event_refresh() -> + [Pid ! force_event_refresh || Pid<- list()], + ok. + +list_local() -> + pg_local:get_members(rabbit_direct). + +list() -> + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_direct, list_local, []). 
+ %%---------------------------------------------------------------------------- -connect(Username, VHost, Protocol, Infos) -> +connect(Username, VHost, Protocol, Pid, Infos) -> case lists:keymember(rabbit, 1, application:which_applications()) of true -> case rabbit_access_control:check_user_login(Username, []) of {ok, User} -> try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> rabbit_event:notify(connection_created, Infos), + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_event:notify(connection_created, Infos), {ok, {User, rabbit_reader:server_properties(Protocol)}} catch @@ -82,5 +101,6 @@ start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector}]), {ok, ChannelPid}. -disconnect(Infos) -> +disconnect(Pid, Infos) -> + pg_local:leave(rabbit_direct, Pid), rabbit_event:notify(connection_closed, Infos). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8f9ab032..24468a01 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -20,27 +20,36 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). --export([start_link/2]). +-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, + disable/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --ifdef(use_specs). +-record(token, {pid, enabled}). --type(maybe_pid() :: pid() | 'undefined'). +-ifdef(use_specs). --spec(start_link/2 :: (pid(), non_neg_integer()) -> - rabbit_types:ok_pid_or_error()). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). --spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). 
--spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). --spec(block/1 :: (maybe_pid()) -> 'ok'). --spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). --spec(is_blocked/1 :: (maybe_pid()) -> boolean()). +-export_type([token/0]). + +-opaque(token() :: #token{}). + +-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(make_token/0 :: () -> token()). +-spec(make_token/1 :: ('undefined' | pid()) -> token()). +-spec(is_enabled/1 :: (token()) -> boolean()). +-spec(enable/2 :: (token(), non_neg_integer()) -> token()). +-spec(disable/1 :: (token()) -> token()). +-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). +-spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). +-spec(register/2 :: (token(), pid()) -> 'ok'). +-spec(unregister/2 :: (token(), pid()) -> 'ok'). +-spec(get_limit/1 :: (token()) -> non_neg_integer()). +-spec(block/1 :: (token()) -> 'ok'). +-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). +-spec(is_blocked/1 :: (token()) -> boolean()). -endif. @@ -59,63 +68,63 @@ %% API %%---------------------------------------------------------------------------- -start_link(ChPid, UnackedMsgCount) -> - gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). +start_link() -> gen_server2:start_link(?MODULE, [], []). + +make_token() -> make_token(undefined). +make_token(Pid) -> #token{pid = Pid, enabled = false}. + +is_enabled(#token{enabled = Enabled}) -> Enabled. + +enable(#token{pid = Pid} = Token, Volume) -> + gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity). -limit(undefined, 0) -> - ok; -limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). +disable(#token{pid = Pid} = Token) -> + gen_server2:call(Pid, {disable, Token}, infinity). + +limit(Limiter, PrefetchCount) -> + maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). 
%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> - true; -can_send(LimiterPid, QPid, AckRequired) -> +%% breaching a limit. Note that we don't use maybe_call here in order +%% to avoid always going through with_exit_handler/2, even when the +%% limiter is disabled. +can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, - infinity) end). + fun () -> + gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) + end); +can_send(_, _, _) -> + true. %% Let the limiter know that the channel has received some acks from a %% consumer -ack(undefined, _Count) -> ok; -ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}). +ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). -register(undefined, _QPid) -> ok; -register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}). +register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). -unregister(undefined, _QPid) -> ok; -unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}). +unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). -get_limit(undefined) -> - 0; -get_limit(Pid) -> +get_limit(Limiter) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:call(Pid, get_limit, infinity) end). + fun () -> maybe_call(Limiter, get_limit, 0) end). -block(undefined) -> - ok; -block(LimiterPid) -> - gen_server2:call(LimiterPid, block, infinity). +block(Limiter) -> + maybe_call(Limiter, block, ok). -unblock(undefined) -> - ok; -unblock(LimiterPid) -> - gen_server2:call(LimiterPid, unblock, infinity). +unblock(Limiter) -> + maybe_call(Limiter, {unblock, Limiter}, ok). -is_blocked(undefined) -> - false; -is_blocked(LimiterPid) -> - gen_server2:call(LimiterPid, is_blocked, infinity). 
+is_blocked(Limiter) -> + maybe_call(Limiter, is_blocked, false). %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([ChPid, UnackedMsgCount]) -> - {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +init([]) -> + {ok, #lim{}}. prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. @@ -135,23 +144,33 @@ handle_call({can_send, QPid, AckRequired}, _From, handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({limit, PrefetchCount}, _From, State) -> +handle_call({limit, PrefetchCount, Token}, _From, State) -> case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#token{enabled = false}}, State1} end; handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; -handle_call(unblock, _From, State) -> +handle_call({unblock, Token}, _From, State) -> case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> {reply, ok, State1}; - {stop, State1} -> {stop, normal, stopped, State1} + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#token{enabled = false}}, State1} end; handle_call(is_blocked, _From, State) -> - {reply, blocked(State), State}. + {reply, blocked(State), State}; + +handle_call({enable, Token, Channel, Volume}, _From, State) -> + {reply, Token#token{enabled = true}, + State#lim{ch_pid = Channel, volume = Volume}}; +handle_call({disable, Token}, _From, State) -> + {reply, Token#token{enabled = false}, State}. 
handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; @@ -190,6 +209,16 @@ maybe_notify(OldState, NewState) -> false -> {cont, NewState} end. +maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) -> + gen_server2:call(Pid, Call, infinity); +maybe_call(_, _Call, Default) -> + Default. + +maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> + gen_server2:cast(Pid, Cast); +maybe_cast(_, _Call) -> + ok. + limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. @@ -227,7 +256,8 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> %% thus ensuring that each queue has an equal chance of %% being notified first. {L1, L2} = lists:split(random:uniform(L), QList), - [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], + [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3] + || L3 <- [L2, L1]], ok end, State#lim{queues = NewQueues}. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index f6664a27..8ed2bede 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -16,7 +16,7 @@ -module(rabbit_mirror_queue_coordinator). --export([start_link/3, get_gm/1, ensure_monitoring/2]). +-export([start_link/4, get_gm/1, ensure_monitoring/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,15 +32,17 @@ -record(state, { q, gm, monitors, - death_fun + death_fun, + length_fun }). -define(ONE_SECOND, 1000). -ifdef(use_specs). --spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined', - rabbit_mirror_queue_master:death_fun()) -> +-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined', + rabbit_mirror_queue_master:death_fun(), + rabbit_mirror_queue_master:length_fun()) -> rabbit_types:ok_pid_or_error()). -spec(get_gm/1 :: (pid()) -> pid()). -spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). 
@@ -53,7 +55,7 @@ %% %% A queue with mirrors consists of the following: %% -%% #amqqueue{ pid, mirror_pids } +%% #amqqueue{ pid, slave_pids } %% | | %% +----------+ +-------+--------------+-----------...etc... %% | | | @@ -138,9 +140,28 @@ %% state of the master. The detection of the sync-status of a slave is %% done entirely based on length: if the slave and the master both %% agree on the length of the queue after the fetch of the head of the -%% queue, then the queues must be in sync. The only other possibility -%% is that the slave's queue is shorter, and thus the fetch should be -%% ignored. +%% queue (or a 'set_length' results in a slave having to drop some +%% messages from the head of its queue), then the queues must be in +%% sync. The only other possibility is that the slave's queue is +%% shorter, and thus the fetch should be ignored. In case slaves are +%% joined to an empty queue which only goes on to receive publishes, +%% they start by asking the master to broadcast its length. This is +%% enough for slaves to always be able to work out when their head +%% does not differ from the master (and is much simpler and cheaper +%% than getting the master to hang on to the guid of the msg at the +%% head of its queue). When a slave is promoted to a master, it +%% unilaterally broadcasts its length, in order to solve the problem +%% of length requests from new slaves being unanswered by a dead +%% master. +%% +%% Obviously, due to the async nature of communication across gm, the +%% slaves can fall behind. This does not matter from a sync pov: if +%% they fall behind and the master dies then a) no publishes are lost +%% because all publishes go to all mirrors anyway; b) the worst that +%% happens is that acks get lost and so messages come back to +%% life. This is no worse than normal given you never get confirmation +%% that an ack has been received (not quite true with QoS-prefetch, +%% but close enough for jazz). 
%% %% Because acktags are issued by the bq independently, and because %% there is no requirement for the master and all slaves to use the @@ -279,8 +300,8 @@ %% %%---------------------------------------------------------------------------- -start_link(Queue, GM, DeathFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []). +start_link(Queue, GM, DeathFun, LengthFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). @@ -292,7 +313,7 @@ ensure_monitoring(CPid, Pids) -> %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> +init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> GM1 = case GM of undefined -> {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -306,10 +327,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) -> end, {ok, _TRef} = timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), - {ok, #state { q = Q, - gm = GM1, - monitors = dict:new(), - death_fun = DeathFun }, + {ok, #state { q = Q, + gm = GM1, + monitors = dict:new(), + death_fun = DeathFun, + length_fun = LengthFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -317,18 +339,21 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). 
handle_cast({gm_deaths, Deaths}, - State = #state { q = #amqqueue { name = QueueName } }) -> - rabbit_log:info("Mirrored-queue (~s): Master ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), + State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) + when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node() -> + {ok, MPid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, + DeadPids), noreply(State); {error, not_found} -> {stop, normal, State} end; +handle_cast(request_length, State = #state { length_fun = LengthFun }) -> + ok = LengthFun(), + noreply(State); + handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Monitors }) -> Monitors1 = @@ -343,13 +368,12 @@ handle_cast({ensure_monitoring, Pids}, handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, State = #state { monitors = Monitors, - death_fun = Fun }) -> - noreply( - case dict:is_key(Pid, Monitors) of - false -> State; - true -> ok = Fun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } - end); + death_fun = DeathFun }) -> + noreply(case dict:is_key(Pid, Monitors) of + false -> State; + true -> ok = DeathFun(Pid), + State #state { monitors = dict:erase(Pid, Monitors) } + end); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. 
@@ -379,6 +403,8 @@ members_changed([CPid], _Births, Deaths) -> handle_msg([_CPid], _From, heartbeat) -> ok; +handle_msg([CPid], _From, request_length = Msg) -> + ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([_CPid], _From, _Msg) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index fe01ae9a..5fc6341f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -25,7 +25,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0]). +-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -44,9 +44,10 @@ -ifdef(use_specs). --export_type([death_fun/0]). +-export_type([death_fun/0, length_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). +-type(length_fun() :: fun (() -> 'ok')). -type(master_state() :: #state { gm :: pid(), coordinator :: pid(), backing_queue :: atom(), @@ -65,6 +66,7 @@ -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). +-spec(length_fun/0 :: () -> length_fun()). -endif. 
@@ -87,7 +89,7 @@ stop() -> init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, AsyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun()), + Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), MNodes1 = (case MNodes of @@ -98,6 +100,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -353,11 +356,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% --------------------------------------------------------------------------- promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + Len = BQ:len(BQS), + ok = gm:broadcast(GM, {length, Len}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS), + set_delivered = Len, seen_status = SeenStatus, confirmed = [], ack_msg_id = dict:new(), @@ -375,9 +380,18 @@ sender_death_fun() -> end) end. -%% --------------------------------------------------------------------------- -%% Helpers -%% --------------------------------------------------------------------------- +length_fun() -> + Self = self(), + fun () -> + rabbit_amqqueue:run_backing_queue( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + State + end) + end. maybe_store_acktag(undefined, _MsgId, AM) -> AM; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 8a3f1bc3..725e0c18 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -17,7 +17,8 @@ -module(rabbit_mirror_queue_misc). 
-export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3]). + drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, + report_deaths/4]). -include("rabbit.hrl"). @@ -48,6 +49,7 @@ %% become the new master, which is bad because it could then mean the %% slave (now master) receives messages it's not ready for (for %% example, new consumers). +%% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadPids) -> DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], rabbit_misc:execute_mnesia_transaction( @@ -58,27 +60,27 @@ remove_from_queue(QueueName, DeadPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids }] -> - [QPid1 | SPids1] = + [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], not lists:member(node(Pid), DeadNodes)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - ok; + {ok, QPid1, []}; _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. Q1 = Q #amqqueue { pid = QPid1, slave_pids = SPids1 }, - ok = rabbit_amqqueue:store_queue(Q1); + ok = rabbit_amqqueue:store_queue(Q1), + {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, %% so leave alone to allow the promoted %% slave to find it and make its %% promotion atomic. - ok - end, - {ok, QPid1} + {ok, QPid1, []} + end end end). @@ -153,3 +155,17 @@ if_mirrored_queue(Queue, Fun) -> _ -> Fun(Q) end end). + +report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> + ok; +report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> + rabbit_event:notify(queue_mirror_deaths, [{name, QueueName}, + {pids, DeadPids}]), + rabbit_log:info("Mirrored-queue (~s): ~s ~s saw deaths of mirrors ~s~n", + [rabbit_misc:rs(QueueName), + case IsMaster of + true -> "Master"; + false -> "Slave" + end, + rabbit_misc:pid_to_string(MirrorPid), + [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). 
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3371380f..43962491 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -33,7 +33,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2]). +-export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/3, @@ -58,6 +58,15 @@ %%---------------------------------------------------------------------------- +-define(CREATION_EVENT_KEYS, + [pid, + name, + master_pid, + is_synchronised + ]). + +-define(INFO_KEYS, ?CREATION_EVENT_KEYS). + -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(DEATH_TIMEOUT, 20000). %% 20 seconds @@ -75,7 +84,9 @@ ack_num, msg_id_status, - known_senders + known_senders, + + synchronised }). start_link(Q) -> @@ -84,6 +95,9 @@ start_link(Q) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +info(QPid) -> + gen_server2:call(QPid, info, infinity). + init([#amqqueue { name = QueueName } = Q]) -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. 
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -106,26 +120,32 @@ init([#amqqueue { name = QueueName } = Q]) -> end), erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( - rabbit_amqqueue, set_maximum_since_use, [self()]), + rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( - self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), BQS = bq_init(BQ, Q, false), - {ok, #state { q = Q, - gm = GM, - master_pid = MPid, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = undefined, - sync_timer_ref = undefined, - - sender_queues = dict:new(), - msg_id_ack = dict:new(), - ack_num = 0, - - msg_id_status = dict:new(), - known_senders = dict:new() - }, hibernate, + State = #state { q = Q, + gm = GM, + master_pid = MPid, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + ack_num = 0, + + msg_id_status = dict:new(), + known_senders = dict:new(), + + synchronised = false + }, + rabbit_event:notify(queue_slave_created, + infos(?CREATION_EVENT_KEYS, State)), + ok = gm:broadcast(GM, request_length), + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> @@ -155,29 +175,32 @@ handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, gm = GM, master_pid = MPid }) -> - rabbit_log:info("Mirrored-queue (~s): Slave ~s saw deaths of mirrors ~s~n", - [rabbit_misc:rs(QueueName), - rabbit_misc:pid_to_string(self()), - [[rabbit_misc:pid_to_string(Pid), $ ] || Pid <- Deaths]]), %% The GM has told us about deaths, which means we're not going to %% receive any more messages from GM case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, Pid} when node(Pid) =:= node(MPid) -> - %% master hasn't changed - reply(ok, State); - {ok, Pid} when node(Pid) =:= node() -> - %% we've become master - promote_me(From, State); - {ok, Pid} -> - %% master has changed to not us. - gen_server2:reply(From, ok), - erlang:monitor(process, Pid), - ok = gm:broadcast(GM, heartbeat), - noreply(State #state { master_pid = Pid }); {error, not_found} -> gen_server2:reply(From, ok), - {stop, normal, State} - end. + {stop, normal, State}; + {ok, Pid, DeadPids} -> + rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + DeadPids), + if node(Pid) =:= node(MPid) -> + %% master hasn't changed + reply(ok, State); + node(Pid) =:= node() -> + %% we've become master + promote_me(From, State); + true -> + %% master has changed to not us. + gen_server2:reply(From, ok), + erlang:monitor(process, Pid), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_pid = Pid }) + end + end; + +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -270,6 +293,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _State) -> case Msg of + info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. 
@@ -306,6 +330,9 @@ members_changed([SPid], _Births, Deaths) -> handle_msg([_SPid], _From, heartbeat) -> ok; +handle_msg([_SPid], _From, request_length) -> + %% This is only of value to the master + ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; @@ -330,6 +357,14 @@ inform_deaths(SPid, Deaths) -> %% Others %% --------------------------------------------------------------------------- +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(pid, _State) -> self(); +i(name, #state { q = #amqqueue { name = Name } }) -> Name; +i(master_pid, #state { master_pid = MPid }) -> MPid; +i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(Item, _State) -> throw({bad_argument, Item}). + bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, @@ -395,7 +430,7 @@ gb_trees_cons(Key, Value, Tree) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. 
-promote_me(From, #state { q = Q, +promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, backing_queue_state = BQS, @@ -404,12 +439,14 @@ promote_me(From, #state { q = Q, msg_id_ack = MA, msg_id_status = MS, known_senders = KS }) -> + rabbit_event:notify(queue_slave_promoted, [{pid, self()}, + {name, QName}]), rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n", - [rabbit_misc:rs(Q #amqqueue.name), - rabbit_misc:pid_to_string(self())]), + [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]), Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q1, GM, rabbit_mirror_queue_master:sender_death_fun()), + Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), + rabbit_mirror_queue_master:length_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), @@ -760,7 +797,7 @@ process_instruction({set_length, Length}, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {ok, case ToDrop > 0 of + {ok, case ToDrop >= 0 of true -> BQS1 = lists:foldl( fun (const, BQSN) -> @@ -768,7 +805,8 @@ process_instruction({set_length, Length}, BQSN1} = BQ:fetch(false, BQSN), BQSN1 end, BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; + set_synchronised( + true, State #state { backing_queue_state = BQS1 }); false -> State end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, @@ -781,6 +819,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), maybe_store_ack(AckRequired, MsgId, AckTag, State #state { backing_queue_state = BQS1 }); + Other when Other + 1 =:= Remaining -> + set_synchronised(true, State); Other when Other < Remaining -> %% we must be shorter than the master State @@ -833,6 +873,10 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = dict:erase(ChPid, KS) } 
end}; +process_instruction({length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -860,3 +904,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, ack_num = Num }) -> State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. + +%% We intentionally leave out the head where a slave becomes +%% unsynchronised: we assert that can never happen. +set_synchronised(true, State = #state { q = #amqqueue { name = QName }, + synchronised = false }) -> + rabbit_event:notify(queue_slave_synchronised, [{pid, self()}, + {name, QName}]), + State #state { synchronised = true }; +set_synchronised(true, State) -> + State; +set_synchronised(false, State = #state { synchronised = false }) -> + State. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b98dbd46..ae28722a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -58,6 +58,7 @@ -export([is_process_alive/1]). -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). +-export([append_rpc_all_nodes/4]). %%---------------------------------------------------------------------------- @@ -208,6 +209,7 @@ -spec(pget/3 :: (term(), [term()], term()) -> term()). -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). +-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). -endif. @@ -954,3 +956,10 @@ format_message_queue_entry(V) when is_tuple(V) -> list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); format_message_queue_entry(_V) -> '_'. + +append_rpc_all_nodes(Nodes, M, F, A) -> + {ResL, _} = rpc:multicall(Nodes, M, F, A), + lists:append([case Res of + {badrpc, _} -> []; + _ -> Res + end || Res <- ResL]). 
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index b8f31d4a..665b15c5 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -24,7 +24,7 @@ create_cluster_nodes_config/1, read_cluster_nodes_config/0, record_running_nodes/0, read_previously_running_nodes/0, delete_previously_running_nodes/0, running_nodes_filename/0, - is_disc_node/0]). + is_disc_node/0, on_node_down/1, on_node_up/1]). -export([table_names/0]). @@ -67,6 +67,8 @@ -spec(delete_previously_running_nodes/0 :: () -> 'ok'). -spec(running_nodes_filename/0 :: () -> file:filename()). -spec(is_disc_node/0 :: () -> boolean()). +-spec(on_node_up/1 :: (node()) -> 'ok'). +-spec(on_node_down/1 :: (node()) -> 'ok'). -spec(table_names/0 :: () -> [atom()]). @@ -87,7 +89,9 @@ status() -> no -> case all_clustered_nodes() of [] -> []; Nodes -> [{unknown, Nodes}] - end + end; + Reason when Reason =:= starting; Reason =:= stopping -> + exit({rabbit_busy, try_again_later}) end}, {running_nodes, running_clustered_nodes()}]. @@ -120,10 +124,21 @@ cluster(ClusterNodes, Force) -> ensure_mnesia_not_running(), ensure_mnesia_dir(), + case not Force andalso is_clustered() andalso + is_only_disc_node(node(), false) andalso + not should_be_disc_node(ClusterNodes) + of + true -> log_both("last running disc node leaving cluster"); + _ -> ok + end, + %% Wipe mnesia if we're changing type from disc to ram case {is_disc_node(), should_be_disc_node(ClusterNodes)} of - {true, false} -> error_logger:warning_msg( - "changing node type; wiping mnesia...~n~n"), + {true, false} -> rabbit_misc:with_local_io( + fun () -> error_logger:warning_msg( + "changing node type; wiping " + "mnesia...~n~n") + end), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema); _ -> ok @@ -161,6 +176,7 @@ cluster(ClusterNodes, Force) -> after stop_mnesia() end, + ok. 
%% return node to its virgin state, where it is not member of any @@ -281,7 +297,8 @@ table_definitions() -> [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, {match, #amqqueue{name = queue_name_match(), _='_'}}]}] - ++ gm:table_definitions(). + ++ gm:table_definitions() + ++ mirrored_supervisor:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), @@ -327,14 +344,24 @@ ensure_mnesia_dir() -> ensure_mnesia_running() -> case mnesia:system_info(is_running) of - yes -> ok; - no -> throw({error, mnesia_not_running}) + yes -> + ok; + starting -> + wait_for(mnesia_running), + ensure_mnesia_running(); + Reason when Reason =:= no; Reason =:= stopping -> + throw({error, mnesia_not_running}) end. ensure_mnesia_not_running() -> case mnesia:system_info(is_running) of - no -> ok; - yes -> throw({error, mnesia_unexpectedly_running}) + no -> + ok; + stopping -> + wait_for(mnesia_not_running), + ensure_mnesia_not_running(); + Reason when Reason =:= yes; Reason =:= starting -> + throw({error, mnesia_unexpectedly_running}) end. ensure_schema_integrity() -> @@ -692,6 +719,12 @@ wait_for_tables(TableNames) -> reset(Force) -> ensure_mnesia_not_running(), + case not Force andalso is_clustered() andalso + is_only_disc_node(node(), false) + of + true -> log_both("no other disc nodes running"); + false -> ok + end, Node = node(), case Force of true -> ok; @@ -739,6 +772,43 @@ leave_cluster(Nodes, RunningNodes) -> Nodes, RunningNodes}}) end. +wait_for(Condition) -> + error_logger:info_msg("Waiting for ~p...~n", [Condition]), + timer:sleep(1000). + +on_node_up(Node) -> + case is_only_disc_node(Node, true) of + true -> rabbit_misc:with_local_io( + fun () -> rabbit_log:info("cluster contains disc " + "nodes again~n") + end); + false -> ok + end. + +on_node_down(Node) -> + case is_only_disc_node(Node, true) of + true -> rabbit_misc:with_local_io( + fun () -> rabbit_log:info("only running disc node " + "went down~n") + end); + false -> ok + end. 
+ +is_only_disc_node(Node, _MnesiaRunning = true) -> + RunningSet = sets:from_list(running_clustered_nodes()), + DiscSet = sets:from_list(nodes_of_type(disc_copies)), + [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet)); +is_only_disc_node(Node, false) -> + start_mnesia(), + Res = is_only_disc_node(Node, true), + stop_mnesia(), + Res. + +log_both(Warning) -> + io:format("Warning: ~s~n", [Warning]), + rabbit_misc:with_local_io( + fun () -> error_logger:warning_msg("~s~n", [Warning]) end). + start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ensure_mnesia_running(). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f9587d21..cc12eb5d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2, sync/3]). + write/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -36,7 +36,7 @@ -include("rabbit_msg_store.hrl"). --define(SYNC_INTERVAL, 5). %% milliseconds +-define(SYNC_INTERVAL, 25). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). -define(TRANSFORM_TMP, "transform_tmp"). @@ -60,7 +60,6 @@ current_file, %% current file name as number current_file_handle, %% current file handle since the last fsync? file_handle_cache, %% file handle cache - on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes @@ -133,7 +132,8 @@ -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). --type(maybe_msg_id_fun() :: 'undefined' | fun ((gb_set()) -> any())). 
+-type(maybe_msg_id_fun() :: + 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). @@ -153,8 +153,6 @@ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). --spec(sync/3 :: - ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). @@ -443,7 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). -sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). 
@@ -640,7 +637,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> current_file = 0, current_file_handle = undefined, file_handle_cache = dict:new(), - on_sync = [], sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, @@ -762,21 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) -> noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), removed, State1))); -handle_cast({sync, MsgIds, K}, - State = #msstate { current_file = CurFile, - current_file_handle = CurHdl, - on_sync = Syncs }) -> - {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any(fun (MsgId) -> - #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), - File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds) of - false -> K(), - noreply(State); - true -> noreply(State #msstate { on_sync = [K | Syncs] }) - end; - handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, @@ -855,17 +836,15 @@ reply(Reply, State) -> {reply, Reply, State1, Timeout}. next_state(State = #msstate { sync_timer_ref = undefined, - on_sync = Syncs, cref_to_msg_ids = CTM }) -> - case {Syncs, dict:size(CTM)} of - {[], 0} -> {State, hibernate}; - _ -> {start_sync_timer(State), 0} + case dict:size(CTM) of + 0 -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} end; -next_state(State = #msstate { on_sync = Syncs, - cref_to_msg_ids = CTM }) -> - case {Syncs, dict:size(CTM)} of - {[], 0} -> {stop_sync_timer(State), hibernate}; - _ -> {State, 0} +next_state(State = #msstate { cref_to_msg_ids = CTM }) -> + case dict:size(CTM) of + 0 -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> @@ -879,7 +858,6 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> State #msstate { sync_timer_ref = undefined }. 
internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs, cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), CGs = dict:fold(fun (CRef, MsgIds, NS) -> @@ -888,16 +866,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, false -> [{CRef, MsgIds} | NS] end end, [], CTM), - ok = case {Syncs, CGs} of - {[], []} -> ok; - _ -> file_handle_cache:sync(CurHdl) + ok = case CGs of + [] -> ok; + _ -> file_handle_cache:sync(CurHdl) end, - [K() || K <- lists:reverse(Syncs)], - State2 = lists:foldl( - fun ({CRef, MsgIds}, StateN) -> - client_confirm(CRef, MsgIds, written, StateN) - end, State1, CGs), - State2 #msstate { on_sync = [] }. + lists:foldl(fun ({CRef, MsgIds}, StateN) -> + client_confirm(CRef, MsgIds, written, StateN) + end, State1, CGs). write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index c6b18248..31f476fc 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -21,7 +21,7 @@ node_listeners/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, - close_connection/2]). + close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/2, @@ -30,6 +30,9 @@ -export([tcp_listener_started/3, tcp_listener_stopped/3, start_client/1, start_ssl_client/2]). +%% Internal +-export([connections_local/0]). + -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). @@ -59,6 +62,7 @@ -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(connections/0 :: () -> [rabbit_types:connection()]). +-spec(connections_local/0 :: () -> [rabbit_types:connection()]). -spec(connection_info_keys/0 :: () -> rabbit_types:info_keys()). 
-spec(connection_info/1 :: (rabbit_types:connection()) -> rabbit_types:infos()). @@ -69,6 +73,8 @@ -spec(connection_info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(close_connection/2 :: (pid(), string()) -> 'ok'). +-spec(force_connection_event_refresh/0 :: () -> 'ok'). + -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/2 :: (atom(), listener_config()) -> [{inet:ip_address(), ip_port(), family(), atom()}]). @@ -297,10 +303,13 @@ start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). connections() -> + rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), + rabbit_networking, connections_local, []). + +connections_local() -> [rabbit_connection_sup:reader(ConnSup) || - Node <- rabbit_mnesia:running_clustered_nodes(), {_, ConnSup, supervisor, _} - <- supervisor:which_children({rabbit_tcp_client_sup, Node})]. + <- supervisor:which_children(rabbit_tcp_client_sup)]. connection_info_keys() -> rabbit_reader:info_keys(). @@ -316,6 +325,10 @@ close_connection(Pid, Explanation) -> false -> throw({error, {not_a_connection_pid, Pid}}) end. +force_connection_event_refresh() -> + [rabbit_reader:force_event_refresh(C) || C <- connections()], + ok. + %%-------------------------------------------------------------------- tcp_host({0,0,0,0}) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 281830c7..8aa24ab5 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -61,24 +61,19 @@ notify_cluster() -> %%-------------------------------------------------------------------- init([]) -> - ok = net_kernel:monitor_nodes(true), {ok, no_state}. handle_call(_Request, _From, State) -> {noreply, State}. 
handle_cast({rabbit_running_on, Node}, State) -> - rabbit_log:info("node ~p up~n", [Node]), + rabbit_log:info("rabbit on ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), - ok = rabbit_alarm:on_node_up(Node), + ok = handle_live_rabbit(Node), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({nodedown, Node}, State) -> - rabbit_log:info("node ~p down~n", [Node]), - ok = handle_dead_rabbit(Node), - {noreply, State}; handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), ok = handle_dead_rabbit(Node), @@ -100,4 +95,9 @@ code_change(_OldVsn, State, _Extra) -> handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), - ok = rabbit_alarm:on_node_down(Node). + ok = rabbit_alarm:on_node_down(Node), + ok = rabbit_mnesia:on_node_down(Node). + +handle_live_rabbit(Node) -> + ok = rabbit_alarm:on_node_up(Node), + ok = rabbit_mnesia:on_node_up(Node). 
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bf89cdb2..636913b5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -569,13 +569,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, add_to_journal(RelSeq, Action, Segment = #segment { journal_entries = JEntries, unacked = UnackedCount }) -> - Segment1 = Segment #segment { - journal_entries = add_to_journal(RelSeq, Action, JEntries) }, - case Action of - del -> Segment1; - ack -> Segment1 #segment { unacked = UnackedCount - 1 }; - ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 } - end; + Segment #segment { + journal_entries = add_to_journal(RelSeq, Action, JEntries), + unacked = UnackedCount + case Action of + ?PUB -> +1; + del -> 0; + ack -> -1 + end}; add_to_journal(RelSeq, Action, JEntries) -> Val = case array:get(RelSeq, JEntries) of @@ -1013,7 +1013,7 @@ add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>, MsgId, expiry_to_binary(undefined)], Rest}; add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS, Rest>>) -> + RelSeq:?REL_SEQ_BITS, Rest/binary>>) -> {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, Rest}; add_queue_ttl_segment(_) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bc1080f2..3822aaeb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -18,7 +18,8 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]). +-export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1, + shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -68,6 +69,7 @@ -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). 
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). +-spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> @@ -132,6 +134,9 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. +force_event_refresh(Pid) -> + gen_server:cast(Pid, force_event_refresh). + conserve_memory(Pid, Conserve) -> Pid ! {conserve_memory, Conserve}, ok. @@ -326,6 +331,10 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> catch Error -> {error, Error} end), mainloop(Deb, State); +handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> + rabbit_event:notify(connection_created, + [{type, network} | infos(?CREATION_EVENT_KEYS, State)]), + mainloop(Deb, State); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ed4efb47..cd5d9be0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -20,6 +20,8 @@ -export([all_tests/0, test_parsing/0]). +-import(rabbit_misc, [pget/2]). + -include("rabbit.hrl"). -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). 
@@ -36,6 +38,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = gm_tests:all_tests(), + passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), passed = test_file_handle_cache(), @@ -85,6 +88,7 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_sync(SecondaryNode), passed = test_queue_cleanup(SecondaryNode), passed = test_declare_on_dead_queue(SecondaryNode), + passed = test_refresh_events(SecondaryNode), %% we now run the tests remotely, so that code coverage on the %% local node picks up more of the delegate @@ -94,7 +98,8 @@ run_cluster_dependent_tests(SecondaryNode) -> fun () -> Rs = [ test_delegates_async(Node), test_delegates_sync(Node), test_queue_cleanup(Node), - test_declare_on_dead_queue(Node) ], + test_declare_on_dead_queue(Node), + test_refresh_events(Node) ], Self ! {self(), Rs} end), receive @@ -1199,15 +1204,16 @@ test_server_status() -> {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + rabbit_limiter:make_token(self())), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], - ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, - <<"ctag">>, true, undefined), + ok = rabbit_amqqueue:basic_consume( + Q, true, Ch, rabbit_limiter:make_token(), + <<"ctag">>, true, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -1265,14 +1271,34 @@ test_spawn() -> Writer = spawn(fun () -> test_writer(Me) end), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, rabbit_framing_amqp_0_9_1, - user(<<"guest">>), <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + user(<<"guest">>), <<"/">>, [], Me, + rabbit_limiter:make_token(self())), ok = 
rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) end, {Writer, Ch}. +test_spawn(Node) -> + rpc:call(Node, ?MODULE, test_spawn_remote, []). + +%% Spawn an arbitrary long lived process, so we don't end up linking +%% the channel to the short-lived process (RPC, here) spun up by the +%% RPC server. +test_spawn_remote() -> + RPC = self(), + spawn(fun () -> + {Writer, Ch} = test_spawn(), + RPC ! {Writer, Ch}, + link(Ch), + receive + _ -> ok + end + end), + receive Res -> Res + after 1000 -> throw(failed_to_receive_result) + end. + user(Username) -> #user{username = Username, tags = [administrator], @@ -1280,25 +1306,6 @@ user(Username) -> impl = #internal_user{username = Username, tags = [administrator]}}. -test_statistics_event_receiver(Pid) -> - receive - Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) - end. - -test_statistics_receive_event(Ch, Matcher) -> - rabbit_channel:flush(Ch), - Ch ! emit_stats, - test_statistics_receive_event1(Ch, Matcher). - -test_statistics_receive_event1(Ch, Matcher) -> - receive #event{type = channel_stats, props = Props} -> - case Matcher(Props) of - true -> Props; - _ -> test_statistics_receive_event1(Ch, Matcher) - end - after 1000 -> throw(failed_to_receive_event) - end. - test_confirms() -> {_Writer, Ch} = test_spawn(), DeclareBindDurableQueue = @@ -1359,6 +1366,25 @@ test_confirms() -> passed. +test_statistics_event_receiver(Pid) -> + receive + Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) + end. + +test_statistics_receive_event(Ch, Matcher) -> + rabbit_channel:flush(Ch), + Ch ! emit_stats, + test_statistics_receive_event1(Ch, Matcher). + +test_statistics_receive_event1(Ch, Matcher) -> + receive #event{type = channel_stats, props = Props} -> + case Matcher(Props) of + true -> Props; + _ -> test_statistics_receive_event1(Ch, Matcher) + end + after 1000 -> throw(failed_to_receive_event) + end. 
+ test_statistics() -> application:set_env(rabbit, collect_statistics, fine), @@ -1376,7 +1402,7 @@ test_statistics() -> QPid = Q#amqqueue.pid, X = rabbit_misc:r(<<"/">>, exchange, <<"">>), - rabbit_tests_event_receiver:start(self()), + rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), %% Check stats empty Event = test_statistics_receive_event(Ch, fun (_) -> true end), @@ -1419,6 +1445,40 @@ test_statistics() -> rabbit_tests_event_receiver:stop(), passed. +test_refresh_events(SecondaryNode) -> + rabbit_tests_event_receiver:start(self(), [node(), SecondaryNode], + [channel_created, queue_created]), + + {_Writer, Ch} = test_spawn(), + expect_events(Ch, channel_created), + rabbit_channel:shutdown(Ch), + + {_Writer2, Ch2} = test_spawn(SecondaryNode), + expect_events(Ch2, channel_created), + rabbit_channel:shutdown(Ch2), + + {new, #amqqueue { pid = QPid } = Q} = + rabbit_amqqueue:declare(test_queue(), false, false, [], none), + expect_events(QPid, queue_created), + rabbit_amqqueue:delete(Q, false, false), + + rabbit_tests_event_receiver:stop(), + passed. + +expect_events(Pid, Type) -> + expect_event(Pid, Type), + rabbit:force_event_refresh(), + expect_event(Pid, Type). + +expect_event(Pid, Type) -> + receive #event{type = Type, props = Props} -> + case pget(pid, Props) of + Pid -> ok; + _ -> expect_event(Pid, Type) + end + after 1000 -> throw({failed_to_receive_event, Type}) + end. + test_delegates_async(SecondaryNode) -> Self = self(), Sender = fun (Pid) -> Pid ! 
{invoked, Self} end, @@ -1524,16 +1584,19 @@ test_queue_cleanup(_SecondaryNode) -> ok after 1000 -> throw(failed_to_receive_queue_declare_ok) end, + rabbit_channel:shutdown(Ch), rabbit:stop(), rabbit:start(), - rabbit_channel:do(Ch, #'queue.declare'{ passive = true, - queue = ?CLEANUP_QUEUE_NAME }), + {_Writer2, Ch2} = test_spawn(), + rabbit_channel:do(Ch2, #'queue.declare'{ passive = true, + queue = ?CLEANUP_QUEUE_NAME }), receive #'channel.close'{reply_code = ?NOT_FOUND} -> ok after 2000 -> throw(failed_to_receive_channel_exit) end, + rabbit_channel:shutdown(Ch2), passed. test_declare_on_dead_queue(SecondaryNode) -> @@ -1767,25 +1830,49 @@ msg_id_bin(X) -> msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). +on_disk_capture() -> + on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). +on_disk_capture({OnDisk, Awaiting, Pid}) -> + Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of + true -> Pid ! {self(), arrived}, undefined; + false -> Pid + end, + receive + {await, MsgIds, Pid2} -> + true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting), + on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2}); + {on_disk, MsgIds} -> + on_disk_capture({gb_sets:union(OnDisk, MsgIds), + gb_sets:subtract(Awaiting, MsgIds), + Pid1}); + stop -> + done + end. + +on_disk_await(Pid, MsgIds) when is_list(MsgIds) -> + Pid ! {await, gb_sets:from_list(MsgIds), self()}, + receive {Pid, arrived} -> ok end. + +on_disk_stop(Pid) -> + MRef = erlang:monitor(process, Pid), + Pid ! stop, + receive {'DOWN', MRef, process, Pid, _Reason} -> + ok + end. + +msg_store_client_init_capture(MsgStore, Ref) -> + Pid = spawn(fun on_disk_capture/0), + {Pid, rabbit_msg_store:client_init( + MsgStore, Ref, fun (MsgIds, _ActionTaken) -> + Pid ! {on_disk, MsgIds} + end, undefined)}. 
+ msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( fun (MsgId, Atom1) when Atom1 =:= Atom -> rabbit_msg_store:contains(MsgId, MSCState) end, Atom, MsgIds). -msg_store_sync(MsgIds, MSCState) -> - Ref = make_ref(), - Self = self(), - ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end, - MSCState), - receive - {sync, Ref} -> ok - after - 10000 -> - io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]), - throw(timeout) - end. - msg_store_read(MsgIds, MSCState) -> lists:foldl(fun (MsgId, MSCStateM) -> {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read( @@ -1819,22 +1906,18 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> test_msg_store() -> restart_msg_store_empty(), - Self = self(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), Ref = rabbit_guid:guid(), - MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(MsgIds1stHalf, MSCState), + ok = on_disk_await(Cap, MsgIds1stHalf), %% publish the second half ok = msg_store_write(MsgIds2ndHalf, MSCState), - %% sync on the first half again - the msg_store will be dirty, but - %% we won't need the fsync - ok = msg_store_sync(MsgIds1stHalf, MSCState), %% check they're all in there true = msg_store_contains(true, MsgIds, MSCState), %% publish the latter half twice so we hit the caching and ref count code @@ -1843,25 +1926,8 @@ test_msg_store() -> true = msg_store_contains(true, MsgIds, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen - ok = lists:foldl( - fun (MsgId, ok) -> rabbit_msg_store:sync( - [MsgId], fun () -> Self ! 
{sync, MsgId} end, - MSCState) - end, ok, MsgIds2ndHalf), - lists:foldl( - fun(MsgId, ok) -> - receive - {sync, MsgId} -> ok - after - 10000 -> - io:format("Sync from msg_store missing (msg_id: ~p)~n", - [MsgId]), - throw(timeout) - end - end, ok, MsgIds2ndHalf), - %% it's very likely we're not dirty here, so the 1st half sync - %% should hit a different code path - ok = msg_store_sync(MsgIds1stHalf, MSCState), + ok = on_disk_await(Cap, MsgIds2ndHalf), + ok = on_disk_stop(Cap), %% read them all MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl index 12c43faf..abcbe0b6 100644 --- a/src/rabbit_tests_event_receiver.erl +++ b/src/rabbit_tests_event_receiver.erl @@ -16,36 +16,43 @@ -module(rabbit_tests_event_receiver). --export([start/1, stop/0]). +-export([start/3, stop/0]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). -start(Pid) -> - gen_event:add_handler(rabbit_event, ?MODULE, [Pid]). +-include("rabbit.hrl"). + +start(Pid, Nodes, Types) -> + Oks = [ok || _ <- Nodes], + {Oks, _} = rpc:multicall(Nodes, gen_event, add_handler, + [rabbit_event, ?MODULE, [Pid, Types]]). stop() -> gen_event:delete_handler(rabbit_event, ?MODULE, []). %%---------------------------------------------------------------------------- -init([Pid]) -> - {ok, Pid}. +init([Pid, Types]) -> + {ok, {Pid, Types}}. -handle_call(_Request, Pid) -> - {ok, not_understood, Pid}. +handle_call(_Request, State) -> + {ok, not_understood, State}. -handle_event(Event, Pid) -> - Pid ! Event, - {ok, Pid}. +handle_event(Event = #event{type = Type}, State = {Pid, Types}) -> + case lists:member(Type, Types) of + true -> Pid ! Event; + false -> ok + end, + {ok, State}. -handle_info(_Info, Pid) -> - {ok, Pid}. +handle_info(_Info, State) -> + {ok, State}. -terminate(_Arg, _Pid) -> +terminate(_Arg, _State) -> ok. 
-code_change(_OldVsn, Pid, _Extra) -> - {ok, Pid}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 7d36856a..f9632324 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -76,7 +76,7 @@ update_config(Fun) -> {ok, VHosts0} = application:get_env(rabbit, ?TRACE_VHOSTS), VHosts = Fun(VHosts0), application:set_env(rabbit, ?TRACE_VHOSTS, VHosts), - rabbit_channel:refresh_config_all(), + rabbit_channel:refresh_config_local(), ok. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 8d26866b..e0ca8cbb 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -34,6 +34,7 @@ -rabbit_upgrade({ha_mirrors, mnesia, []}). -rabbit_upgrade({gm, mnesia, []}). -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). +-rabbit_upgrade({mirrored_supervisor, mnesia, []}). %% ------------------------------------------------------------------- @@ -52,6 +53,7 @@ -spec(ha_mirrors/0 :: () -> 'ok'). -spec(gm/0 :: () -> 'ok'). -spec(exchange_scratch/0 :: () -> 'ok'). +-spec(mirrored_supervisor/0 :: () -> 'ok'). -endif. @@ -170,6 +172,11 @@ exchange_scratch(Table) -> end, [name, type, durable, auto_delete, internal, arguments, scratch]). +mirrored_supervisor() -> + create(mirrored_sup_childspec, + [{record_name, mirrored_sup_childspec}, + {attributes, [key, mirroring_pid, childspec]}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> |