authorEmile Joubert <>2011-11-24 17:27:32 +0000
committerEmile Joubert <>2011-11-24 17:27:32 +0000
commit910e63d21414e32caf4f7a00f9e1117244e30884 (patch)
parent9e765f0ca144b0068f591003362f72eb9e4d2581 (diff)
Handle confirmsbug24238
1 files changed, 125 insertions, 33 deletions
diff --git a/src/rabbit_msg_store_qc.erl b/src/rabbit_msg_store_qc.erl
index d2a7fa16..5b22978a 100644
--- a/src/rabbit_msg_store_qc.erl
+++ b/src/rabbit_msg_store_qc.erl
@@ -19,23 +19,52 @@
-define(MSMOD, rabbit_msg_store).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-export([initial_state/0, command/1, precondition/2, postcondition/3,
- next_state/3]).
--record(state, {msgids,
- mscstate}).
+ next_state/3, confirm/2]).
+-record(state, {msgids, % tree of id => ms5(payload)
+ writeids, % list of {id, writecount}
+ removeids, % list of {id, removecount}
+ collect_pid, % pid for handling confirms
+ mscstate, % client state
+ ref}). % client ref
+collector(Ids) ->
+ receive
+ {confirm, Ref, {IdSet, _Written}} ->
+ collector([{Ref, gb_sets:to_list(IdSet)} | Ids]);
+ {retrieve, Ref, Pid} ->
+ Pid ! {Ref, proplists:append_values(Ref, Ids)},
+ collector(proplists:delete(Ref, Ids));
+ {reset} ->
+ collector([]);
+ X ->
+ throw({unexpected_collector_message, X})
+ end.
%% Initialise model
initial_state() ->
- Ref = {call, rabbit_guid, guid, []},
+ Ref = rabbit_guid:guid(),
+ CollectorPid = spawn(fun () -> collector([]) end),
MSCState = {call, rabbit_msg_store, client_init,
- [?PERSISTENT_MSG_STORE, Ref, undefined, undefined]},
- #state{msgids = gb_trees:empty(),
- mscstate = MSCState}.
+ Ref,
+ fun (I, W) ->
+ CollectorPid ! {confirm, Ref, {I, W}},
+ ok
+ end,
+ undefined]},
+ #state{msgids = gb_trees:empty(),
+ writeids = [],
+ removeids = [],
+ mscstate = MSCState,
+ collect_pid = CollectorPid,
+ ref = Ref}.
%% Property
@@ -51,8 +80,9 @@ prop_msg_store_test() ->
application:set_env(rabbit, msg_store_file_size_limit, 512,
- {_H, _S, Res} = run_commands(?MODULE, Cmds),
+ {_H, #state{collect_pid = CP}, Res} = run_commands(?MODULE, Cmds),
+ CP ! {reset},
io:format("Result: ~p~n", [Res]),
aggregate(command_names(Cmds), Res =:= ok))
@@ -62,75 +92,137 @@ prop_msg_store_test() ->
command(S) ->
frequency([{10, qc_write(S)},
- {1, qc_read(S)},
- {1, qc_remove(S)},
- {1, qc_contains(S)}]).
+ {3, qc_read(S)},
+ {2, qc_remove(S)},
+ {1, qc_contains(S)},
+ {5, qc_confirm(S)}]).
-qc_write(#state{mscstate = MS}) ->
- {call, ?MSMOD, write, [qc_msg_id(), qc_payload(), MS]}.
+qc_write(#state{msgids = MsgIds, mscstate = MS}) ->
+ {call, ?MSMOD, write, [frequency([{0, rand_elem(gb_trees:keys(MsgIds))},
+ {10, qc_msg_id()}]), qc_payload(), MS]}.
qc_read(#state{msgids = MsgIds, mscstate = MS}) ->
{call, ?MSMOD, read, [frequency([{10, rand_elem(gb_trees:keys(MsgIds))},
{1, qc_msg_id()}]), MS]}.
-qc_remove(#state{msgids = MsgIds, mscstate = MS}) ->
- {call, ?MSMOD, remove, [rand_sublist(gb_trees:keys(MsgIds)), MS]}.
+qc_remove(#state{mscstate = MS} = S) ->
+ {_Count, Ids} = unremoved(S),
+ {call, ?MSMOD, remove, [rand_sublist(Ids), MS]}.
qc_contains(#state{msgids = MsgIds, mscstate = MS}) ->
{call, ?MSMOD, contains, [frequency([{10, rand_elem(gb_trees:keys(MsgIds))},
{1, qc_msg_id()}]), MS]}.
+qc_confirm(#state{collect_pid = Pid, ref = Ref}) ->
+ {call, ?MODULE, confirm, [Pid, Ref]}.
%% Preconditions
-precondition(#state{msgids = MsgIds},
- {call, ?MSMOD, write, [MsgId, _Payload, _MS]}) ->
- not gb_trees:is_defined(MsgId, MsgIds);
+precondition(#state{}, {call, ?MSMOD, write, [MsgId, _Payload, _MS]}) ->
+ MsgId /= none;
-precondition(#state{msgids = MsgIds}, {call, ?MSMOD, remove, _Arg}) ->
- not gb_trees:is_empty(MsgIds);
+precondition(#state{} = S, {call, ?MSMOD, remove, _Arg}) ->
+ {Count, _Ids} = unremoved(S),
+ Count > 0;
precondition(_S, {call, ?MSMOD, _Fun, _Arg}) ->
+ true;
+precondition(_S, {call, ?MODULE, confirm, _Arg}) ->
%% Model updates
-next_state(#state{msgids = MsgIds} = S, _Res,
+next_state(#state{msgids = MsgIds, writeids = WriteIds} = S, _Res,
{call, ?MSMOD, write, [MsgId, Payload, _MS]}) ->
- S#state{msgids = gb_trees:insert(MsgId, erlang:md5(Payload), MsgIds)};
+ WriteIds1 = case lists:keysearch(MsgId, 1, WriteIds) of
+ {value, {MsgId, Count}} -> lists:keyreplace(MsgId, 1, WriteIds, {MsgId, Count + 1});
+ false -> [{MsgId, 1} | WriteIds]
+ end,
+ S#state{msgids = gb_trees:enter(MsgId, erlang:md5(Payload), MsgIds),
+ writeids = WriteIds1};
next_state(S, Res, {call, ?MSMOD, read, _Args}) ->
MS1 = {call, erlang, element, [2, Res]},
S#state{mscstate = MS1};
-next_state(#state{msgids = MsgIds} = S, _Res,
+next_state(#state{removeids = RemoveIds} = S, _Res,
{call, ?MSMOD, remove, [MsgIdList, _MS]}) ->
- S#state{msgids = lists:foldl(fun(Elem, Tree) ->
- gb_trees:delete(Elem, Tree)
- end, MsgIds, MsgIdList)};
+ RemoveIds2 =
+ lists:foldl(fun (MsgId, RemoveIds1) ->
+ case lists:keysearch(MsgId, 1, RemoveIds1) of
+ {value, {MsgId, Count}} -> lists:keyreplace(MsgId, 1, RemoveIds, {MsgId, Count + 1});
+ false -> [{MsgId, 1} | RemoveIds1]
+ end
+ end,
+ RemoveIds,
+ MsgIdList),
+ S#state{removeids = RemoveIds2};
next_state(S, _Res, {call, ?MSMOD, contains, [_MsgId, _MS]}) ->
+ S;
+next_state(S, _Res, {call, ?MODULE, confirm, _Args}) ->
%% Postconditions
-postcondition(#state{msgids = MsgIds},
+postcondition(#state{msgids = MsgIds} = S,
{call, ?MSMOD, read, [MsgId, _MS]}, Res) ->
+ {_Count, Ids} = unremoved(S),
case Res of
{{ok, Retrieved}, _MS0} ->
erlang:md5(Retrieved) == gb_trees:get(MsgId, MsgIds);
{not_found, _MS0} ->
- not gb_trees:is_defined(MsgId, MsgIds)
+ not lists:member(MsgId, Ids)
-postcondition(#state{msgids = MsgIds},
+postcondition(#state{} = S,
{call, ?MSMOD, contains, [MsgId, _MS]}, Res) ->
- Res == gb_trees:is_defined(MsgId, MsgIds);
+ {_Count, Ids} = unremoved(S),
+ Res == lists:member(MsgId, Ids);
-postcondition(_S, {call, _Mod, _Fun, _Args}, _Res) ->
- true.
+postcondition(_S, {call, ?MSMOD, _Fun, _Args}, _Res) ->
+ true;
+postcondition(#state{msgids = MsgIds}, {call, ?MODULE, confirm, _Args}, Res) ->
+ lists:foldl(fun (_I, false) -> false;
+ (I, true) -> gb_trees:is_defined(I, MsgIds)
+ end, true, Res).
%% Helpers
+confirm(CollectorPid, Ref) ->
+ CollectorPid ! {retrieve, Ref, self()},
+ receive
+ {Ref, Ids} -> Ids
+ throw({failed_to_retrieve_ids, Ref})
+ end.
+unremoved(#state{writeids = WriteIds, removeids = RemoveIds}) ->
+ unremoved(WriteIds, RemoveIds, 0, []).
+unremoved([], [], Count, Result) ->
+ {Count, Result};
+unremoved([], Removes, _, _) ->
+ throw({more_removes_than_writes, Removes});
+unremoved([{WriteId, WriteCount} | WriteRest], Removes, Count, Result) ->
+ case lists:keytake(WriteId, 1, Removes) of
+ {value, {WriteId, RemoveCount}, Removes1} ->
+ case {WriteCount, RemoveCount} of
+ {X, Y} when X > Y ->
+ unremoved(WriteRest, Removes1, Count + WriteCount - RemoveCount, [WriteId | Result]);
+ {X, X} ->
+ unremoved(WriteRest, Removes1, Count, Result);
+ _ ->
+ throw({found_too_many_removes, WriteCount, RemoveCount, WriteId})
+ end;
+ false ->
+ unremoved(WriteRest, Removes, Count + WriteCount, [WriteId | Result])
+ end.
qc_payload() ->