summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Nick Vatamaniuc <vatamane@gmail.com> 2022-11-04 14:23:47 -0400
committer Nick Vatamaniuc <nickva@users.noreply.github.com> 2022-11-18 17:04:30 -0500
commit de05ea9f624d28ae99be9dba793f2e614c6f3658 (patch)
tree 6f24c1f4b2370bd407b3d6f9a6b7b24e3d2ba55c
parent 57b2dc340efc5f267f3200d17e30f977eedf491b (diff)
download couchdb-de05ea9f624d28ae99be9dba793f2e614c6f3658.tar.gz
Improve smoosh_priority_queue
* Remove `Value` parameter as it was never used.
* Use `make_ref()` instead of `unique_integer([monotonic])` for performance [1].
* Rewrite tests to get 100% coverage. Previous tests didn't actually run due to a setup error.
* Switch from `size/1` to `qsize/1` as `size/1` is a built-in and we have to write `?MODULE:size/1` everywhere.
* Do not remove elements from the gb_tree just for inspecting min and max elements.
* Remove `last_updated/2` logic, as that will be published in an ETS table.
* Replace low level file serialization operations with `to_map/1`, `from_map/3`.

[1]
```
4> timer:tc(fun() -> [make_ref() || _ <- lists:seq(1, 1000000)], ok end ).
{488923,ok}
6> timer:tc(fun() -> [{erlang:monotonic_time(), erlang:unique_integer([monotonic])} || _ <- lists:seq(1, 1000000)], ok end ).
{1178409,ok}
```
-rw-r--r-- src/smoosh/src/smoosh_priority_queue.erl 281
-rw-r--r-- src/smoosh/test/smoosh_priority_queue_tests.erl 167
2 files changed, 154 insertions, 294 deletions
diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl
index 16deb0f76..b2ef4393d 100644
--- a/src/smoosh/src/smoosh_priority_queue.erl
+++ b/src/smoosh/src/smoosh_priority_queue.erl
@@ -12,19 +12,16 @@
-module(smoosh_priority_queue).
--export([new/1, recover/1]).
-
--export([last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
-
--export([flush/1]).
-
--export([from_list/2, to_list/1]).
-
--export([is_empty/1]).
-
--export([write_to_file/1]).
-
--define(VSN, 1).
+-export([
+ new/1,
+ name/1,
+ in/4,
+ out/1,
+ info/1,
+ flush/1,
+ to_map/1,
+ from_map/3
+]).
-record(priority_queue, {
name,
@@ -33,145 +30,175 @@
}).
new(Name) ->
- #priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
-
-recover(#priority_queue{name = Name, map = Map0} = Q) ->
- case do_recover(file_name(Q)) of
- {ok, Terms} ->
- Map = maps:merge(Map0, Terms),
- Tree = maps:fold(
- fun(Key, {TreeKey, Value}, TreeAcc) ->
- gb_trees:enter(TreeKey, {Key, Value}, TreeAcc)
- end,
- gb_trees:empty(),
- Map
- ),
- #priority_queue{name = Name, map = Map, tree = Tree};
- error ->
- Q
- end.
+ #priority_queue{name = Name, map = #{}, tree = gb_trees:empty()}.
-write_to_file(#priority_queue{map = Map} = Q) ->
- smoosh_utils:write_to_file(Map, file_name(Q), ?VSN).
+name(#priority_queue{name = Name}) ->
+ Name.
-flush(#priority_queue{name = Name} = Q) ->
- Q#priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
+flush(#priority_queue{} = Q) ->
+ Q#priority_queue{map = #{}, tree = gb_trees:empty()}.
-last_updated(Key, #priority_queue{map = Map}) ->
+in(Key, Priority, Capacity, #priority_queue{map = Map, tree = Tree} = Q) ->
case maps:find(Key, Map) of
- {ok, {_Priority, {LastUpdatedMTime, _MInt}}} ->
- LastUpdatedMTime;
+ {ok, {Priority, _}} ->
+ % Priority matches, keep everything as is. This might be the case
+ % for upgrade channels, for instance, where priority is 1.
+ Q;
+ {ok, TreeKey} ->
+ Tree1 = gb_trees:delete(TreeKey, Tree),
+ insert(Key, Priority, Capacity, Q#priority_queue{tree = Tree1});
error ->
- false
+ insert(Key, Priority, Capacity, Q)
end.
-is_key(Key, #priority_queue{map = Map}) ->
- maps:is_key(Key, Map).
-
-in(Key, Value, Priority, Q) ->
- in(Key, Value, Priority, infinity, Q).
-
-in(Key, Value, Priority, Capacity, #priority_queue{name = Name, map = Map, tree = Tree}) ->
- Tree1 =
- case maps:find(Key, Map) of
- {ok, TreeKey} ->
- gb_trees:delete_any(TreeKey, Tree);
- error ->
- Tree
- end,
- Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
- TreeKey1 = {Priority, Now},
- Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
- Map1 = maps:put(Key, TreeKey1, Map),
- truncate(Capacity, #priority_queue{name = Name, map = Map1, tree = Tree2}).
-
-out(#priority_queue{name = Name, map = Map, tree = Tree}) ->
+out(#priority_queue{map = Map, tree = Tree} = Q) ->
case gb_trees:is_empty(Tree) of
true ->
false;
false ->
- {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
- Map1 = maps:remove(Key, Map),
- Q = #priority_queue{name = Name, map = Map1, tree = Tree1},
- {Key, Value, Q}
+ {_, Key, Tree1} = gb_trees:take_largest(Tree),
+ {Key, Q#priority_queue{map = maps:remove(Key, Map), tree = Tree1}}
end.
-size(#priority_queue{tree = Tree}) ->
+qsize(#priority_queue{tree = Tree}) ->
gb_trees:size(Tree).
info(#priority_queue{tree = Tree} = Q) ->
[
- {size, ?MODULE:size(Q)}
+ {size, qsize(Q)}
| case gb_trees:is_empty(Tree) of
true ->
[];
false ->
- {Min, _, _} = gb_trees:take_smallest(Tree),
- {Max, _, _} = gb_trees:take_largest(Tree),
+ {{Min, _}, _} = gb_trees:smallest(Tree),
+ {{Max, _}, _} = gb_trees:largest(Tree),
[{min, Min}, {max, Max}]
end
].
-from_list(Orddict, #priority_queue{name = Name}) ->
- Map = maps:from_list(Orddict),
- Tree = gb_trees:from_orddict(Orddict),
- #priority_queue{name = Name, map = Map, tree = Tree}.
-
-to_list(#priority_queue{tree = Tree}) ->
- gb_trees:to_list(Tree).
-
-is_empty(#priority_queue{tree = Tree}) ->
- gb_trees:is_empty(Tree).
-
-file_name(#priority_queue{name = Name}) ->
- filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".waiting").
+insert(Key, Priority, Capacity, #priority_queue{tree = Tree, map = Map} = Q) ->
+ TreeKey = {Priority, make_ref()},
+ Tree1 = gb_trees:insert(TreeKey, Key, Tree),
+ truncate(Capacity, Q#priority_queue{map = Map#{Key => TreeKey}, tree = Tree1}).
-truncate(infinity, Q) ->
- Q;
-truncate(Capacity, Q) when Capacity > 0 ->
- truncate(Capacity, ?MODULE:size(Q), Q).
+truncate(Capacity, Q) when is_integer(Capacity), Capacity > 0 ->
+ truncate(Capacity, qsize(Q), Q).
-truncate(Capacity, Size, Q) when Size =< Capacity ->
+truncate(Capacity, Size, Q) when is_integer(Capacity), Size =< Capacity ->
Q;
-truncate(Capacity, Size, #priority_queue{name = Name, map = Map, tree = Tree}) when Size > 0 ->
- {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
- Q1 = #priority_queue{name = Name, map = maps:remove(Key, Map), tree = Tree1},
- truncate(Capacity, ?MODULE:size(Q1), Q1).
-
-do_recover(FilePath) ->
- case file:read_file(FilePath) of
- {ok, Content} ->
- <<Vsn, Binary/binary>> = Content,
- try parse_queue(Vsn, ?VSN, Binary) of
- Bin ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p Successfully restored state file ~s", [?MODULE, FilePath]
- ),
- {ok, Bin}
- catch
- error:Reason ->
- couch_log:error(
- "~p Invalid queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
- ),
- file:delete(FilePath),
- error
- end;
- {error, enoent} ->
- Level = smoosh_utils:log_level("compaction_log_level", "debug"),
- couch_log:Level(
- "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
- ),
- error;
- {error, Reason} ->
- couch_log:error(
- "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
- ),
- file:delete(FilePath),
- error
- end.
-
-parse_queue(1, ?VSN, Binary) ->
- erlang:binary_to_term(Binary, [safe]);
-parse_queue(Vsn, ?VSN, _) ->
- error({unsupported_version, Vsn}).
+truncate(Capacity, Size, Q) when is_integer(Capacity), Size > 0 ->
+ #priority_queue{map = Map, tree = Tree} = Q,
+ {_, Key, Tree1} = gb_trees:take_smallest(Tree),
+ Q1 = Q#priority_queue{map = maps:remove(Key, Map), tree = Tree1},
+ truncate(Capacity, qsize(Q1), Q1).
+
+% Serialize the queue to/from simple maps which look like #{Key => Priority}.
+% The intent is for these to be used by the smoosh persistence facility.
+%
+to_map(#priority_queue{map = Map}) ->
+ Fun = fun(_Key, {Priority, _Ref}) ->
+ Priority
+ end,
+ maps:map(Fun, Map).
+
+from_map(Name, Capacity, #{} = SerializedMap) ->
+ Fun = fun(Key, Priority, Acc) ->
+ insert(Key, Priority, Capacity, Acc)
+ end,
+ maps:fold(Fun, new(Name), SerializedMap).
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(K1, <<"db1">>).
+-define(K2, {<<"db1">>, <<"design/_doc1">>}).
+-define(K3, {index_cleanup, <<"db1">>}).
+-define(P1, 1).
+-define(P2, 2.4).
+-define(P3, infinity).
+
+basics_test() ->
+ Q = new("foo"),
+ ?assertMatch(#priority_queue{}, Q),
+ ?assertEqual("foo", name(Q)),
+ ?assertEqual([{size, 0}], info(Q)).
+
+empty_test() ->
+ Q = new("foo"),
+ ?assertEqual(false, out(Q)),
+ ?assertEqual(Q, truncate(1, Q)),
+ ?assertEqual(Q, flush(Q)),
+ ?assertEqual(#{}, to_map(Q)),
+ ?assertEqual(Q, from_map("foo", 1, #{})).
+
+one_element_test() ->
+ Q0 = new("foo"),
+ Q = in(?K1, ?P1, 1, Q0),
+ ?assertMatch(#priority_queue{}, Q),
+ ?assertEqual([{size, 1}, {min, 1}, {max, 1}], info(Q)),
+ ?assertEqual(Q, truncate(1, Q)),
+ ?assertMatch({?K1, #priority_queue{}}, out(Q)),
+ {?K1, Q2} = out(Q),
+ ?assertEqual(Q2, Q0),
+ ?assertEqual(#{?K1 => ?P1}, to_map(Q)),
+ Q3 = from_map("foo", 1, to_map(Q)),
+ ?assertEqual("foo", name(Q3)),
+ ?assertEqual([{size, 1}, {min, ?P1}, {max, ?P1}], info(Q3)),
+ ?assertEqual(to_map(Q), to_map(Q3)),
+ ?assertEqual(Q0, flush(Q)).
+
+multiple_elements_basics_test() ->
+ Q0 = new("foo"),
+ Q1 = in(?K1, ?P1, 10, Q0),
+ Q2 = in(?K2, ?P2, 10, Q1),
+ Q3 = in(?K3, ?P3, 10, Q2),
+ ?assertEqual([{size, 3}, {min, ?P1}, {max, ?P3}], info(Q3)),
+ ?assertEqual([?K3, ?K2, ?K1], drain(Q3)).
+
+update_element_same_priority_test() ->
+ Q0 = new("foo"),
+ Q1 = in(?K1, ?P1, 10, Q0),
+ ?assertEqual(Q1, in(?K1, ?P1, 10, Q1)).
+
+update_element_new_priority_test() ->
+ Q0 = new("foo"),
+ Q1 = in(?K1, ?P1, 10, Q0),
+ Q2 = in(?K2, ?P2, 10, Q1),
+ Q3 = in(?K1, ?P3, 10, Q2),
+ ?assertEqual([{size, 2}, {min, ?P2}, {max, ?P3}], info(Q3)),
+ ?assertEqual([?K1, ?K2], drain(Q3)).
+
+capacity_test() ->
+ Q0 = new("foo"),
+ Q1 = in(?K1, ?P1, 2, Q0),
+ % Capacity = 1, one element only remains
+ ?assertEqual([?K2], drain(in(?K2, ?P2, 1, Q1))),
+ % Capacity = 2, only top two elements remain
+ Q2 = in(?K2, ?P2, 2, Q1),
+ Q3 = in(?K3, ?P3, 2, Q2),
+ ?assertEqual([?K3, ?K2], drain(Q3)).
+
+a_lot_of_elements_test() ->
+ N = 100000,
+ KVs = lists:map(
+ fun(I) ->
+ P = rand:uniform(100),
+ {{I, P}, P}
+ end,
+ lists:seq(1, N)
+ ),
+ Q = from_map("foo", N, maps:from_list(KVs)),
+ ?assertMatch([{size, N} | _], info(Q)),
+ {_, Priorities} = lists:unzip(drain(Q)),
+ ?assertEqual(lists:reverse(lists:sort(Priorities)), Priorities).
+
+drain(Q) ->
+ lists:reverse(drain(out(Q), [])).
+
+drain(false, Acc) ->
+ Acc;
+drain({Key, Q}, Acc) ->
+ drain(out(Q), [Key | Acc]).
+
+-endif.
diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl
deleted file mode 100644
index 289804ca5..000000000
--- a/src/smoosh/test/smoosh_priority_queue_tests.erl
+++ /dev/null
@@ -1,167 +0,0 @@
--module(smoosh_priority_queue_tests).
-
--include_lib("proper/include/proper.hrl").
--include_lib("couch/include/couch_eunit.hrl").
-
--define(PROP_PREFIX, "prop_").
-
--define(CAPACITY, 3).
-
--define(RANDOM_CHANNEL, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))).
-
-setup() ->
- Ctx = test_util:start_couch(),
- Ctx.
-
-teardown(Ctx) ->
- test_util:stop_couch(Ctx).
-
-smoosh_priority_queue_test_() ->
- {
- "smoosh priority queue test",
- {
- setup,
- fun setup/0,
- fun teardown/1,
- [
- fun prop_inverse_test_/0,
- fun no_halt_on_corrupted_file_test/0,
- fun no_halt_on_missing_file_test/0
- ]
- }
- }.
-
-%% ==========
-%% Tests
-%% ----------
-
-%% define all tests to be able to run them individually
-prop_inverse_test_() ->
- ?_test(begin
- test_property(prop_inverse)
- end).
-
-no_halt_on_corrupted_file_test() ->
- ?_test(begin
- Name = ?RANDOM_CHANNEL,
- Q = smoosh_priority_queue:new(Name),
- FilePath = smoosh_priority_queue:file_name(Q),
- ok = file:write_file(FilePath, <<"garbage">>),
- ?assertEqual(Q, smoosh_priority_queue:recover(Q)),
- ok
- end).
-
-no_halt_on_missing_file_test() ->
- ?_test(begin
- Name = ?RANDOM_CHANNEL,
- Q = smoosh_priority_queue:new(Name),
- FilePath = smoosh_priority_queue:file_name(Q),
- ok = file:delete(FilePath),
- ?assertEqual(Q, smoosh_priority_queue:recover(Q)),
- ok
- end).
-
-%% ==========
-%% Properties
-%% ----------
-
-prop_inverse() ->
- ?FORALL(
- Q,
- queue(),
- begin
- List = smoosh_priority_queue:to_list(Q),
- equal(Q, smoosh_priority_queue:from_list(List, Q))
- end
- ).
-
-%% ==========
-%% Generators
-%% ----------
-
-key() ->
- proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
-value() ->
- proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
-priority() -> integer().
-item() -> {key(), value(), priority()}.
-
-items_list() ->
- ?LET(L, list(item()), L).
-
-simple_queue() ->
- ?LET(
- L,
- items_list(),
- from_list(L)
- ).
-
-with_deleted() ->
- ?LET(
- Q,
- ?LET(
- {{K0, V0, P0}, Q0},
- {item(), simple_queue()},
- smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0)
- ),
- frequency([
- {1, Q},
- {2, element(3, smoosh_priority_queue:out(Q))}
- ])
- ).
-
-queue() ->
- with_deleted().
-
-%% ==========================
-%% Proper related boilerplate
-%% --------------------------
-
-test_property(Property) when is_atom(Property) ->
- test_property({atom_to_list(Property), Property});
-test_property({Id, Property}) ->
- Name = string:sub_string(Id, length(?PROP_PREFIX) + 1),
- Opts = [long_result, {numtests, 1000}, {to_file, user}],
- {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}.
-
-test_it(Property, Opts) ->
- case proper:quickcheck(?MODULE:Property(), Opts) of
- true ->
- true;
- Else ->
- erlang:error(
- {propertyFailed, [
- {module, ?MODULE},
- {property, Property},
- {result, Else}
- ]}
- )
- end.
-
-%% ================
-%% Helper functions
-%% ----------------
-
-new() ->
- Q = smoosh_priority_queue:new("foo"),
- smoosh_priority_queue:recover(Q).
-
-from_list(List) ->
- lists:foldl(
- fun({Key, Value, Priority}, Queue) ->
- smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue)
- end,
- new(),
- List
- ).
-
-equal(Q1, Q2) ->
- out_all(Q1) =:= out_all(Q2).
-
-out_all(Q) ->
- out_all(Q, []).
-out_all(Q0, Acc) ->
- case smoosh_priority_queue:out(Q0) of
- {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]);
- false -> lists:reverse(Acc)
- end.