summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-07-18 18:36:00 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-07-18 18:36:00 +0100
commit19e1278a72feb7101ae5f1af08ef32d9a7e55ff2 (patch)
tree44c0fa9848c5029f3dd5b5c597eff7703f0a5a0e
parentefb66a7e5fe0b0fdd29cb6232e9eac5cb9918504 (diff)
downloadrabbitmq-server-19e1278a72feb7101ae5f1af08ef32d9a7e55ff2.tar.gz
Speed improvements
-rw-r--r--src/rabbit_backing_queue_qc.erl82
1 files changed, 42 insertions, 40 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a870ddd5..1afe20ce 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -38,7 +38,7 @@
-record(state, {bqstate,
len, %% int
messages, %% queue of {msg_props, basic_msg}
- acks, %% list of {acktag, {msg_props, basic_msg}}
+ acks, %% dict of acktag => {msg_props, basic_msg}
confirms}). %% set of msgid
%% Initialise model
@@ -47,35 +47,36 @@ initial_state() ->
#state{bqstate = qc_variable_queue_init(qc_test_queue()),
len = 0,
messages = queue:new(),
- acks = [],
+ acks = orddict:new(),
confirms = gb_sets:new()}.
%% Property
prop_backing_queue_test() ->
?FORALL(Cmds, commands(?MODULE, initial_state()),
- begin
- {ok, FileSizeLimit} =
- application:get_env(rabbit, msg_store_file_size_limit),
- application:set_env(rabbit, msg_store_file_size_limit, 512,
- infinity),
- {ok, MaxJournal} =
- application:get_env(rabbit, queue_index_max_journal_entries),
- application:set_env(rabbit, queue_index_max_journal_entries, 128,
- infinity),
-
- {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds),
-
- application:set_env(rabbit, msg_store_file_size_limit,
- FileSizeLimit, infinity),
- application:set_env(rabbit, queue_index_max_journal_entries,
- MaxJournal, infinity),
-
- rabbit_variable_queue:delete_and_terminate(shutdown, BQ),
- ?WHENFAIL(
- io:format("Result: ~p~n", [Res]),
- aggregate(command_names(Cmds), Res =:= ok))
- end).
+ backing_queue_test(Cmds)).
+
+backing_queue_test(Cmds) ->
+ {ok, FileSizeLimit} =
+ application:get_env(rabbit, msg_store_file_size_limit),
+ application:set_env(rabbit, msg_store_file_size_limit, 512,
+ infinity),
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128,
+ infinity),
+
+ {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds),
+
+ application:set_env(rabbit, msg_store_file_size_limit,
+ FileSizeLimit, infinity),
+ application:set_env(rabbit, queue_index_max_journal_entries,
+ MaxJournal, infinity),
+
+ rabbit_variable_queue:delete_and_terminate(shutdown, BQ),
+ ?WHENFAIL(
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
%% Commands
@@ -121,11 +122,11 @@ qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
+ {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, requeue,
- [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
@@ -153,7 +154,7 @@ qc_purge(#state{bqstate = BQ}) ->
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
- length(Acks) > 0;
+ orddict:size(Acks) > 0;
precondition(#state{messages = Messages},
{call, ?BQMOD, publish_delivered, _Arg}) ->
queue:is_empty(Messages);
@@ -205,7 +206,7 @@ next_state(S, Res,
_ -> Confirms
end,
acks = case AckReq of
- true -> [{AckTag, {MsgProps, Msg}} | Acks];
+ true -> orddict:append(AckTag, {MsgProps, Msg}, Acks);
false -> Acks
end
};
@@ -222,7 +223,7 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
{{value, MsgProp_Msg}, M2} ->
S2 = S1#state{len = Len - 1, messages = M2},
case AckReq of
- true -> S2#state{acks = [{AckTag, MsgProp_Msg} | Acks]};
+ true -> S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)};
false -> S2
end
end;
@@ -231,16 +232,20 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1,
- acks = propvals_by_keys(AcksState, AcksArg)};
+ acks = orddict:filter(fun (AckTag, _) ->
+ not lists:member(AckTag, AcksArg)
+ end, AcksState)};
next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
#state{len = Len, messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
- RequeueMsgs = [proplists:get_value(Key, AcksState) || Key <- AcksArg],
+ RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) || Key <- AcksArg]),
S#state{bqstate = BQ1,
len = Len + length(RequeueMsgs),
messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
- acks = propvals_by_keys(AcksState, AcksArg)};
+ acks = orddict:filter(fun (AckTag, _) ->
+ not lists:member(AckTag, AcksArg)
+ end, AcksState)};
next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
S#state{bqstate = BQ};
@@ -276,7 +281,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
{{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
{_MsgProps, Msg} = queue:head(Messages),
MsgFetched =:= Msg andalso
- not lists:member(AckTag, Acks) andalso
+ not orddict:is_key(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
RemainingLen =:= Len - 1;
{empty, _BQ} ->
@@ -285,7 +290,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
#state{acks = Acks, confirms = Confrms} = S,
- not lists:member(AckTag, Acks) andalso
+ not orddict:is_key(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms);
postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) ->
@@ -299,7 +304,9 @@ postcondition(#state{len = Len},
postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
#state{confirms = Confirms} = S,
{ReportedConfirmed, _BQ} = Res,
- lists:all(fun (M) -> lists:member(M, Confirms) end, ReportedConfirmed);
+ lists:all(fun (M) ->
+ gb_sets:is_element(M, Confirms)
+ end, ReportedConfirmed);
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -363,11 +370,6 @@ qc_test_queue(Durable) ->
arguments = [],
pid = self()}.
-propvals_by_keys(Props, Keys) ->
- lists:filter(fun ({Key, _Msg}) ->
- not lists:member(Key, Keys)
- end, Props).
-
rand_choice([]) -> [];
rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].