diff options
author | Nick Vatamaniuc <vatamane@gmail.com> | 2022-11-04 14:23:47 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2022-11-18 17:04:30 -0500 |
commit | de05ea9f624d28ae99be9dba793f2e614c6f3658 (patch) | |
tree | 6f24c1f4b2370bd407b3d6f9a6b7b24e3d2ba55c | |
parent | 57b2dc340efc5f267f3200d17e30f977eedf491b (diff) | |
download | couchdb-de05ea9f624d28ae99be9dba793f2e614c6f3658.tar.gz |
Improve smoosh_priority_queue
* Remove `Value` parameter as it was never used.
* Use `make_ref()` instead of `unique_integer([monotonic])` for performance [1].
* Rewrite tests to get 100% coverage. Previous tests didn't actually run due
to a setup error.
* Switch from `size/1` to `qsize/1` as `size/1` is a built-in and we have to
write `?MODULE:size/1` everywhere.
* Do not remove elements from the gb_tree just for inspecting min and max
elements.
* Remove `last_updated/2` logic, as that will be published in an ETS table.
* Replace low level file serialization operations with `to_map/1`, `from_map/3`.
[1]
```
4> timer:tc(fun() -> [make_ref() || _ <- lists:seq(1, 1000000)], ok end ).
{488923,ok}
6> timer:tc(fun() -> [{erlang:monotonic_time(), erlang:unique_integer([monotonic])} || _ <- lists:seq(1, 1000000)], ok end ).
{1178409,ok}
```
-rw-r--r-- | src/smoosh/src/smoosh_priority_queue.erl | 281 | ||||
-rw-r--r-- | src/smoosh/test/smoosh_priority_queue_tests.erl | 167 |
2 files changed, 154 insertions, 294 deletions
diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl index 16deb0f76..b2ef4393d 100644 --- a/src/smoosh/src/smoosh_priority_queue.erl +++ b/src/smoosh/src/smoosh_priority_queue.erl @@ -12,19 +12,16 @@ -module(smoosh_priority_queue). --export([new/1, recover/1]). - --export([last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]). - --export([flush/1]). - --export([from_list/2, to_list/1]). - --export([is_empty/1]). - --export([write_to_file/1]). - --define(VSN, 1). +-export([ + new/1, + name/1, + in/4, + out/1, + info/1, + flush/1, + to_map/1, + from_map/3 +]). -record(priority_queue, { name, @@ -33,145 +30,175 @@ }). new(Name) -> - #priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}. - -recover(#priority_queue{name = Name, map = Map0} = Q) -> - case do_recover(file_name(Q)) of - {ok, Terms} -> - Map = maps:merge(Map0, Terms), - Tree = maps:fold( - fun(Key, {TreeKey, Value}, TreeAcc) -> - gb_trees:enter(TreeKey, {Key, Value}, TreeAcc) - end, - gb_trees:empty(), - Map - ), - #priority_queue{name = Name, map = Map, tree = Tree}; - error -> - Q - end. + #priority_queue{name = Name, map = #{}, tree = gb_trees:empty()}. -write_to_file(#priority_queue{map = Map} = Q) -> - smoosh_utils:write_to_file(Map, file_name(Q), ?VSN). +name(#priority_queue{name = Name}) -> + Name. -flush(#priority_queue{name = Name} = Q) -> - Q#priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}. +flush(#priority_queue{} = Q) -> + Q#priority_queue{map = #{}, tree = gb_trees:empty()}. -last_updated(Key, #priority_queue{map = Map}) -> +in(Key, Priority, Capacity, #priority_queue{map = Map, tree = Tree} = Q) -> case maps:find(Key, Map) of - {ok, {_Priority, {LastUpdatedMTime, _MInt}}} -> - LastUpdatedMTime; + {ok, {Priority, _}} -> + % Priority matches, keep everything as is. This might be the case + % for upgrade channels, for instance, where priority is 1. + Q; + {ok, TreeKey} -> + Tree1 = gb_trees:delete(TreeKey, Tree), + insert(Key, Priority, Capacity, Q#priority_queue{tree = Tree1}); error -> - false + insert(Key, Priority, Capacity, Q) end. -is_key(Key, #priority_queue{map = Map}) -> - maps:is_key(Key, Map). - -in(Key, Value, Priority, Q) -> - in(Key, Value, Priority, infinity, Q). - -in(Key, Value, Priority, Capacity, #priority_queue{name = Name, map = Map, tree = Tree}) -> - Tree1 = - case maps:find(Key, Map) of - {ok, TreeKey} -> - gb_trees:delete_any(TreeKey, Tree); - error -> - Tree - end, - Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])}, - TreeKey1 = {Priority, Now}, - Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1), - Map1 = maps:put(Key, TreeKey1, Map), - truncate(Capacity, #priority_queue{name = Name, map = Map1, tree = Tree2}). - -out(#priority_queue{name = Name, map = Map, tree = Tree}) -> +out(#priority_queue{map = Map, tree = Tree} = Q) -> case gb_trees:is_empty(Tree) of true -> false; false -> - {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree), - Map1 = maps:remove(Key, Map), - Q = #priority_queue{name = Name, map = Map1, tree = Tree1}, - {Key, Value, Q} + {_, Key, Tree1} = gb_trees:take_largest(Tree), + {Key, Q#priority_queue{map = maps:remove(Key, Map), tree = Tree1}} end. -size(#priority_queue{tree = Tree}) -> +qsize(#priority_queue{tree = Tree}) -> gb_trees:size(Tree). info(#priority_queue{tree = Tree} = Q) -> [ - {size, ?MODULE:size(Q)} + {size, qsize(Q)} | case gb_trees:is_empty(Tree) of true -> []; false -> - {Min, _, _} = gb_trees:take_smallest(Tree), - {Max, _, _} = gb_trees:take_largest(Tree), + {{Min, _}, _} = gb_trees:smallest(Tree), + {{Max, _}, _} = gb_trees:largest(Tree), [{min, Min}, {max, Max}] end ]. -from_list(Orddict, #priority_queue{name = Name}) -> - Map = maps:from_list(Orddict), - Tree = gb_trees:from_orddict(Orddict), - #priority_queue{name = Name, map = Map, tree = Tree}. - -to_list(#priority_queue{tree = Tree}) -> - gb_trees:to_list(Tree). - -is_empty(#priority_queue{tree = Tree}) -> - gb_trees:is_empty(Tree). - -file_name(#priority_queue{name = Name}) -> - filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".waiting"). +insert(Key, Priority, Capacity, #priority_queue{tree = Tree, map = Map} = Q) -> + TreeKey = {Priority, make_ref()}, + Tree1 = gb_trees:insert(TreeKey, Key, Tree), + truncate(Capacity, Q#priority_queue{map = Map#{Key => TreeKey}, tree = Tree1}). -truncate(infinity, Q) -> - Q; -truncate(Capacity, Q) when Capacity > 0 -> - truncate(Capacity, ?MODULE:size(Q), Q). +truncate(Capacity, Q) when is_integer(Capacity), Capacity > 0 -> + truncate(Capacity, qsize(Q), Q). -truncate(Capacity, Size, Q) when Size =< Capacity -> +truncate(Capacity, Size, Q) when is_integer(Capacity), Size =< Capacity -> Q; -truncate(Capacity, Size, #priority_queue{name = Name, map = Map, tree = Tree}) when Size > 0 -> - {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree), - Q1 = #priority_queue{name = Name, map = maps:remove(Key, Map), tree = Tree1}, - truncate(Capacity, ?MODULE:size(Q1), Q1). - -do_recover(FilePath) -> - case file:read_file(FilePath) of - {ok, Content} -> - <<Vsn, Binary/binary>> = Content, - try parse_queue(Vsn, ?VSN, Binary) of - Bin -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level( - "~p Successfully restored state file ~s", [?MODULE, FilePath] - ), - {ok, Bin} - catch - error:Reason -> - couch_log:error( - "~p Invalid queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath] - ), - file:delete(FilePath), - error - end; - {error, enoent} -> - Level = smoosh_utils:log_level("compaction_log_level", "debug"), - couch_log:Level( - "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath] - ), - error; - {error, Reason} -> - couch_log:error( - "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath] - ), - file:delete(FilePath), - error - end. - -parse_queue(1, ?VSN, Binary) -> - erlang:binary_to_term(Binary, [safe]); -parse_queue(Vsn, ?VSN, _) -> - error({unsupported_version, Vsn}). +truncate(Capacity, Size, Q) when is_integer(Capacity), Size > 0 -> + #priority_queue{map = Map, tree = Tree} = Q, + {_, Key, Tree1} = gb_trees:take_smallest(Tree), + Q1 = Q#priority_queue{map = maps:remove(Key, Map), tree = Tree1}, + truncate(Capacity, qsize(Q1), Q1). + +% Serialize the queue to/from simple maps which look like #{Key => Priority}. +% The intent is for these to be used by the smoosh persistence facility. +% +to_map(#priority_queue{map = Map}) -> + Fun = fun(_Key, {Priority, _Ref}) -> + Priority + end, + maps:map(Fun, Map). + +from_map(Name, Capacity, #{} = SerializedMap) -> + Fun = fun(Key, Priority, Acc) -> + insert(Key, Priority, Capacity, Acc) + end, + maps:fold(Fun, new(Name), SerializedMap). + +-ifdef(TEST). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(K1, <<"db1">>). +-define(K2, {<<"db1">>, <<"design/_doc1">>}). +-define(K3, {index_cleanup, <<"db1">>}). +-define(P1, 1). +-define(P2, 2.4). +-define(P3, infinity). + +basics_test() -> + Q = new("foo"), + ?assertMatch(#priority_queue{}, Q), + ?assertEqual("foo", name(Q)), + ?assertEqual([{size, 0}], info(Q)). + +empty_test() -> + Q = new("foo"), + ?assertEqual(false, out(Q)), + ?assertEqual(Q, truncate(1, Q)), + ?assertEqual(Q, flush(Q)), + ?assertEqual(#{}, to_map(Q)), + ?assertEqual(Q, from_map("foo", 1, #{})). + +one_element_test() -> + Q0 = new("foo"), + Q = in(?K1, ?P1, 1, Q0), + ?assertMatch(#priority_queue{}, Q), + ?assertEqual([{size, 1}, {min, 1}, {max, 1}], info(Q)), + ?assertEqual(Q, truncate(1, Q)), + ?assertMatch({?K1, #priority_queue{}}, out(Q)), + {?K1, Q2} = out(Q), + ?assertEqual(Q2, Q0), + ?assertEqual(#{?K1 => ?P1}, to_map(Q)), + Q3 = from_map("foo", 1, to_map(Q)), + ?assertEqual("foo", name(Q3)), + ?assertEqual([{size, 1}, {min, ?P1}, {max, ?P1}], info(Q3)), + ?assertEqual(to_map(Q), to_map(Q3)), + ?assertEqual(Q0, flush(Q)). + +multiple_elements_basics_test() -> + Q0 = new("foo"), + Q1 = in(?K1, ?P1, 10, Q0), + Q2 = in(?K2, ?P2, 10, Q1), + Q3 = in(?K3, ?P3, 10, Q2), + ?assertEqual([{size, 3}, {min, ?P1}, {max, ?P3}], info(Q3)), + ?assertEqual([?K3, ?K2, ?K1], drain(Q3)). + +update_element_same_priority_test() -> + Q0 = new("foo"), + Q1 = in(?K1, ?P1, 10, Q0), + ?assertEqual(Q1, in(?K1, ?P1, 10, Q1)). + +update_element_new_priority_test() -> + Q0 = new("foo"), + Q1 = in(?K1, ?P1, 10, Q0), + Q2 = in(?K2, ?P2, 10, Q1), + Q3 = in(?K1, ?P3, 10, Q2), + ?assertEqual([{size, 2}, {min, ?P2}, {max, ?P3}], info(Q3)), + ?assertEqual([?K1, ?K2], drain(Q3)). + +capacity_test() -> + Q0 = new("foo"), + Q1 = in(?K1, ?P1, 2, Q0), + % Capacity = 1, one element only remains + ?assertEqual([?K2], drain(in(?K2, ?P2, 1, Q1))), + % Capacity = 2, only top two elements remain + Q2 = in(?K2, ?P2, 2, Q1), + Q3 = in(?K3, ?P3, 2, Q2), + ?assertEqual([?K3, ?K2], drain(Q3)). + +a_lot_of_elements_test() -> + N = 100000, + KVs = lists:map( + fun(I) -> + P = rand:uniform(100), + {{I, P}, P} + end, + lists:seq(1, N) + ), + Q = from_map("foo", N, maps:from_list(KVs)), + ?assertMatch([{size, N} | _], info(Q)), + {_, Priorities} = lists:unzip(drain(Q)), + ?assertEqual(lists:reverse(lists:sort(Priorities)), Priorities). + +drain(Q) -> + lists:reverse(drain(out(Q), [])). + +drain(false, Acc) -> + Acc; +drain({Key, Q}, Acc) -> + drain(out(Q), [Key | Acc]). + +-endif. diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl deleted file mode 100644 index 289804ca5..000000000 --- a/src/smoosh/test/smoosh_priority_queue_tests.erl +++ /dev/null @@ -1,167 +0,0 @@ --module(smoosh_priority_queue_tests). - --include_lib("proper/include/proper.hrl"). --include_lib("couch/include/couch_eunit.hrl"). - --define(PROP_PREFIX, "prop_"). - --define(CAPACITY, 3). - --define(RANDOM_CHANNEL, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))). - -setup() -> - Ctx = test_util:start_couch(), - Ctx. - -teardown(Ctx) -> - test_util:stop_couch(Ctx). - -smoosh_priority_queue_test_() -> - { - "smoosh priority queue test", - { - setup, - fun setup/0, - fun teardown/1, - [ - fun prop_inverse_test_/0, - fun no_halt_on_corrupted_file_test/0, - fun no_halt_on_missing_file_test/0 - ] - } - }. - -%% ========== -%% Tests -%% ---------- - -%% define all tests to be able to run them individually -prop_inverse_test_() -> - ?_test(begin - test_property(prop_inverse) - end). - -no_halt_on_corrupted_file_test() -> - ?_test(begin - Name = ?RANDOM_CHANNEL, - Q = smoosh_priority_queue:new(Name), - FilePath = smoosh_priority_queue:file_name(Q), - ok = file:write_file(FilePath, <<"garbage">>), - ?assertEqual(Q, smoosh_priority_queue:recover(Q)), - ok - end). - -no_halt_on_missing_file_test() -> - ?_test(begin - Name = ?RANDOM_CHANNEL, - Q = smoosh_priority_queue:new(Name), - FilePath = smoosh_priority_queue:file_name(Q), - ok = file:delete(FilePath), - ?assertEqual(Q, smoosh_priority_queue:recover(Q)), - ok - end). - -%% ========== -%% Properties -%% ---------- - -prop_inverse() -> - ?FORALL( - Q, - queue(), - begin - List = smoosh_priority_queue:to_list(Q), - equal(Q, smoosh_priority_queue:from_list(List, Q)) - end - ). - -%% ========== -%% Generators -%% ---------- - -key() -> - proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]). -value() -> - proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]). -priority() -> integer(). -item() -> {key(), value(), priority()}. - -items_list() -> - ?LET(L, list(item()), L). - -simple_queue() -> - ?LET( - L, - items_list(), - from_list(L) - ). - -with_deleted() -> - ?LET( - Q, - ?LET( - {{K0, V0, P0}, Q0}, - {item(), simple_queue()}, - smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0) - ), - frequency([ - {1, Q}, - {2, element(3, smoosh_priority_queue:out(Q))} - ]) - ). - -queue() -> - with_deleted(). - -%% ========================== -%% Proper related boilerplate -%% -------------------------- - -test_property(Property) when is_atom(Property) -> - test_property({atom_to_list(Property), Property}); -test_property({Id, Property}) -> - Name = string:sub_string(Id, length(?PROP_PREFIX) + 1), - Opts = [long_result, {numtests, 1000}, {to_file, user}], - {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}. - -test_it(Property, Opts) -> - case proper:quickcheck(?MODULE:Property(), Opts) of - true -> - true; - Else -> - erlang:error( - {propertyFailed, [ - {module, ?MODULE}, - {property, Property}, - {result, Else} - ]} - ) - end. - -%% ================ -%% Helper functions -%% ---------------- - -new() -> - Q = smoosh_priority_queue:new("foo"), - smoosh_priority_queue:recover(Q). - -from_list(List) -> - lists:foldl( - fun({Key, Value, Priority}, Queue) -> - smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue) - end, - new(), - List - ). - -equal(Q1, Q2) -> - out_all(Q1) =:= out_all(Q2). - -out_all(Q) -> - out_all(Q, []). -out_all(Q0, Acc) -> - case smoosh_priority_queue:out(Q0) of - {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]); - false -> lists:reverse(Acc) - end. |