diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-07-18 18:36:00 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-07-18 18:36:00 +0100 |
commit | 19e1278a72feb7101ae5f1af08ef32d9a7e55ff2 (patch) | |
tree | 44c0fa9848c5029f3dd5b5c597eff7703f0a5a0e | |
parent | efb66a7e5fe0b0fdd29cb6232e9eac5cb9918504 (diff) | |
download | rabbitmq-server-19e1278a72feb7101ae5f1af08ef32d9a7e55ff2.tar.gz |
Speed improvements
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 82 |
1 files changed, 42 insertions, 40 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index a870ddd5..1afe20ce 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -38,7 +38,7 @@ -record(state, {bqstate, len, %% int messages, %% queue of {msg_props, basic_msg} - acks, %% list of {acktag, {msg_props, basic_msg}} + acks, %% dict of acktag => {msg_props, basic_msg} confirms}). %% set of msgid %% Initialise model @@ -47,35 +47,36 @@ initial_state() -> #state{bqstate = qc_variable_queue_init(qc_test_queue()), len = 0, messages = queue:new(), - acks = [], + acks = orddict:new(), confirms = gb_sets:new()}. %% Property prop_backing_queue_test() -> ?FORALL(Cmds, commands(?MODULE, initial_state()), - begin - {ok, FileSizeLimit} = - application:get_env(rabbit, msg_store_file_size_limit), - application:set_env(rabbit, msg_store_file_size_limit, 512, - infinity), - {ok, MaxJournal} = - application:get_env(rabbit, queue_index_max_journal_entries), - application:set_env(rabbit, queue_index_max_journal_entries, 128, - infinity), - - {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds), - - application:set_env(rabbit, msg_store_file_size_limit, - FileSizeLimit, infinity), - application:set_env(rabbit, queue_index_max_journal_entries, - MaxJournal, infinity), - - rabbit_variable_queue:delete_and_terminate(shutdown, BQ), - ?WHENFAIL( - io:format("Result: ~p~n", [Res]), - aggregate(command_names(Cmds), Res =:= ok)) - end). + backing_queue_test(Cmds)). + +backing_queue_test(Cmds) -> + {ok, FileSizeLimit} = + application:get_env(rabbit, msg_store_file_size_limit), + application:set_env(rabbit, msg_store_file_size_limit, 512, + infinity), + {ok, MaxJournal} = + application:get_env(rabbit, queue_index_max_journal_entries), + application:set_env(rabbit, queue_index_max_journal_entries, 128, + infinity), + + {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds), + + application:set_env(rabbit, msg_store_file_size_limit, + FileSizeLimit, infinity), + application:set_env(rabbit, queue_index_max_journal_entries, + MaxJournal, infinity), + + rabbit_variable_queue:delete_and_terminate(shutdown, BQ), + ?WHENFAIL( + io:format("Result: ~p~n", [Res]), + aggregate(command_names(Cmds), Res =:= ok)). %% Commands @@ -121,11 +122,11 @@ qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. qc_ack(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. + {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}. qc_requeue(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, requeue, - [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. qc_set_ram_duration_target(#state{bqstate = BQ}) -> {call, ?BQMOD, set_ram_duration_target, @@ -153,7 +154,7 @@ qc_purge(#state{bqstate = BQ}) -> precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) when Fun =:= ack; Fun =:= requeue -> - length(Acks) > 0; + orddict:size(Acks) > 0; precondition(#state{messages = Messages}, {call, ?BQMOD, publish_delivered, _Arg}) -> queue:is_empty(Messages); @@ -205,7 +206,7 @@ next_state(S, Res, _ -> Confirms end, acks = case AckReq of - true -> [{AckTag, {MsgProps, Msg}} | Acks]; + true -> orddict:append(AckTag, {MsgProps, Msg}, Acks); false -> Acks end }; @@ -222,7 +223,7 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> {{value, MsgProp_Msg}, M2} -> S2 = S1#state{len = Len - 1, messages = M2}, case AckReq of - true -> S2#state{acks = [{AckTag, MsgProp_Msg} | Acks]}; + true -> S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)}; false -> S2 end end; @@ -231,16 +232,20 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> #state{acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, S#state{bqstate = BQ1, - acks = propvals_by_keys(AcksState, AcksArg)}; + acks = orddict:filter(fun (AckTag, _) -> + not lists:member(AckTag, AcksArg) + end, AcksState)}; next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> #state{len = Len, messages = Messages, acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, - RequeueMsgs = [proplists:get_value(Key, AcksState) || Key <- AcksArg], + RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) || Key <- AcksArg]), S#state{bqstate = BQ1, len = Len + length(RequeueMsgs), messages = queue:join(Messages, queue:from_list(RequeueMsgs)), - acks = propvals_by_keys(AcksState, AcksArg)}; + acks = orddict:filter(fun (AckTag, _) -> + not lists:member(AckTag, AcksArg) + end, AcksState)}; next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) -> S#state{bqstate = BQ}; @@ -276,7 +281,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> {_MsgProps, Msg} = queue:head(Messages), MsgFetched =:= Msg andalso - not lists:member(AckTag, Acks) andalso + not orddict:is_key(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso RemainingLen =:= Len - 1; {empty, _BQ} -> @@ -285,7 +290,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> #state{acks = Acks, confirms = Confrms} = S, - not lists:member(AckTag, Acks) andalso + not orddict:is_key(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms); postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) -> @@ -299,7 +304,9 @@ postcondition(#state{len = Len}, postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) -> #state{confirms = Confirms} = S, {ReportedConfirmed, _BQ} = Res, - lists:all(fun (M) -> lists:member(M, Confirms) end, ReportedConfirmed); + lists:all(fun (M) -> + gb_sets:is_element(M, Confirms) + end, ReportedConfirmed); postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) -> ?BQMOD:len(BQ) =:= Len. @@ -363,11 +370,6 @@ qc_test_queue(Durable) -> arguments = [], pid = self()}. -propvals_by_keys(Props, Keys) -> - lists:filter(fun ({Key, _Msg}) -> - not lists:member(Key, Keys) - end, Props). - rand_choice([]) -> []; rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)]. |