diff options
-rw-r--r-- | Makefile | 11 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 42 | ||||
-rwxr-xr-x | quickcheck | 9 | ||||
-rwxr-xr-x | scripts/rabbitmqctl | 1 | ||||
-rwxr-xr-x | scripts/rabbitmqctl.bat | 1 | ||||
-rw-r--r-- | src/gm.erl | 94 | ||||
-rw-r--r-- | src/gm_qc.erl | 386 | ||||
-rw-r--r-- | src/rabbit.erl | 2 | ||||
-rw-r--r-- | src/rabbit_autoheal.erl | 1 | ||||
-rw-r--r-- | src/rabbit_control_main.erl | 8 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 31 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 27 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 8 | ||||
-rw-r--r-- | src/rabbit_runtime_parameters.erl | 4 |
16 files changed, 562 insertions, 73 deletions
@@ -20,8 +20,6 @@ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml $(DOCS_DIR)/rabbitmq-echopid.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-plugins.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) -QC_MODULES := rabbit_backing_queue_qc -QC_TRIALS ?= 100 ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) PYTHON=python @@ -56,6 +54,12 @@ endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_macro,$(USE_SPECS),use_specs) $(call boolean_macro,$(USE_PROPER_QC),use_proper_qc) +ifdef INSTRUMENT_FOR_QC +ERLC_OPTS += -DINSTR_MOD=gm_qc +else +ERLC_OPTS += -DINSTR_MOD=gm +endif + include version.mk PLUGINS_SRC_DIR?=$(shell [ -d "plugins-src" ] && echo "plugins-src" || echo ) @@ -217,7 +221,8 @@ run-tests: all echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null run-qc: all - $(foreach MOD,$(QC_MODULES),./quickcheck $(RABBITMQ_NODENAME) $(MOD) $(QC_TRIALS)) + ./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40 + ./quickcheck $(RABBITMQ_NODENAME) gm_qc 1000 200 start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c2dc1f62..328926df 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -458,6 +458,44 @@ </listitem> </varlistentry> <varlistentry> + <term><cmdsynopsis><command>force_boot</command></cmdsynopsis></term> + <listitem> + <para> + Ensure that the node will start next time, even if it + was not the last to shut down. + </para> + <para> + Normally when you shut down a RabbitMQ cluster + altogether, the first node you restart should be the + last one to go down, since it may have seen things + happen that other nodes did not. But sometimes + that's not possible: for instance if the entire cluster + loses power then all nodes may think they were not the + last to shut down. + </para> + <para> + In such a case you can invoke <command>rabbitmqctl + force_boot</command> while the node is down. This will + tell the node to unconditionally start next time you ask + it to. If any changes happened to the cluster after this + node shut down, they will be lost. + </para> + <para> + If the last node to go down is permanently lost then you + should use <command>rabbitmqctl forget_cluster_node + --offline</command> in preference to this command, as it + will ensure that mirrored queues which were mastered on + the lost node get promoted. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl force_boot</screen> + <para role="example"> + This will force the node not to wait for other nodes + next time it is started. + </para> + </listitem> + </varlistentry> + <varlistentry> <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis> </term> <listitem> @@ -1479,6 +1517,10 @@ <term>send_pend</term> <listitem><para>Send queue size.</para></listitem> </varlistentry> + <varlistentry> + <term>connected_at</term> + <listitem><para>Date and time this connection was established, as timestamp.</para></listitem> + </varlistentry> </variablelist> <para> If no <command>connectioninfoitem</command>s are @@ -7,15 +7,18 @@ %% NodeStr is a local broker node name %% ModStr is the module containing quickcheck properties %% TrialsStr is the number of trials -main([NodeStr, ModStr, TrialsStr]) -> +main([NodeStr, ModStr, NumTestsStr, MaxSizeStr]) -> {ok, Hostname} = inet:gethostname(), Node = list_to_atom(NodeStr ++ "@" ++ Hostname), Mod = list_to_atom(ModStr), - Trials = erlang:list_to_integer(TrialsStr), + NumTests = erlang:list_to_integer(NumTestsStr), + MaxSize = erlang:list_to_integer(MaxSizeStr), case rpc:call(Node, code, ensure_loaded, [proper]) of {module, proper} -> case rpc:call(Node, proper, module, - [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of + [Mod] ++ [[{numtests, NumTests}, + {max_size, MaxSize}, + {constraint_tries, 200}]]) of [] -> ok; R -> io:format("~p.~n", [R]), quit(1) diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 032d10bf..495b06b3 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -27,6 +27,7 @@ exec ${ERL_DIR}erl \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ + -sname rabbitmqctl$$ \ -boot "${CLEAN_BOOT_FILE}" \ -sasl errlog_type error \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index a3734088..22eabf94 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -62,6 +62,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( !RABBITMQ_CTL_ERL_ARGS! ^
-sasl errlog_type error ^
-mnesia dir \""!RABBITMQ_MNESIA_DIR:\=/!"\" ^
+-sname rabbitmqctl!RANDOM!!TIME:~9! ^
-s rabbit_control_main ^
-nodename !RABBITMQ_NODENAME! ^
-extra !STAR!
@@ -388,6 +388,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/3]). +%% For INSTR_MOD callbacks +-export([call/3, cast/2, monitor/1, demonitor/1]). + -ifndef(use_specs). -export([behaviour_info/1]). -endif. @@ -567,11 +570,6 @@ init([GroupName, Module, Args, TxnFun]) -> txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({confirmed_broadcast, _Msg}, _From, - State = #state { members_state = undefined }) -> - reply(not_joined, State); - handle_call({confirmed_broadcast, Msg}, _From, State = #state { self = Self, right = {Self, undefined}, @@ -587,10 +585,6 @@ handle_call({confirmed_broadcast, Msg}, From, State) -> handle_callback_result({Result, flush_broadcast_buffer( State1 #state { confirms = Confirms1 })}); -handle_call(info, _From, - State = #state { members_state = undefined }) -> - reply(not_joined, State); - handle_call(info, _From, State = #state { group_name = GroupName, module = Module, view = View }) -> @@ -598,10 +592,6 @@ handle_call(info, _From, State = #state { group_name = GroupName, {module, Module}, {group_members, get_pids(alive_view_members(View))}], State); -handle_call({add_on_right, _NewMember}, _From, - State = #state { members_state = undefined }) -> - reply(not_ready, State); - handle_call({add_on_right, NewMember}, _From, State = #state { self = Self, group_name = GroupName, @@ -610,11 +600,10 @@ handle_call({add_on_right, NewMember}, _From, Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), View1 = group_to_view(Group), MembersState1 = remove_erased_members(MembersState, View1), - ok = send_right(NewMember, View1, - {catchup, Self, prepare_members_state(MembersState1)}), {Result, State1} = change_view(View1, State #state { members_state = MembersState1 }), - handle_callback_result({Result, {ok, Group}, State1}). + Reply = {ok, Group, prepare_members_state(MembersState1)}, + handle_callback_result({Result, Reply, State1}). handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, @@ -632,10 +621,6 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); -handle_cast({broadcast, _Msg, _SizeHint}, - State = #state { members_state = undefined }) -> - noreply(State); - handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, @@ -654,16 +639,17 @@ handle_cast(join, State = #state { self = Self, module = Module, callback_args = Args, txn_executor = TxnFun }) -> - View = join_group(Self, GroupName, TxnFun), - MembersState = - case alive_view_members(View) of - [Self] -> blank_member_state(); - _ -> undefined - end, - State1 = check_neighbours(State #state { view = View, - members_state = MembersState }), - handle_callback_result( - {Module:joined(Args, get_pids(all_known_members(View))), State1}); + State1 = case join_group(Self, GroupName, TxnFun) of + {ok, View} -> + check_neighbours( + State#state{view = View, + members_state = blank_member_state()}); + {ok, View, Left, MembersState} -> + initial_catchup(Left, MembersState, + check_neighbours(State#state{view = View})) + end, + Members = get_pids(all_known_members(State1#state.view)), + handle_callback_result({Module:joined(Args, Members), State1}); handle_cast({validate_members, OldMembers}, State = #state { view = View, @@ -752,21 +738,21 @@ prioritise_info(_, _Len, _State) -> 0. +initial_catchup(Left, MembersStateLeft, + State = #state { self = Self, + left = {Left, _MRefL}, + right = {Right, _MRefR}, + view = View, + members_state = undefined }) -> + ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), + MembersStateLeft1 = build_members_state(MembersStateLeft), + State #state { members_state = MembersStateLeft1 }. + handle_msg(check_neighbours, State) -> %% no-op - it's already been done by the calling handle_cast {ok, State}; handle_msg({catchup, Left, MembersStateLeft}, - State = #state { self = Self, - left = {Left, _MRefL}, - right = {Right, _MRefR}, - view = View, - members_state = undefined }) -> - ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), - MembersStateLeft1 = build_members_state(MembersStateLeft), - {ok, State #state { members_state = MembersStateLeft1 }}; - -handle_msg({catchup, Left, MembersStateLeft}, State = #state { self = Self, left = {Left, _MRefL}, view = View, @@ -1040,11 +1026,11 @@ join_group(Self, GroupName, {error, not_found}, TxnFun) -> join_group(Self, GroupName, prune_or_create_group(Self, GroupName, TxnFun), TxnFun); join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> - group_to_view(Group); + {ok, group_to_view(Group)}; join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:member(Self, Members) of true -> - group_to_view(Group); + {ok, group_to_view(Group)}; false -> case lists:filter(fun is_member_alive/1, Members) of [] -> @@ -1063,8 +1049,11 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> end, try case neighbour_call(Left, {add_on_right, Self}) of - {ok, Group1} -> group_to_view(Group1); - not_ready -> join_group(Self, GroupName, TxnFun) + {ok, Group1, MembersState1} -> + {ok, group_to_view(Group1), Left, + build_members_state(MembersState1)}; + not_ready -> + join_group(Self, GroupName, TxnFun) end catch exit:{R, _} @@ -1177,8 +1166,8 @@ can_erase_view_member(Self, Self, _LA, _LP) -> false; can_erase_view_member(_Self, _Id, N, N) -> true; can_erase_view_member(_Self, _Id, _LA, _LP) -> false. -neighbour_cast(N, Msg) -> gen_server2:cast(get_pid(N), Msg). -neighbour_call(N, Msg) -> gen_server2:call(get_pid(N), Msg, infinity). +neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg). +neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity). %% --------------------------------------------------------------------------- %% View monitoring and maintanence @@ -1192,7 +1181,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> - true = erlang:demonitor(MRef), + true = ?INSTR_MOD:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, ok = neighbour_cast(RealNeighbour, Msg), ok = case Neighbour of @@ -1202,7 +1191,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor( Self, Self) -> undefined; -maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). +maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1461,3 +1450,12 @@ last_pub( [], LP) -> LP; last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), true = PubNum > LP, %% ASSERTION PubNum. + +%% --------------------------------------------------------------------------- + +%% Uninstrumented versions + +call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). +cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). +monitor(Pid) -> erlang:monitor(process, Pid). +demonitor(MRef) -> erlang:demonitor(MRef). diff --git a/src/gm_qc.erl b/src/gm_qc.erl new file mode 100644 index 00000000..96ef5035 --- /dev/null +++ b/src/gm_qc.erl @@ -0,0 +1,386 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(gm_qc). +-ifdef(use_proper_qc). + +-include_lib("proper/include/proper.hrl"). + +-define(GROUP, test_group). +-define(MAX_SIZE, 5). +-define(MSG_TIMEOUT, 1000000). %% micros + +-export([prop_gm_test/0]). + +-behaviour(proper_statem). +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-behaviour(gm). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +%% Helpers +-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]). + +%% For insertion into gm +-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]). + +-record(state, {seq, %% symbolic and dynamic + instrumented, %% dynamic only + outstanding, %% dynamic only + monitors, %% dynamic only + all_join, %% for symbolic + to_join, %% dynamic only + to_leave %% for symbolic + }). + +prop_gm_test() -> + case ?INSTR_MOD of + ?MODULE -> ok; + _ -> exit(compile_with_INSTRUMENT_FOR_QC) + end, + process_flag(trap_exit, true), + erlang:register(?MODULE, self()), + ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)). + +gm_test(Cmds) -> + {_H, State, Res} = run_commands(?MODULE, Cmds), + cleanup(State), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). + +cleanup(S) -> + S2 = ensure_joiners_joined_and_msgs_received(S), + All = gms_joined(S2), + All = gms(S2), %% assertion - none to join + check_stale_members(All), + [gm:leave(GM) || GM <- All], + drain_and_proceed_gms(S2), + [await_death(GM) || GM <- All], + gm:forget_group(?GROUP), + ok. + +check_stale_members(All) -> + GMs = [P || P <- processes(), is_gm_process(?GROUP, P)], + case GMs -- All of + [] -> ok; + Rest -> exit({forgot, Rest}) + end. + +is_gm_process(Group, P) -> + case process_info(P, dictionary) of + undefined -> false; + {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D) + end. + +await_death(P) -> + MRef = erlang:monitor(process, P), + await_death(MRef, P). + +await_death(MRef, P) -> + receive + {'DOWN', MRef, process, P, _} -> ok; + {'DOWN', _, _, _, _} -> await_death(MRef, P); + {'EXIT', _, normal} -> await_death(MRef, P); + {'EXIT', _, Reason} -> exit(Reason); + {joined, _GM} -> await_death(MRef, P); + {left, _GM} -> await_death(MRef, P); + Anything -> exit({stray_msg, Anything}) + end. + +%% --------------------------------------------------------------------------- +%% proper_statem +%% --------------------------------------------------------------------------- + +initial_state() -> #state{seq = 1, + outstanding = dict:new(), + instrumented = dict:new(), + monitors = dict:new(), + all_join = sets:new(), + to_join = sets:new(), + to_leave = sets:new()}. + +command(S) -> + case {length(gms_symb_not_left(S)), length(gms_symb(S))} of + {0, 0} -> qc_join(S); + {0, _} -> frequency([{1, qc_join(S)}, + {3, qc_proceed1(S)}, + {5, qc_proceed2(S)}]); + _ -> frequency([{1, qc_join(S)}, + {1, qc_leave(S)}, + {10, qc_send(S)}, + {5, qc_proceed1(S)}, + {15, qc_proceed2(S)}]) + end. + +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. +qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. +qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}. +qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)), + oneof(gms_symb(S))]}. + +precondition(S, {call, ?MODULE, do_join, []}) -> + length(gms_symb(S)) < ?MAX_SIZE; + +precondition(_S, {call, ?MODULE, do_leave, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_send, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) -> + GM1 =/= GM2. + +postcondition(S = #state{}, {call, _M, _F, _A}, _Res) -> + true. + +next_state(S = #state{to_join = ToSet, + all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) -> + S#state{to_join = sets:add_element(GM, ToSet), + all_join = sets:add_element(GM, AllSet)}; + +next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) -> + S#state{to_leave = sets:add_element(GM, Set)}; + +next_state(S = #state{seq = Seq, + outstanding = Outstanding}, _Res, + {call, ?MODULE, do_send, [GM]}) -> + case is_pid(GM) andalso lists:member(GM, gms(S)) of + true -> + %% Dynamic state, i.e. runtime + Msg = [{sequence, Seq}, + {sent_to, GM}, + {dests, gms(S)}], + gm:broadcast(GM, Msg), + Outstanding1 = dict:map( + fun (_GM, Set) -> + gb_sets:add_element(Msg, Set) + end, Outstanding), + drain(S#state{seq = Seq + 1, + outstanding = Outstanding1}); + false -> + S + end; + +next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) -> + proceed(Pid, S); + +next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) -> + proceed({From, To}, S). + +proceed(K, S = #state{instrumented = Msgs}) -> + case dict:find(K, Msgs) of + {ok, Q} -> case queue:out(Q) of + {{value, Thing}, Q2} -> + S2 = proceed(K, Thing, S), + S2#state{instrumented = dict:store(K, Q2, Msgs)}; + {empty, _} -> + S + end; + error -> S + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined(Pid, _Members) -> Pid ! {joined, self()}, + ok. +members_changed(_Pid, _Bs, _Ds) -> ok. +handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. +terminate(Pid, _Reason) -> Pid ! {left, self()}. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +do_join() -> + {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), + fun execute_mnesia_transaction/1), + GM. + +do_leave(GM) -> + gm:leave(GM), + GM. + +%% We need to update the state, so do the work in next_state +do_send( _GM) -> ok. +do_proceed1(_Pid) -> ok. +do_proceed2(_From, _To) -> ok. + +%% All GMs, joined and to join +gms(#state{outstanding = Outstanding, + to_join = ToJoin}) -> + dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin). + +%% All GMs, joined and to join +gms_joined(#state{outstanding = Outstanding}) -> + dict:fetch_keys(Outstanding). + +%% All GMs including those that have left (symbolic) +gms_symb(#state{all_join = AllJoin}) -> + sets:to_list(AllJoin). + +%% All GMs not including those that have left (symbolic) +gms_symb_not_left(#state{all_join = AllJoin, + to_leave = ToLeave}) -> + sets:to_list(sets:subtract(AllJoin, ToLeave)). + +drain(S) -> + receive + Msg -> drain(handle_msg(Msg, S)) + after 10 -> S + end. + +drain_and_proceed_gms(S0) -> + S = #state{instrumented = Msgs} = drain(S0), + case dict:size(Msgs) of + 0 -> S; + _ -> S1 = dict:fold( + fun (Key, Q, Si) -> + lists:foldl( + fun (Msg, Sij) -> + proceed(Key, Msg, Sij) + end, Si, queue:to_list(Q)) + end, S, Msgs), + drain_and_proceed_gms(S1#state{instrumented = dict:new()}) + end. + +handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> + case dict:find(GM, Outstanding) of + {ok, Set} -> + Set2 = gb_sets:del_element(Msg, Set), + S#state{outstanding = dict:store(GM, Set2, Outstanding)}; + error -> + %% Message from GM that has already died. OK. + S + end; +handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) -> + Q1 = case dict:find(Key, Msgs) of + {ok, Q} -> queue:in(Thing, Q); + error -> queue:from_list([Thing]) + end, + S#state{instrumented = dict:store(Key, Q1, Msgs)}; +handle_msg({joined, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({left, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin), + S#state{outstanding = dict:erase(GM, Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) -> + To = dict:fetch(MRef, Mons), + handle_msg({instrumented, {From, To}, {info, Msg}}, + S#state{monitors = dict:erase(MRef, Mons)}); +handle_msg({'EXIT', _From, normal}, S) -> + S; +handle_msg({'EXIT', _From, Reason}, _S) -> + %% We just trapped exits to get nicer SASL logging. + exit(Reason). + +proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S; +proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S; +proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S; +proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S); +proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S; +proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S. + +%% NB From here is To in handle_msg/DOWN above, since the msg is going +%% the other way +add_monitor(From, To, Ref, S = #state{monitors = Mons}) -> + MRef = erlang:monitor(process, To), + From ! {mref, Ref, MRef}, + S#state{monitors = dict:store(MRef, From, Mons)}. + +timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}). + +%% ---------------------------------------------------------------------------- +%% Assertions +%% ---------------------------------------------------------------------------- + +ensure_joiners_joined_and_msgs_received(S0) -> + S = drain_and_proceed_gms(S0), + case outstanding_joiners(S) of + true -> ensure_joiners_joined_and_msgs_received(S); + false -> case outstanding_msgs(S) of + [] -> S; + Out -> exit({outstanding_msgs, Out}) + end + end. + +outstanding_joiners(#state{to_join = ToJoin}) -> + sets:size(ToJoin) > 0. + +outstanding_msgs(#state{outstanding = Outstanding}) -> + dict:fold(fun (GM, Set, OS) -> + case gb_sets:is_empty(Set) of + true -> OS; + false -> [{GM, gb_sets:to_list(Set)} | OS] + end + end, [], Outstanding). + +%% --------------------------------------------------------------------------- +%% For insertion into GM +%% --------------------------------------------------------------------------- + +call(Pid, Msg, infinity) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + gen_server2:call(Pid, Msg, infinity). + +cast(Pid, Msg) -> + whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}}, + ok. + +monitor(Pid) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}}, + receive + {mref, Ref, MRef} -> MRef + end. + +demonitor(MRef) -> + whereis(?MODULE) ! {instrumented, self(), {demon, MRef}}, + true. + +execute_mnesia_transaction(Fun) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, self(), {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + rabbit_misc:execute_mnesia_transaction(Fun). + +-else. + +-export([prop_disabled/0]). + +prop_disabled() -> + exit({compiled_without_proper, + "PropEr was not present during compilation of the test module. " + "Hence all tests are disabled."}). + +-endif. diff --git a/src/rabbit.erl b/src/rabbit.erl index 4b7a9a1f..a4c460ae 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -24,7 +24,7 @@ start_fhc/0]). -export([start/2, stop/1]). -export([start_apps/1, stop_apps/1]). --export([log_location/1]). %% for testing +-export([log_location/1, config_files/0]). %% for testing and mgmt-agent %%--------------------------------------------------------------------------- %% Boot steps. diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 826bfc45..c5237d34 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -118,6 +118,7 @@ node_down(Node, _State) -> handle_msg({request_start, Node}, not_healing, Partitions) -> rabbit_log:info("Autoheal request received from ~p~n", [Node]), + rabbit_node_monitor:ping_all(), case rabbit_node_monitor:all_rabbit_nodes_up() of false -> not_healing; true -> AllPartitions = all_partitions(Partitions), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index c7f94f58..8bc09426 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -54,6 +54,7 @@ change_cluster_node_type, update_cluster_nodes, {forget_cluster_node, [?OFFLINE_DEF]}, + force_boot, cluster_status, {sync_queue, [?VHOST_DEF]}, {cancel_sync_queue, [?VHOST_DEF]}, @@ -310,6 +311,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> [ClusterNode, false]) end; +action(force_boot, _Node, [], _Opts, Inform) -> + Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]), + case rabbit:is_running(Node) of + false -> rabbit_mnesia:force_load_next_boot(); + true -> {error, rabbit_running} + end; + action(sync_queue, Node, [Q], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index fd4b7b11..180993a5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -71,6 +71,7 @@ -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). +-export([now_to_ms/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -255,6 +256,9 @@ -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') -> float()). +-spec(now_to_ms/1 :: ({non_neg_integer(), + non_neg_integer(), + non_neg_integer()}) -> pos_integer()). -endif. %%---------------------------------------------------------------------------- @@ -1021,6 +1025,9 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. +now_to_ms({Mega, Sec, Micro}) -> + (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000. + check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c6c2c8eb..630d9853 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -23,6 +23,7 @@ update_cluster_nodes/1, change_cluster_node_type/1, forget_cluster_node/2, + force_load_next_boot/0, status/0, is_clustered/0, @@ -63,6 +64,7 @@ -spec(update_cluster_nodes/1 :: (node()) -> 'ok'). -spec(change_cluster_node_type/1 :: (node_type()) -> 'ok'). -spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok'). +-spec(force_load_next_boot/0 :: () -> 'ok'). %% Various queries to get the status of the db -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | @@ -303,7 +305,6 @@ remove_node_offline_node(Node) -> e(removing_node_from_offline_node) end. - %%---------------------------------------------------------------------------- %% Queries %%---------------------------------------------------------------------------- diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 96448f32..c8d76719 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -16,8 +16,8 @@ -module(rabbit_networking). --export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2, - stop_tcp_listener/1, on_node_down/1, active_listeners/0, +-export([boot/0, start/0, killall/0, start_tcp_listener/1, start_ssl_listener/2, + on_node_down/1, active_listeners/0, node_listeners/1, register_connection/1, unregister_connection/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, @@ -58,10 +58,10 @@ -type(label() :: string()). -spec(start/0 :: () -> 'ok'). +-spec(killall/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(start_ssl_listener/2 :: (listener_config(), rabbit_types:infos()) -> 'ok'). --spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok'). -spec(active_listeners/0 :: () -> [rabbit_types:listener()]). -spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]). -spec(register_connection/1 :: (pid()) -> ok). @@ -143,6 +143,25 @@ start() -> rabbit_sup:start_supervisor_child( [{local, rabbit_tcp_client_sup}, {rabbit_connection_sup,start_link,[]}]). +%% We are going to stop for pause-minority, so we are already +%% compromised; anything we confirm from now on is not going to be +%% remembered after we come back. Since rabbit:stop/0 may take a while +%% to gracefully shut down, we should stop talking to the outside +%% world *immediately*. +killall() -> + %% Stop ASAP + kill_connections(), + {ok, TCPListeners} = application:get_env(rabbit, tcp_listeners), + {ok, SSLListeners} = application:get_env(rabbit, ssl_listeners), + [stop_listener(L) || L <- TCPListeners ++ SSLListeners], + %% In case anything reconnected while we were stopping listeners + kill_connections(), + ok. + +kill_connections() -> + Conns = connections_local() ++ rabbit_direct:list_local(), + [exit(P, kill) || P <- Conns]. + ensure_ssl() -> {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), ok = app_utils:start_applications(SslAppsConfig), @@ -248,12 +267,12 @@ start_listener0(Address, Protocol, Label, OnConnect) -> {rabbit_misc:ntoa(IPAddress), Port}}) end. -stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Address) || +stop_listener(Listener) -> + [stop_listener0(Address) || Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family}) -> +stop_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 1c971c1d..72acc905 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -31,7 +31,7 @@ code_change/3]). %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/1]). +-export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -63,6 +63,7 @@ -spec(all_rabbit_nodes_up/0 :: () -> boolean()). -spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). +-spec(ping_all/0 :: () -> 'ok'). -endif. @@ -301,12 +302,11 @@ handle_info(ping_nodes, State) -> %% to ping the nodes that are up, after all. State1 = State#state{down_ping_timer = undefined}, Self = self(), - %% all_nodes_up() both pings all the nodes and tells us if we need to again. - %% %% We ping in a separate process since in a partition it might %% take some noticeable length of time and we don't want to block %% the node monitor for that long. spawn_link(fun () -> + ping_all(), case all_nodes_up() of true -> ok; false -> Self ! ping_again @@ -361,10 +361,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> await_cluster_recovery() -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), - Nodes = rabbit_mnesia:cluster_nodes(all), run_outside_applications(fun () -> + rabbit_networking:killall(), rabbit:stop(), - wait_for_cluster_recovery(Nodes) + wait_for_cluster_recovery() end), ok. @@ -381,11 +381,12 @@ run_outside_applications(Fun) -> end end). -wait_for_cluster_recovery(Nodes) -> +wait_for_cluster_recovery() -> + ping_all(), case majority() of true -> rabbit:start(); false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL), - wait_for_cluster_recovery(Nodes) + wait_for_cluster_recovery() end. handle_dead_rabbit(Node, State = #state{partitions = Partitions, @@ -454,6 +455,11 @@ del_node(Node, Nodes) -> Nodes -- [Node]. %% functions here. "rabbit" in a function's name implies we test if %% the rabbit application is up, not just the node. +%% As we use these functions to decide what to do in pause_minority +%% state, they *must* be fast, even in the case where TCP connections +%% are timing out. So that means we should be careful about whether we +%% connect to nodes which are currently disconnected. + majority() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_nodes(Nodes)) / length(Nodes) > 0.5. @@ -466,9 +472,14 @@ all_rabbit_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). -alive_nodes(Nodes) -> [N || N <- Nodes, pong =:= net_adm:ping(N)]. +alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])]. alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). alive_rabbit_nodes(Nodes) -> [N || N <- alive_nodes(Nodes), rabbit:is_running(N)]. + +%% This one is allowed to connect! +ping_all() -> + [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)], + ok. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index db6d1eb0..9db607f9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -42,7 +42,7 @@ -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, client_properties, capabilities, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, connected_at}). -record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). @@ -54,7 +54,7 @@ peer_host, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, auth_mechanism, ssl_protocol, ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, - timeout, frame_max, channel_max, client_properties]). + timeout, frame_max, channel_max, client_properties, connected_at]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -237,7 +237,8 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> client_properties = none, capabilities = [], auth_mechanism = none, - auth_state = none}, + auth_state = none, + connected_at = rabbit_misc:now_to_ms(os:timestamp())}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -1143,6 +1144,7 @@ ic(channel_max, #connection{channel_max = ChMax}) -> ChMax; ic(client_properties, #connection{client_properties = CP}) -> CP; ic(auth_mechanism, #connection{auth_mechanism = none}) -> none; ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name; +ic(connected_at, #connection{connected_at = T}) -> T; ic(Item, #connection{}) -> throw({bad_argument, Item}). socket_info(Get, Select, #v1{sock = Sock}) -> diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index cf125913..f78549ff 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -82,6 +82,8 @@ set(VHost, Component, Name, Term, User) -> set_global(Name, Term) -> mnesia_update(Name, Term), + event_notify(parameter_set, none, global, [{name, Name}, + {value, Term}]), ok. format_error(L) -> @@ -164,6 +166,8 @@ mnesia_clear(VHost, Component, Name) -> event_notify(_Event, _VHost, <<"policy">>, _Props) -> ok; +event_notify(Event, none, Component, Props) -> + rabbit_event:notify(Event, [{component, Component} | Props]); event_notify(Event, VHost, Component, Props) -> rabbit_event:notify(Event, [{vhost, VHost}, {component, Component} | Props]). |