From 910e63d21414e32caf4f7a00f9e1117244e30884 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 24 Nov 2011 17:27:32 +0000 Subject: Handle confirms --- src/rabbit_msg_store_qc.erl | 158 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 125 insertions(+), 33 deletions(-) diff --git a/src/rabbit_msg_store_qc.erl b/src/rabbit_msg_store_qc.erl index d2a7fa16..5b22978a 100644 --- a/src/rabbit_msg_store_qc.erl +++ b/src/rabbit_msg_store_qc.erl @@ -19,23 +19,52 @@ -include_lib("proper/include/proper.hrl"). -define(MSMOD, rabbit_msg_store). +-define(CONFIRM_COLLECTOR_TIMEOUT, 10000). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -export([initial_state/0, command/1, precondition/2, postcondition/3, - next_state/3]). - --record(state, {msgids, - mscstate}). + next_state/3, confirm/2]). + +-record(state, {msgids, % tree of id => ms5(payload) + writeids, % list of {id, writecount} + removeids, % list of {id, removecount} + collect_pid, % pid for handling confirms + mscstate, % client state + ref}). % client ref + +collector(Ids) -> + receive + {confirm, Ref, {IdSet, _Written}} -> + collector([{Ref, gb_sets:to_list(IdSet)} | Ids]); + {retrieve, Ref, Pid} -> + Pid ! {Ref, proplists:append_values(Ref, Ids)}, + collector(proplists:delete(Ref, Ids)); + {reset} -> + collector([]); + X -> + throw({unexpected_collector_message, X}) + end. %% Initialise model initial_state() -> - Ref = {call, rabbit_guid, guid, []}, + Ref = rabbit_guid:guid(), + CollectorPid = spawn(fun () -> collector([]) end), MSCState = {call, rabbit_msg_store, client_init, - [?PERSISTENT_MSG_STORE, Ref, undefined, undefined]}, - #state{msgids = gb_trees:empty(), - mscstate = MSCState}. + [?PERSISTENT_MSG_STORE, + Ref, + fun (I, W) -> + CollectorPid ! {confirm, Ref, {I, W}}, + ok + end, + undefined]}, + #state{msgids = gb_trees:empty(), + writeids = [], + removeids = [], + mscstate = MSCState, + collect_pid = CollectorPid, + ref = Ref}. %% Property @@ -51,8 +80,9 @@ prop_msg_store_test() -> application:set_env(rabbit, msg_store_file_size_limit, 512, infinity), restart_msg_store_empty(), - {_H, _S, Res} = run_commands(?MODULE, Cmds), + {_H, #state{collect_pid = CP}, Res} = run_commands(?MODULE, Cmds), restart_msg_store_empty(), + CP ! {reset}, ?WHENFAIL( io:format("Result: ~p~n", [Res]), aggregate(command_names(Cmds), Res =:= ok)) @@ -62,75 +92,137 @@ prop_msg_store_test() -> command(S) -> frequency([{10, qc_write(S)}, - {1, qc_read(S)}, - {1, qc_remove(S)}, - {1, qc_contains(S)}]). + {3, qc_read(S)}, + {2, qc_remove(S)}, + {1, qc_contains(S)}, + {5, qc_confirm(S)}]). -qc_write(#state{mscstate = MS}) -> - {call, ?MSMOD, write, [qc_msg_id(), qc_payload(), MS]}. +qc_write(#state{msgids = MsgIds, mscstate = MS}) -> + {call, ?MSMOD, write, [frequency([{0, rand_elem(gb_trees:keys(MsgIds))}, + {10, qc_msg_id()}]), qc_payload(), MS]}. qc_read(#state{msgids = MsgIds, mscstate = MS}) -> {call, ?MSMOD, read, [frequency([{10, rand_elem(gb_trees:keys(MsgIds))}, {1, qc_msg_id()}]), MS]}. -qc_remove(#state{msgids = MsgIds, mscstate = MS}) -> - {call, ?MSMOD, remove, [rand_sublist(gb_trees:keys(MsgIds)), MS]}. +qc_remove(#state{mscstate = MS} = S) -> + {_Count, Ids} = unremoved(S), + {call, ?MSMOD, remove, [rand_sublist(Ids), MS]}. qc_contains(#state{msgids = MsgIds, mscstate = MS}) -> {call, ?MSMOD, contains, [frequency([{10, rand_elem(gb_trees:keys(MsgIds))}, {1, qc_msg_id()}]), MS]}. +qc_confirm(#state{collect_pid = Pid, ref = Ref}) -> + {call, ?MODULE, confirm, [Pid, Ref]}. + %% Preconditions -precondition(#state{msgids = MsgIds}, - {call, ?MSMOD, write, [MsgId, _Payload, _MS]}) -> - not gb_trees:is_defined(MsgId, MsgIds); +precondition(#state{}, {call, ?MSMOD, write, [MsgId, _Payload, _MS]}) -> + MsgId /= none; -precondition(#state{msgids = MsgIds}, {call, ?MSMOD, remove, _Arg}) -> - not gb_trees:is_empty(MsgIds); +precondition(#state{} = S, {call, ?MSMOD, remove, _Arg}) -> + {Count, _Ids} = unremoved(S), + Count > 0; precondition(_S, {call, ?MSMOD, _Fun, _Arg}) -> + true; + +precondition(_S, {call, ?MODULE, confirm, _Arg}) -> true. %% Model updates -next_state(#state{msgids = MsgIds} = S, _Res, +next_state(#state{msgids = MsgIds, writeids = WriteIds} = S, _Res, {call, ?MSMOD, write, [MsgId, Payload, _MS]}) -> - S#state{msgids = gb_trees:insert(MsgId, erlang:md5(Payload), MsgIds)}; + WriteIds1 = case lists:keysearch(MsgId, 1, WriteIds) of + {value, {MsgId, Count}} -> lists:keyreplace(MsgId, 1, WriteIds, {MsgId, Count + 1}); + false -> [{MsgId, 1} | WriteIds] + end, + + S#state{msgids = gb_trees:enter(MsgId, erlang:md5(Payload), MsgIds), + writeids = WriteIds1}; next_state(S, Res, {call, ?MSMOD, read, _Args}) -> MS1 = {call, erlang, element, [2, Res]}, S#state{mscstate = MS1}; -next_state(#state{msgids = MsgIds} = S, _Res, +next_state(#state{removeids = RemoveIds} = S, _Res, {call, ?MSMOD, remove, [MsgIdList, _MS]}) -> - S#state{msgids = lists:foldl(fun(Elem, Tree) -> - gb_trees:delete(Elem, Tree) - end, MsgIds, MsgIdList)}; + RemoveIds2 = + lists:foldl(fun (MsgId, RemoveIds1) -> + case lists:keysearch(MsgId, 1, RemoveIds1) of + {value, {MsgId, Count}} -> lists:keyreplace(MsgId, 1, RemoveIds, {MsgId, Count + 1}); + false -> [{MsgId, 1} | RemoveIds1] + end + end, + RemoveIds, + MsgIdList), + S#state{removeids = RemoveIds2}; next_state(S, _Res, {call, ?MSMOD, contains, [_MsgId, _MS]}) -> + S; + +next_state(S, _Res, {call, ?MODULE, confirm, _Args}) -> S. %% Postconditions -postcondition(#state{msgids = MsgIds}, +postcondition(#state{msgids = MsgIds} = S, {call, ?MSMOD, read, [MsgId, _MS]}, Res) -> + {_Count, Ids} = unremoved(S), case Res of {{ok, Retrieved}, _MS0} -> erlang:md5(Retrieved) == gb_trees:get(MsgId, MsgIds); {not_found, _MS0} -> - not gb_trees:is_defined(MsgId, MsgIds) + not lists:member(MsgId, Ids) end; -postcondition(#state{msgids = MsgIds}, +postcondition(#state{} = S, {call, ?MSMOD, contains, [MsgId, _MS]}, Res) -> - Res == gb_trees:is_defined(MsgId, MsgIds); + {_Count, Ids} = unremoved(S), + Res == lists:member(MsgId, Ids); -postcondition(_S, {call, _Mod, _Fun, _Args}, _Res) -> - true. +postcondition(_S, {call, ?MSMOD, _Fun, _Args}, _Res) -> + true; + +postcondition(#state{msgids = MsgIds}, {call, ?MODULE, confirm, _Args}, Res) -> + lists:foldl(fun (_I, false) -> false; + (I, true) -> gb_trees:is_defined(I, MsgIds) + end, true, Res). %% Helpers +confirm(CollectorPid, Ref) -> + CollectorPid ! {retrieve, Ref, self()}, + receive + {Ref, Ids} -> Ids + after ?CONFIRM_COLLECTOR_TIMEOUT -> + throw({failed_to_retrieve_ids, Ref}) + end. + +unremoved(#state{writeids = WriteIds, removeids = RemoveIds}) -> + unremoved(WriteIds, RemoveIds, 0, []). + +unremoved([], [], Count, Result) -> + {Count, Result}; +unremoved([], Removes, _, _) -> + throw({more_removes_than_writes, Removes}); +unremoved([{WriteId, WriteCount} | WriteRest], Removes, Count, Result) -> + case lists:keytake(WriteId, 1, Removes) of + {value, {WriteId, RemoveCount}, Removes1} -> + case {WriteCount, RemoveCount} of + {X, Y} when X > Y -> + unremoved(WriteRest, Removes1, Count + WriteCount - RemoveCount, [WriteId | Result]); + {X, X} -> + unremoved(WriteRest, Removes1, Count, Result); + _ -> + throw({found_too_many_removes, WriteCount, RemoveCount, WriteId}) + end; + false -> + unremoved(WriteRest, Removes, Count + WriteCount, [WriteId | Result]) + end. + qc_payload() -> binary(). -- cgit v1.2.1