diff options
Diffstat (limited to 'src/gm_qc.erl')
-rw-r--r-- | src/gm_qc.erl | 384 |
1 files changed, 384 insertions, 0 deletions
diff --git a/src/gm_qc.erl b/src/gm_qc.erl new file mode 100644 index 00000000..394cbcbd --- /dev/null +++ b/src/gm_qc.erl @@ -0,0 +1,384 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved. +%% + +-module(gm_qc). +-ifdef(use_proper_qc). + +-include_lib("proper/include/proper.hrl"). + +-define(GROUP, test_group). +-define(MAX_SIZE, 5). +-define(MSG_TIMEOUT, 1000000). %% micros + +-export([prop_gm_test/0]). + +-behaviour(proper_statem). +-export([initial_state/0, command/1, precondition/2, postcondition/3, + next_state/3]). + +-behaviour(gm). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +%% Helpers +-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]). + +%% For insertion into gm +-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]). + +-record(state, {seq, %% symbolic and dynamic + instrumented, %% dynamic only + outstanding, %% dynamic only + monitors, %% dynamic only + all_join, %% for symbolic + to_join, %% dynamic only + to_leave %% for symbolic + }). + +prop_gm_test() -> + case ?INSTR_MOD of + ?MODULE -> ok; + _ -> exit(compile_with_INSTRUMENT_FOR_QC) + end, + process_flag(trap_exit, true), + erlang:register(?MODULE, self()), + ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)). + +gm_test(Cmds) -> + {_H, State, Res} = run_commands(?MODULE, Cmds), + cleanup(State), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). + +cleanup(S) -> + S2 = ensure_joiners_joined_and_msgs_received(S), + All = gms_joined(S2), + All = gms(S2), %% assertion - none to join + check_stale_members(All), + [gm:leave(GM) || GM <- All], + drain_and_proceed_gms(S2), + [await_death(GM) || GM <- All], + gm:forget_group(?GROUP), + ok. + +check_stale_members(All) -> + GMs = [P || P <- processes(), is_gm_process(?GROUP, P)], + case GMs -- All of + [] -> ok; + Rest -> exit({forgot, Rest}) + end. + +is_gm_process(Group, P) -> + case process_info(P, dictionary) of + undefined -> false; + {dictionary, D} -> {gm, Group} =:= proplists:get_value(process_name, D) + end. + +await_death(P) -> + MRef = erlang:monitor(process, P), + await_death(MRef, P). + +await_death(MRef, P) -> + receive + {'DOWN', MRef, process, P, _} -> ok; + {'DOWN', _, _, _, _} -> await_death(MRef, P); + {'EXIT', _, normal} -> await_death(MRef, P); + {'EXIT', _, Reason} -> exit(Reason); + {joined, _GM} -> await_death(MRef, P); + {left, _GM} -> await_death(MRef, P); + Anything -> exit({stray_msg, Anything}) + end. + +%% --------------------------------------------------------------------------- +%% proper_statem +%% --------------------------------------------------------------------------- + +initial_state() -> #state{seq = 1, + outstanding = dict:new(), + instrumented = dict:new(), + monitors = dict:new(), + all_join = sets:new(), + to_join = sets:new(), + to_leave = sets:new()}. + +command(S) -> + case {length(gms_symb_not_left(S)), length(gms_symb(S))} of + {0, 0} -> qc_join(S); + {0, _} -> frequency([{1, qc_join(S)}, + {3, qc_proceed1(S)}, + {5, qc_proceed2(S)}]); + _ -> frequency([{1, qc_join(S)}, + {1, qc_leave(S)}, + {10, qc_send(S)}, + {5, qc_proceed1(S)}, + {15, qc_proceed2(S)}]) + end. + +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. +qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. +qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}. +qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)), + oneof(gms_symb(S))]}. + +precondition(S, {call, ?MODULE, do_join, []}) -> + length(gms_symb(S)) < ?MAX_SIZE; + +precondition(_S, {call, ?MODULE, do_leave, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_send, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) -> + GM1 =/= GM2. + +postcondition(_S, {call, _M, _F, _A}, _Res) -> + true. + +next_state(S = #state{to_join = ToSet, + all_join = AllSet}, GM, {call, ?MODULE, do_join, []}) -> + S#state{to_join = sets:add_element(GM, ToSet), + all_join = sets:add_element(GM, AllSet)}; + +next_state(S = #state{to_leave = Set}, _Res, {call, ?MODULE, do_leave, [GM]}) -> + S#state{to_leave = sets:add_element(GM, Set)}; + +next_state(S = #state{seq = Seq, + outstanding = Outstanding}, _Res, + {call, ?MODULE, do_send, [GM]}) -> + case is_pid(GM) andalso lists:member(GM, gms_joined(S)) of + true -> + %% Dynamic state, i.e. runtime + Msg = [{sequence, Seq}, + {sent_to, GM}, + {dests, gms_joined(S)}], + gm:broadcast(GM, Msg), + Outstanding1 = dict:map( + fun (_GM, Set) -> + gb_sets:add_element(Msg, Set) + end, Outstanding), + drain(S#state{seq = Seq + 1, + outstanding = Outstanding1}); + false -> + S + end; + +next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) -> + proceed(Pid, S); + +next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) -> + proceed({From, To}, S). + +proceed(K, S = #state{instrumented = Msgs}) -> + case dict:find(K, Msgs) of + {ok, Q} -> case queue:out(Q) of + {{value, Thing}, Q2} -> + S2 = proceed(K, Thing, S), + S2#state{instrumented = dict:store(K, Q2, Msgs)}; + {empty, _} -> + S + end; + error -> S + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined(Pid, _Members) -> Pid ! {joined, self()}, + ok. +members_changed(_Pid, _Bs, _Ds) -> ok. +handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. +terminate(Pid, _Reason) -> Pid ! {left, self()}. + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +do_join() -> + {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), + fun execute_mnesia_transaction/1), + GM. + +do_leave(GM) -> + gm:leave(GM), + GM. + +%% We need to update the state, so do the work in next_state +do_send( _GM) -> ok. +do_proceed1(_Pid) -> ok. +do_proceed2(_From, _To) -> ok. + +%% All GMs, joined and to join +gms(#state{outstanding = Outstanding, + to_join = ToJoin}) -> + dict:fetch_keys(Outstanding) ++ sets:to_list(ToJoin). + +%% All GMs, joined and to join +gms_joined(#state{outstanding = Outstanding}) -> + dict:fetch_keys(Outstanding). + +%% All GMs including those that have left (symbolic) +gms_symb(#state{all_join = AllJoin}) -> + sets:to_list(AllJoin). + +%% All GMs not including those that have left (symbolic) +gms_symb_not_left(#state{all_join = AllJoin, + to_leave = ToLeave}) -> + sets:to_list(sets:subtract(AllJoin, ToLeave)). + +drain(S) -> + receive + Msg -> drain(handle_msg(Msg, S)) + after 10 -> S + end. + +drain_and_proceed_gms(S0) -> + S = #state{instrumented = Msgs} = drain(S0), + case dict:size(Msgs) of + 0 -> S; + _ -> S1 = dict:fold( + fun (Key, Q, Si) -> + lists:foldl( + fun (Msg, Sij) -> + proceed(Key, Msg, Sij) + end, Si, queue:to_list(Q)) + end, S, Msgs), + drain_and_proceed_gms(S1#state{instrumented = dict:new()}) + end. + +handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> + case dict:find(GM, Outstanding) of + {ok, Set} -> + Set2 = gb_sets:del_element(Msg, Set), + S#state{outstanding = dict:store(GM, Set2, Outstanding)}; + error -> + %% Message from GM that has already died. OK. + S + end; +handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) -> + Q1 = case dict:find(Key, Msgs) of + {ok, Q} -> queue:in(Thing, Q); + error -> queue:from_list([Thing]) + end, + S#state{instrumented = dict:store(Key, Q1, Msgs)}; +handle_msg({joined, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + S#state{outstanding = dict:store(GM, gb_sets:empty(), Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({left, GM}, S = #state{outstanding = Outstanding, + to_join = ToJoin}) -> + true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin), + S#state{outstanding = dict:erase(GM, Outstanding), + to_join = sets:del_element(GM, ToJoin)}; +handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) -> + To = dict:fetch(MRef, Mons), + handle_msg({instrumented, {From, To}, {info, Msg}}, + S#state{monitors = dict:erase(MRef, Mons)}); +handle_msg({'EXIT', _From, normal}, S) -> + S; +handle_msg({'EXIT', _From, Reason}, _S) -> + %% We just trapped exits to get nicer SASL logging. + exit(Reason). + +proceed({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S; +proceed({_From, To}, {info, Msg}, S) -> To ! Msg, S; +proceed({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S; +proceed({From, To}, {mon, Ref}, S) -> add_monitor(From, To, Ref, S); +proceed(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S; +proceed(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S. + +%% NB From here is To in handle_msg/DOWN above, since the msg is going +%% the other way +add_monitor(From, To, Ref, S = #state{monitors = Mons}) -> + MRef = erlang:monitor(process, To), + From ! {mref, Ref, MRef}, + S#state{monitors = dict:store(MRef, From, Mons)}. + +%% ---------------------------------------------------------------------------- +%% Assertions +%% ---------------------------------------------------------------------------- + +ensure_joiners_joined_and_msgs_received(S0) -> + S = drain_and_proceed_gms(S0), + case outstanding_joiners(S) of + true -> ensure_joiners_joined_and_msgs_received(S); + false -> case outstanding_msgs(S) of + [] -> S; + Out -> exit({outstanding_msgs, Out}) + end + end. + +outstanding_joiners(#state{to_join = ToJoin}) -> + sets:size(ToJoin) > 0. + +outstanding_msgs(#state{outstanding = Outstanding}) -> + dict:fold(fun (GM, Set, OS) -> + case gb_sets:is_empty(Set) of + true -> OS; + false -> [{GM, gb_sets:to_list(Set)} | OS] + end + end, [], Outstanding). + +%% --------------------------------------------------------------------------- +%% For insertion into GM +%% --------------------------------------------------------------------------- + +call(Pid, Msg, infinity) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + gen_server2:call(Pid, Msg, infinity). + +cast(Pid, Msg) -> + whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}}, + ok. + +monitor(Pid) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}}, + receive + {mref, Ref, MRef} -> MRef + end. + +demonitor(MRef) -> + whereis(?MODULE) ! {instrumented, self(), {demon, MRef}}, + true. + +execute_mnesia_transaction(Fun) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, self(), {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + rabbit_misc:execute_mnesia_transaction(Fun). + +-else. + +-export([prop_disabled/0]). + +prop_disabled() -> + exit({compiled_without_proper, + "PropEr was not present during compilation of the test module. " + "Hence all tests are disabled."}). + +-endif. |