summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile11
-rw-r--r--docs/rabbitmqctl.1.xml42
-rwxr-xr-xquickcheck9
-rwxr-xr-xscripts/rabbitmqctl1
-rwxr-xr-xscripts/rabbitmqctl.bat1
-rw-r--r--src/gm.erl94
-rw-r--r--src/gm_qc.erl386
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_autoheal.erl1
-rw-r--r--src/rabbit_control_main.erl8
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_networking.erl31
-rw-r--r--src/rabbit_node_monitor.erl27
-rw-r--r--src/rabbit_reader.erl8
-rw-r--r--src/rabbit_runtime_parameters.erl4
16 files changed, 562 insertions, 73 deletions
diff --git a/Makefile b/Makefile
index c54b44e5..0dc8661d 100644
--- a/Makefile
+++ b/Makefile
@@ -20,8 +20,6 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml
USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML)))
-QC_MODULES := rabbit_backing_queue_qc
-QC_TRIALS ?= 100
ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes)
PYTHON=python
@@ -56,6 +54,12 @@ endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc)
+ifdef INSTRUMENT_FOR_QC
+ERLC_OPTS += -DINSTR_MOD=gm_qc
+else
+ERLC_OPTS += -DINSTR_MOD=gm
+endif
+
include version.mk
PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo )
@@ -217,7 +221,8 @@ run-tests: all
echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
run-qc: all
- $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS))
+ ./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40
+ ./quickcheck $(RABBITMQ_NODENAME) gm_qc 1000 200
start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c2dc1f62..328926df 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -458,6 +458,44 @@
</listitem>
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>force_boot</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Ensure that the node will start next time, even if it
+ was not the last to shut down.
+ </para>
+ <para>
+ Normally when you shut down a RabbitMQ cluster
+ altogether, the first node you restart should be the
+ last one to go down, since it may have seen things
+ happen that other nodes did not. But sometimes
+ that's not possible: for instance if the entire cluster
+ loses power then all nodes may think they were not the
+ last to shut down.
+ </para>
+ <para>
+ In such a case you can invoke <command>rabbitmqctl
+ force_boot</command> while the node is down. This will
+ tell the node to unconditionally start next time you ask
+ it to. If any changes happened to the cluster after this
+ node shut down, they will be lost.
+ </para>
+ <para>
+ If the last node to go down is permanently lost then you
+ should use <command>rabbitmqctl forget_cluster_node
+ --offline</command> in preference to this command, as it
+ will ensure that mirrored queues which were mastered on
+ the lost node get promoted.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl force_boot</screen>
+ <para role="example">
+ This will force the node not to wait for other nodes
+ next time it is started.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
<term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
</term>
<listitem>
@@ -1479,6 +1517,10 @@
<term>send_pend</term>
<listitem><para>Send queue size.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>connected_at</term>
+ <listitem><para>Date and time this connection was established, as timestamp.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>connectioninfoitem</command>s are
diff --git a/quickcheck b/quickcheck
index 40f13091..59da3719 100755
--- a/quickcheck
+++ b/quickcheck
@@ -7,15 +7,18 @@
%% NodeStr is a local broker node name
%% ModStr is the module containing quickcheck properties
%% TrialsStr is the number of trials
-main([NodeStr, ModStr, TrialsStr]) ->
+main([NodeStr, ModStr, NumTestsStr, MaxSizeStr]) ->
{ok, Hostname} = inet:gethostname(),
Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
Mod = list_to_atom(ModStr),
- Trials = erlang:list_to_integer(TrialsStr),
+ NumTests = erlang:list_to_integer(NumTestsStr),
+ MaxSize = erlang:list_to_integer(MaxSizeStr),
case rpc:call(Node, code, ensure_loaded, [proper]) of
{module, proper} ->
case rpc:call(Node, proper, module,
- [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
+ [Mod] ++ [[{numtests, NumTests},
+ {max_size, MaxSize},
+ {constraint_tries, 200}]]) of
[] -> ok;
R -> io:format("~p.~n", [R]),
quit(1)
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 032d10bf..495b06b3 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -27,6 +27,7 @@ exec ${ERL_DIR}erl \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
+ -sname rabbitmqctl$$ \
-boot "${CLEAN_BOOT_FILE}" \
-sasl errlog_type error \
-mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index a3734088..22eabf94 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -62,6 +62,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
!RABBITMQ_CTL_ERL_ARGS! ^
-sasl errlog_type error ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+-sname rabbitmqctl!RANDOM!!TIME:~9! ^
-s rabbit_control_main ^
-nodename !RABBITMQ_NODENAME! ^
-extra !STAR!
diff --git a/src/gm.erl b/src/gm.erl
index 2235da33..696b7fa3 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -388,6 +388,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
+%% For INSTR_MOD callbacks
+-export([call/3, cast/2, monitor/1, demonitor/1]).
+
-ifndef(use_specs).
-export([behaviour_info/1]).
-endif.
@@ -567,11 +570,6 @@ init([GroupName, Module, Args, TxnFun]) ->
txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({confirmed_broadcast, _Msg}, _From,
- State = #state { members_state = undefined }) ->
- reply(not_joined, State);
-
handle_call({confirmed_broadcast, Msg}, _From,
State = #state { self = Self,
right = {Self, undefined},
@@ -587,10 +585,6 @@ handle_call({confirmed_broadcast, Msg}, From, State) ->
handle_callback_result({Result, flush_broadcast_buffer(
State1 #state { confirms = Confirms1 })});
-handle_call(info, _From,
- State = #state { members_state = undefined }) ->
- reply(not_joined, State);
-
handle_call(info, _From, State = #state { group_name = GroupName,
module = Module,
view = View }) ->
@@ -598,10 +592,6 @@ handle_call(info, _From, State = #state { group_name = GroupName,
{module, Module},
{group_members, get_pids(alive_view_members(View))}], State);
-handle_call({add_on_right, _NewMember}, _From,
- State = #state { members_state = undefined }) ->
- reply(not_ready, State);
-
handle_call({add_on_right, NewMember}, _From,
State = #state { self = Self,
group_name = GroupName,
@@ -610,11 +600,10 @@ handle_call({add_on_right, NewMember}, _From,
Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
View1 = group_to_view(Group),
MembersState1 = remove_erased_members(MembersState, View1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(MembersState1)}),
{Result, State1} = change_view(View1, State #state {
members_state = MembersState1 }),
- handle_callback_result({Result, {ok, Group}, State1}).
+ Reply = {ok, Group, prepare_members_state(MembersState1)},
+ handle_callback_result({Result, Reply, State1}).
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
@@ -632,10 +621,6 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
-handle_cast({broadcast, _Msg, _SizeHint},
- State = #state { members_state = undefined }) ->
- noreply(State);
-
handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
@@ -654,16 +639,17 @@ handle_cast(join, State = #state { self = Self,
module = Module,
callback_args = Args,
txn_executor = TxnFun }) ->
- View = join_group(Self, GroupName, TxnFun),
- MembersState =
- case alive_view_members(View) of
- [Self] -> blank_member_state();
- _ -> undefined
- end,
- State1 = check_neighbours(State #state { view = View,
- members_state = MembersState }),
- handle_callback_result(
- {Module:joined(Args, get_pids(all_known_members(View))), State1});
+ State1 = case join_group(Self, GroupName, TxnFun) of
+ {ok, View} ->
+ check_neighbours(
+ State#state{view = View,
+ members_state = blank_member_state()});
+ {ok, View, Left, MembersState} ->
+ initial_catchup(Left, MembersState,
+ check_neighbours(State#state{view = View}))
+ end,
+ Members = get_pids(all_known_members(State1#state.view)),
+ handle_callback_result({Module:joined(Args, Members), State1});
handle_cast({validate_members, OldMembers},
State = #state { view = View,
@@ -752,21 +738,21 @@ prioritise_info(_, _Len, _State) ->
0.
+initial_catchup(Left, MembersStateLeft,
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ right = {Right, _MRefR},
+ view = View,
+ members_state = undefined }) ->
+ ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
+ MembersStateLeft1 = build_members_state(MembersStateLeft),
+ State #state { members_state = MembersStateLeft1 }.
+
handle_msg(check_neighbours, State) ->
%% no-op - it's already been done by the calling handle_cast
{ok, State};
handle_msg({catchup, Left, MembersStateLeft},
- State = #state { self = Self,
- left = {Left, _MRefL},
- right = {Right, _MRefR},
- view = View,
- members_state = undefined }) ->
- ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
- MembersStateLeft1 = build_members_state(MembersStateLeft),
- {ok, State #state { members_state = MembersStateLeft1 }};
-
-handle_msg({catchup, Left, MembersStateLeft},
State = #state { self = Self,
left = {Left, _MRefL},
view = View,
@@ -1040,11 +1026,11 @@ join_group(Self, GroupName, {error, not_found}, TxnFun) ->
join_group(Self, GroupName,
prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
- group_to_view(Group);
+ {ok, group_to_view(Group)};
join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
- group_to_view(Group);
+ {ok, group_to_view(Group)};
false ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
@@ -1063,8 +1049,11 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
end,
try
case neighbour_call(Left, {add_on_right, Self}) of
- {ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName, TxnFun)
+ {ok, Group1, MembersState1} ->
+ {ok, group_to_view(Group1), Left,
+ build_members_state(MembersState1)};
+ not_ready ->
+ join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1177,8 +1166,8 @@ can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id, N, N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
-neighbour_cast(N, Msg) -> gen_server2:cast(get_pid(N), Msg).
-neighbour_call(N, Msg) -> gen_server2:call(get_pid(N), Msg, infinity).
+neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
+neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).
%% ---------------------------------------------------------------------------
%% View monitoring and maintanence
@@ -1192,7 +1181,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
- true = erlang:demonitor(MRef),
+ true = ?INSTR_MOD:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
ok = neighbour_cast(RealNeighbour, Msg),
ok = case Neighbour of
@@ -1202,7 +1191,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor( Self, Self) -> undefined;
-maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
+maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1461,3 +1450,12 @@ last_pub( [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
true = PubNum > LP, %% ASSERTION
PubNum.
+
+%% ---------------------------------------------------------------------------
+
+%% Uninstrumented versions
+
+call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
+cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
+monitor(Pid) -> erlang:monitor(process, Pid).
+demonitor(MRef) -> erlang:demonitor(MRef).
diff --git a/src/gm_qc.erl b/src/gm_qc.erl
new file mode 100644
index 00000000..96ef5035
--- /dev/null
+++ b/src/gm_qc.erl
@@ -0,0 +1,386 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(gm_qc).
+-ifdef(use_proper_qc).
+
+-include_lib("proper/include/proper.hrl").
+
+-define(GROUP, test_group).
+-define(MAX_SIZE, 5).
+-define(MSG_TIMEOUT, 1000000). %% micros
+
+-export([prop_gm_test/0]).
+
+-behaviour(proper_statem).
+-export([initial_state/0, command/1, precondition/2, postcondition/3,
+ next_state/3]).
+
+-behaviour(gm).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+%% Helpers
+-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]).
+
+%% For insertion into gm
+-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]).
+
+-record(state, {seq, %% symbolic and dynamic
+ instrumented, %% dynamic only
+ outstanding, %% dynamic only
+ monitors, %% dynamic only
+ all_join, %% for symbolic
+ to_join, %% dynamic only
+ to_leave %% for symbolic
+ }).
+
+prop_gm_test() ->
+ case ?INSTR_MOD of
+ ?MODULE -> ok;
+ _ -> exit(compile_with_INSTRUMENT_FOR_QC)
+ end,
+ process_flag(trap_exit, true),
+ erlang:register(?MODULE, self()),
+ ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)).
+
+gm_test(Cmds) ->
+ {_H, State, Res} = run_commands(?MODULE, Cmds),
+ cleanup(State),
+ ?WHENFAIL(
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
+
+cleanup(S) ->
+ S2 = ensure_joiners_joined_and_msgs_received(S),
+ All = gms_joined(S2),
+ All = gms(S2), %% assertion - none to join
+ check_stale_members(All),
+ [gm:leave(GM) || GM <- All],
+ drain_and_proceed_gms(S2),
+ [await_death(GM) || GM <- All],
+ gm:forget_group(?GROUP),
+ ok.
+
+check_stale_members(All) ->
+ GMs = [P || P <- processes(), is_gm_process(?GROUP, P)],
+ case GMs -- All of
+ [] -> ok;
+ Rest -> exit({forgot, Rest})
+ end.
+
+is_gm_process(Group, P) ->
+ case process_info(P, dictionary) of
+ undefined -> false;
+ {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D)
+ end.
+
+await_death(P) ->
+ MRef = erlang:monitor(process, P),
+ await_death(MRef, P).
+
+await_death(MRef, P) ->
+ receive
+ {'DOWN', MRef, process, P, _} -> ok;
+ {'DOWN', _, _, _, _} -> await_death(MRef, P);
+ {'EXIT', _, normal} -> await_death(MRef, P);
+ {'EXIT', _, Reason} -> exit(Reason);
+ {joined, _GM} -> await_death(MRef, P);
+ {left, _GM} -> await_death(MRef, P);
+ Anything -> exit({stray_msg, Anything})
+ end.
+
+%% ---------------------------------------------------------------------------
+%% proper_statem
+%% ---------------------------------------------------------------------------
+
+initial_state() -> #state{seq = 1,
+ outstanding = dict:new(),
+ instrumented = dict:new(),
+ monitors = dict:new(),
+ all_join = sets:new(),
+ to_join = sets:new(),
+ to_leave = sets:new()}.
+
+command(S) ->
+ case {length(gms_symb_not_left(S)), length(gms_symb(S))} of
+ {0, 0} -> qc_join(S);
+ {0, _} -> frequency([{1, qc_join(S)},
+ {3, qc_proceed1(S)},
+ {5, qc_proceed2(S)}]);
+ _ -> frequency([{1, qc_join(S)},
+ {1, qc_leave(S)},
+ {10, qc_send(S)},
+ {5, qc_proceed1(S)},
+ {15, qc_proceed2(S)}])
+ end.
+
+qc_join(_S) -> {call,?MODULE,do_join, []}.
+qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}.
+qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}.
+qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}.
+qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)),
+ oneof(gms_symb(S))]}.
+
+precondition(S, {call, ?MODULE, do_join, []}) ->
+ length(gms_symb(S)) < ?MAX_SIZE;
+
+precondition(_S, {call, ?MODULE, do_leave, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_send, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) ->
+ GM1 =/= GM2.
+
+postcondition(S = #state{}, {call, _M, _F, _A}, _Res) ->
+ true.
+
+next_state(S = #state{to_join = ToSet,
+ all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) ->
+ S#state{to_join = sets:add_element(GM, ToSet),
+ all_join = sets:add_element(GM, AllSet)};
+
+next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) ->
+ S#state{to_leave = sets:add_element(GM, Set)};
+
+next_state(S = #state{seq = Seq,
+ outstanding = Outstanding}, _Res,
+ {call, ?MODULE, do_send, [GM]}) ->
+ case is_pid(GM) andalso lists:member(GM, gms(S)) of
+ true ->
+ %% Dynamic state, i.e. runtime
+ Msg = [{sequence, Seq},
+ {sent_to, GM},
+ {dests, gms(S)}],
+ gm:broadcast(GM, Msg),
+ Outstanding1 = dict:map(
+ fun (_GM, Set) ->
+ gb_sets:add_element(Msg, Set)
+ end, Outstanding),
+ drain(S#state{seq = Seq + 1,
+ outstanding = Outstanding1});
+ false ->
+ S
+ end;
+
+next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) ->
+ proceed(Pid, S);
+
+next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) ->
+ proceed({From, To}, S).
+
+proceed(K, S = #state{instrumented = Msgs}) ->
+ case dict:find(K, Msgs) of
+ {ok, Q} -> case queue:out(Q) of
+ {{value, Thing}, Q2} ->
+ S2 = proceed(K, Thing, S),
+ S2#state{instrumented = dict:store(K, Q2, Msgs)};
+ {empty, _} ->
+ S
+ end;
+ error -> S
+ end.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined(Pid, _Members) -> Pid ! {joined, self()},
+ ok.
+members_changed(_Pid, _Bs, _Ds) -> ok.
+handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok.
+terminate(Pid, _Reason) -> Pid ! {left, self()}.
+
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+do_join() ->
+ {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(),
+ fun execute_mnesia_transaction/1),
+ GM.
+
+do_leave(GM) ->
+ gm:leave(GM),
+ GM.
+
+%% We need to update the state, so do the work in next_state
+do_send( _GM) -> ok.
+do_proceed1(_Pid) -> ok.
+do_proceed2(_From, _To) -> ok.
+
+%% All GMs, joined and to join
+gms(#state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin).
+
+%% All GMs, joined and to join
+gms_joined(#state{outstanding = Outstanding}) ->
+ dict:fetch_keys(Outstanding).
+
+%% All GMs including those that have left (symbolic)
+gms_symb(#state{all_join = AllJoin}) ->
+ sets:to_list(AllJoin).
+
+%% All GMs not including those that have left (symbolic)
+gms_symb_not_left(#state{all_join = AllJoin,
+ to_leave = ToLeave}) ->
+ sets:to_list(sets:subtract(AllJoin, ToLeave)).
+
+drain(S) ->
+ receive
+ Msg -> drain(handle_msg(Msg, S))
+ after 10 -> S
+ end.
+
+drain_and_proceed_gms(S0) ->
+ S = #state{instrumented = Msgs} = drain(S0),
+ case dict:size(Msgs) of
+ 0 -> S;
+ _ -> S1 = dict:fold(
+ fun (Key, Q, Si) ->
+ lists:foldl(
+ fun (Msg, Sij) ->
+ proceed(Key, Msg, Sij)
+ end, Si, queue:to_list(Q))
+ end, S, Msgs),
+ drain_and_proceed_gms(S1#state{instrumented = dict:new()})
+ end.
+
+handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) ->
+ case dict:find(GM, Outstanding) of
+ {ok, Set} ->
+ Set2 = gb_sets:del_element(Msg, Set),
+ S#state{outstanding = dict:store(GM, Set2, Outstanding)};
+ error ->
+ %% Message from GM that has already died. OK.
+ S
+ end;
+handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) ->
+ Q1 = case dict:find(Key, Msgs) of
+ {ok, Q} -> queue:in(Thing, Q);
+ error -> queue:from_list([Thing])
+ end,
+ S#state{instrumented = dict:store(Key, Q1, Msgs)};
+handle_msg({joined, GM}, S = #state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding),
+ to_join = sets:del_element(GM, ToJoin)};
+handle_msg({left, GM}, S = #state{outstanding = Outstanding,
+ to_join = ToJoin}) ->
+ true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin),
+ S#state{outstanding = dict:erase(GM, Outstanding),
+ to_join = sets:del_element(GM, ToJoin)};
+handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) ->
+ To = dict:fetch(MRef, Mons),
+ handle_msg({instrumented, {From, To}, {info, Msg}},
+ S#state{monitors = dict:erase(MRef, Mons)});
+handle_msg({'EXIT', _From, normal}, S) ->
+ S;
+handle_msg({'EXIT', _From, Reason}, _S) ->
+ %% We just trapped exits to get nicer SASL logging.
+ exit(Reason).
+
+proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S;
+proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S;
+proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S;
+proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S);
+proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S;
+proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S.
+
+%% NB From here is To in handle_msg/DOWN above, since the msg is going
+%% the other way
+add_monitor(From, To, Ref, S = #state{monitors = Mons}) ->
+ MRef = erlang:monitor(process, To),
+ From ! {mref, Ref, MRef},
+ S#state{monitors = dict:store(MRef, From, Mons)}.
+
+timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}).
+
+%% ----------------------------------------------------------------------------
+%% Assertions
+%% ----------------------------------------------------------------------------
+
+ensure_joiners_joined_and_msgs_received(S0) ->
+ S = drain_and_proceed_gms(S0),
+ case outstanding_joiners(S) of
+ true -> ensure_joiners_joined_and_msgs_received(S);
+ false -> case outstanding_msgs(S) of
+ [] -> S;
+ Out -> exit({outstanding_msgs, Out})
+ end
+ end.
+
+outstanding_joiners(#state{to_join = ToJoin}) ->
+ sets:size(ToJoin) > 0.
+
+outstanding_msgs(#state{outstanding = Outstanding}) ->
+ dict:fold(fun (GM, Set, OS) ->
+ case gb_sets:is_empty(Set) of
+ true -> OS;
+ false -> [{GM, gb_sets:to_list(Set)} | OS]
+ end
+ end, [], Outstanding).
+
+%% ---------------------------------------------------------------------------
+%% For insertion into GM
+%% ---------------------------------------------------------------------------
+
+call(Pid, Msg, infinity) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ gen_server2:call(Pid, Msg, infinity).
+
+cast(Pid, Msg) ->
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}},
+ ok.
+
+monitor(Pid) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}},
+ receive
+ {mref, Ref, MRef} -> MRef
+ end.
+
+demonitor(MRef) ->
+ whereis(?MODULE) ! {instrumented, self(), {demon, MRef}},
+ true.
+
+execute_mnesia_transaction(Fun) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, self(), {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ rabbit_misc:execute_mnesia_transaction(Fun).
+
+-else.
+
+-export([prop_disabled/0]).
+
+prop_disabled() ->
+ exit({compiled_without_proper,
+ "PropEr was not present during compilation of the test module. "
+ "Hence all tests are disabled."}).
+
+-endif.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 4b7a9a1f..a4c460ae 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -24,7 +24,7 @@
start_fhc/0]).
-export([start/2, stop/1]).
-export([start_apps/1, stop_apps/1]).
--export([log_location/1]). %% for testing
+-export([log_location/1, config_files/0]). %% for testing and mgmt-agent
%%---------------------------------------------------------------------------
%% Boot steps.
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 826bfc45..c5237d34 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -118,6 +118,7 @@ node_down(Node, _State) ->
handle_msg({request_start, Node},
not_healing, Partitions) ->
rabbit_log:info("Autoheal request received from ~p~n", [Node]),
+ rabbit_node_monitor:ping_all(),
case rabbit_node_monitor:all_rabbit_nodes_up() of
false -> not_healing;
true -> AllPartitions = all_partitions(Partitions),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index c7f94f58..8bc09426 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -54,6 +54,7 @@
change_cluster_node_type,
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
+ force_boot,
cluster_status,
{sync_queue, [?VHOST_DEF]},
{cancel_sync_queue, [?VHOST_DEF]},
@@ -310,6 +311,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
[ClusterNode, false])
end;
+action(force_boot, _Node, [], _Opts, Inform) ->
+ Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
+ case rabbit:is_running(Node) of
+ false -> rabbit_mnesia:force_load_next_boot();
+ true -> {error, rabbit_running}
+ end;
+
action(sync_queue, Node, [Q], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index fd4b7b11..180993a5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -71,6 +71,7 @@
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
+-export([now_to_ms/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -255,6 +256,9 @@
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
-> float()).
+-spec(now_to_ms/1 :: ({non_neg_integer(),
+ non_neg_integer(),
+ non_neg_integer()}) -> pos_integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -1021,6 +1025,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+now_to_ms({Mega, Sec, Micro}) ->
+ (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000.
+
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c6c2c8eb..630d9853 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -23,6 +23,7 @@
update_cluster_nodes/1,
change_cluster_node_type/1,
forget_cluster_node/2,
+ force_load_next_boot/0,
status/0,
is_clustered/0,
@@ -63,6 +64,7 @@
-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+-spec(force_load_next_boot/0 :: () -> 'ok').
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
@@ -303,7 +305,6 @@ remove_node_offline_node(Node) ->
e(removing_node_from_offline_node)
end.
-
%%----------------------------------------------------------------------------
%% Queries
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 96448f32..c8d76719 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -16,8 +16,8 @@
-module(rabbit_networking).
--export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
- stop_tcp_listener/1, on_node_down/1, active_listeners/0,
+-export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2,
+ on_node_down/1, active_listeners/0,
node_listeners/1, register_connection/1, unregister_connection/1,
connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
@@ -58,10 +58,10 @@
-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
+-spec(killall/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(start_ssl_listener/2 ::
(listener_config(), rabbit_types:infos()) -> 'ok').
--spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(register_connection/1 :: (pid()) -> ok).
@@ -143,6 +143,25 @@ start() -> rabbit_sup:start_supervisor_child(
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]).
+%% We are going to stop for pause-minority, so we are already
+%% compromised; anything we confirm from now on is not going to be
+%% remembered after we come back. Since rabbit:stop/0 may take a while
+%% to gracefully shut down, we should stop talking to the outside
+%% world *immediately*.
+killall() ->
+ %% Stop ASAP
+ kill_connections(),
+ {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners),
+ {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners),
+ [stop_listener(L) || L <- TCPListeners ++ SSLListeners],
+ %% In case anything reconnected while we were stopping listeners
+ kill_connections(),
+ ok.
+
+kill_connections() ->
+ Conns = connections_local() ++ rabbit_direct:list_local(),
+ [exit(P, kill) || P <- Conns].
+
ensure_ssl() ->
{ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps),
ok = app_utils:start_applications(SslAppsConfig),
@@ -248,12 +267,12 @@ start_listener0(Address, Protocol, Label, OnConnect) ->
{rabbit_misc:ntoa(IPAddress), Port}})
end.
-stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Address) ||
+stop_listener(Listener) ->
+ [stop_listener0(Address) ||
Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family}) ->
+stop_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 1c971c1d..72acc905 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -31,7 +31,7 @@
code_change/3]).
%% Utils
--export([all_rabbit_nodes_up/0, run_outside_applications/1]).
+-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -63,6 +63,7 @@
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
+-spec(ping_all/0 :: () -> 'ok').
-endif.
@@ -301,12 +302,11 @@ handle_info(ping_nodes, State) ->
%% to ping the nodes that are up, after all.
State1 = State#state{down_ping_timer = undefined},
Self = self(),
- %% all_nodes_up() both pings all the nodes and tells us if we need to again.
- %%
%% We ping in a separate process since in a partition it might
%% take some noticeable length of time and we don't want to block
%% the node monitor for that long.
spawn_link(fun () ->
+ ping_all(),
case all_nodes_up() of
true -> ok;
false -> Self ! ping_again
@@ -361,10 +361,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
await_cluster_recovery() ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
- Nodes = rabbit_mnesia:cluster_nodes(all),
run_outside_applications(fun () ->
+ rabbit_networking:killall(),
rabbit:stop(),
- wait_for_cluster_recovery(Nodes)
+ wait_for_cluster_recovery()
end),
ok.
@@ -381,11 +381,12 @@ run_outside_applications(Fun) ->
end
end).
-wait_for_cluster_recovery(Nodes) ->
+wait_for_cluster_recovery() ->
+ ping_all(),
case majority() of
true -> rabbit:start();
false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
- wait_for_cluster_recovery(Nodes)
+ wait_for_cluster_recovery()
end.
handle_dead_rabbit(Node, State = #state{partitions = Partitions,
@@ -454,6 +455,11 @@ del_node(Node, Nodes) -> Nodes -- [Node].
%% functions here. "rabbit" in a function's name implies we test if
%% the rabbit application is up, not just the node.
+%% As we use these functions to decide what to do in pause_minority
+%% state, they *must* be fast, even in the case where TCP connections
+%% are timing out. So that means we should be careful about whether we
+%% connect to nodes which are currently disconnected.
+
majority() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
@@ -466,9 +472,14 @@ all_rabbit_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
-alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)].
+alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_rabbit_nodes(Nodes) ->
[N || N <- alive_nodes(Nodes), rabbit:is_running(N)].
+
+%% This one is allowed to connect!
+ping_all() ->
+ [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
+ ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index db6d1eb0..9db607f9 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -42,7 +42,7 @@
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, connected_at}).
-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
@@ -54,7 +54,7 @@
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, channel_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties, connected_at]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -237,7 +237,8 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
client_properties = none,
capabilities = [],
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ connected_at = rabbit_misc:now_to_ms(os:timestamp())},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -1143,6 +1144,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
+ic(connected_at, #connection{connected_at = T}) -> T;
ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index cf125913..f78549ff 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -82,6 +82,8 @@ set(VHost, Component, Name, Term, User) ->
set_global(Name, Term) ->
mnesia_update(Name, Term),
+ event_notify(parameter_set, none, global, [{name, Name},
+ {value, Term}]),
ok.
format_error(L) ->
@@ -164,6 +166,8 @@ mnesia_clear(VHost, Component, Name) ->
event_notify(_Event, _VHost, <<"policy">>, _Props) ->
ok;
+event_notify(Event, none, Component, Props) ->
+ rabbit_event:notify(Event, [{component, Component} | Props]);
event_notify(Event, VHost, Component, Props) ->
rabbit_event:notify(Event, [{vhost, VHost},
{component, Component} | Props]).