summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-03-08 00:03:15 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2020-03-09 15:10:34 -0400
commit02ca72ba34eb1768a631f12e34022464cf70278f (patch)
treea7e77128c9902077e2d4559df827c1625fc41f4a
parent17ce741c7cfcb14df7a8f24d38a59a7bf302dd5d (diff)
downloadcouchdb-02ca72ba34eb1768a631f12e34022464cf70278f.tar.gz
Implement a simple index auto-updater
The main logic is as follows: - After doc update transaction(s) are completed in the `fabric2_db` module, call the `fabric2_index:db_updated(Db)` function. - `fabric2_index:db_updated(Db)` inserts a `{DbName, Timestamp}` tuple into one of the sharded ets tables. The tuple is inserted using `ets:insert_new`, which ensures only the first entry succeeds and others will be ignored, until that entry is cleared. - Each ets table in `fabric2_index` has a simple monitor process that periodically scans that table. If it finds databases which have been updated, it notifies all the indices which have registered with `fabric2_index` to build indices. There are config settings to disable index auto-updating and to adjust the delay interval, and the resolution. The interval specifies how long to wait since the first time the db was modified. The resolution interval specifies how often to check the ets tables. Just like in the original ken, design documents can have an `"autoupdate": false` option to disable auto-updating that design document only.
-rw-r--r--rel/files/eunit.ini6
-rw-r--r--rel/overlay/etc/default.ini10
-rw-r--r--src/fabric/src/fabric2_db.erl4
-rw-r--r--src/fabric/src/fabric2_index.erl222
-rw-r--r--src/fabric/src/fabric2_sup.erl8
-rw-r--r--src/fabric/test/fabric2_index_tests.erl304
6 files changed, 553 insertions, 1 deletions
diff --git a/rel/files/eunit.ini b/rel/files/eunit.ini
index 361ea6669..2b73ab307 100644
--- a/rel/files/eunit.ini
+++ b/rel/files/eunit.ini
@@ -35,4 +35,8 @@ level = info
[replicator]
; disable jitter to reduce test run times
-startup_jitter = 0 \ No newline at end of file
+startup_jitter = 0
+
+[fabric]
+; disable index auto-updater to avoid interfering with some of the tests
+index_updater_enabled = false
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 4c978b29c..749cdd27f 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -226,6 +226,16 @@ port = 6984
; should have a matching directory prefix in order to read and write the same
; data. Changes to this value take effect only on node start-up.
;fdb_directory = couchdb
+;
+; Enable or disable index auto-updater
+;index_autoupdater_enabled = true
+;
+; How long to wait from the first db update event until index building is
+; triggered.
+;index_autoupdater_delay_msec = 60000
+;
+; How often to check if databases may need their indices updated.
+;index_autoupdater_resolution_msec = 10000
; [rexi]
; buffer_count = 2000
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index b0f7849e2..791282f63 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -733,6 +733,10 @@ update_docs(Db, Docs0, Options) ->
end)
end, Docs1)
end,
+
+ % Notify index builder
+ fabric2_index:db_updated(name(Db)),
+
% Convert errors
Resps1 = lists:map(fun(Resp) ->
case Resp of
diff --git a/src/fabric/src/fabric2_index.erl b/src/fabric/src/fabric2_index.erl
new file mode 100644
index 000000000..938210514
--- /dev/null
+++ b/src/fabric/src/fabric2_index.erl
@@ -0,0 +1,222 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_index).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ register_index/1,
+ db_updated/1,
+ start_link/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-callback build_indices(Db :: map(), DDocs :: list(#doc{})) ->
+ [{ok, JobId::binary()} | {error, any()}].
+
+
+-define(SHARDS, 32).
+-define(DEFAULT_DELAY_MSEC, 60000).
+-define(DEFAULT_RESOLUTION_MSEC, 10000).
+
+
+register_index(Mod) when is_atom(Mod) ->
+ Indices = lists:usort([Mod | registrations()]),
+ application:set_env(fabric, indices, Indices).
+
+
+db_updated(DbName) when is_binary(DbName) ->
+ Table = table(erlang:phash2(DbName) rem ?SHARDS),
+ ets:insert_new(Table, {DbName, now_msec()}).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ lists:foreach(fun(T) ->
+ spawn_link(fun() -> process_loop(T) end)
+ end, create_tables()),
+ {ok, nil}.
+
+
+terminate(_M, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+create_tables() ->
+ Opts = [
+ named_table,
+ public,
+ {write_concurrency, true},
+ {read_concurrency, true}
+ ],
+ Tables = [table(N) || N <- lists:seq(0, ?SHARDS - 1)],
+ [ets:new(T, Opts) || T <- Tables].
+
+
+table(Id) when is_integer(Id), Id >= 0 andalso Id < ?SHARDS ->
+ list_to_atom("fabric2_index_" ++ integer_to_list(Id)).
+
+
+process_loop(Table) ->
+ Now = now_msec(),
+ Delay = delay_msec(),
+ Since = Now - Delay,
+ case is_enabled() of
+ true ->
+ process_updates(Table, Since),
+ clean_stale(Table, Since);
+ false ->
+ clean_stale(Table, Now)
+ end,
+ Resolution = resolution_msec(),
+ Jitter = rand:uniform(1 + Resolution div 2),
+ timer:sleep(Resolution + Jitter),
+ process_loop(Table).
+
+
+clean_stale(Table, Since) ->
+ Head = {'_', '$1'},
+ Guard = {'<', '$1', Since},
+ % Monotonic is not strictly monotonic, so we process items using `=<` but
+ % clean with `<` in case there was an update with the same timestamp after
+ % we started processing already at that timestamp.
+ ets:select_delete(Table, [{Head, [Guard], [true]}]).
+
+
+process_updates(Table, Since) ->
+ Head = {'$1', '$2'},
+ Guard = {'=<', '$2', Since},
+ case ets:select(Table, [{Head, [Guard], ['$1']}], 25) of
+ '$end_of_table' -> ok;
+ {Match, Cont} -> process_updates_iter(Match, Cont)
+ end.
+
+
+process_updates_iter([], Cont) ->
+ case ets:select(Cont) of
+ '$end_of_table' -> ok;
+ {Match, Cont1} -> process_updates_iter(Match, Cont1)
+ end;
+
+process_updates_iter([Db | Rest], Cont) ->
+ try
+ process_db(Db)
+ catch
+ error:database_does_not_exist ->
+ ok;
+ Tag:Reason ->
+ Stack = erlang:get_stacktrace(),
+ LogMsg = "~p failed to build indices for `~s` ~p:~p ~p",
+ couch_log:error(LogMsg, [?MODULE, Db, Tag, Reason, Stack])
+ end,
+ process_updates_iter(Rest, Cont).
+
+
+build_indices(_Db, []) ->
+ [];
+
+build_indices(Db, DDocs) ->
+ lists:flatmap(fun(Mod) ->
+ Mod:build_indices(Db, DDocs)
+ end, registrations()).
+
+
+registrations() ->
+ application:get_env(fabric, indices, []).
+
+
+process_db(DbName) when is_binary(DbName) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ DDocs1 = get_design_docs(TxDb),
+ DDocs2 = lists:filter(fun should_update/1, DDocs1),
+ DDocs3 = shuffle(DDocs2),
+ build_indices(TxDb, DDocs3)
+ end).
+
+
+get_design_docs(Db) ->
+ Callback = fun
+ ({meta, _}, Acc) -> {ok, Acc};
+ (complete, Acc) -> {ok, Acc};
+ ({row, Row}, Acc) -> {ok, [get_doc(Db, Row) | Acc]}
+ end,
+ {ok, DDocs} = fabric2_db:fold_design_docs(Db, Callback, [], []),
+ DDocs.
+
+
+get_doc(Db, Row) ->
+ {_, DocId} = lists:keyfind(id, 1, Row),
+ {ok, #doc{deleted = false} = Doc} = fabric2_db:open_doc(Db, DocId, []),
+ Doc.
+
+
+should_update(#doc{body = {Props}}) ->
+ couch_util:get_value(<<"autoupdate">>, Props, true).
+
+
+shuffle(Items) ->
+ Tagged = [{rand:uniform(), I} || I <- Items],
+ Sorted = lists:sort(Tagged),
+ [I || {_T, I} <- Sorted].
+
+
+now_msec() ->
+ erlang:monotonic_time(millisecond).
+
+
+is_enabled() ->
+ config:get_boolean("fabric", "index_updater_enabled", true).
+
+
+delay_msec() ->
+ config:get_integer("fabric", "index_updater_delay_msec",
+ ?DEFAULT_DELAY_MSEC).
+
+
+resolution_msec() ->
+ config:get_integer("fabric", "index_updater_resolution_msec",
+ ?DEFAULT_RESOLUTION_MSEC).
diff --git a/src/fabric/src/fabric2_sup.erl b/src/fabric/src/fabric2_sup.erl
index 2510b13bb..e8201b4ee 100644
--- a/src/fabric/src/fabric2_sup.erl
+++ b/src/fabric/src/fabric2_sup.erl
@@ -47,6 +47,14 @@ init([]) ->
5000,
worker,
[fabric2_server]
+ },
+ {
+ fabric2_index,
+ {fabric2_index, start_link, []},
+ permanent,
+ 5000,
+ worker,
+ [fabric2_index]
}
],
ChildrenWithEpi = couch_epi:register_service(fabric2_epi, Children),
diff --git a/src/fabric/test/fabric2_index_tests.erl b/src/fabric/test/fabric2_index_tests.erl
new file mode 100644
index 000000000..3fc8a5b18
--- /dev/null
+++ b/src/fabric/test/fabric2_index_tests.erl
@@ -0,0 +1,304 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_index_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include("fabric2_test.hrl").
+
+
+% Should match fabric2_index define
+-define(SHARDS, 32).
+
+
+index_test_() ->
+ {
+ "Test fabric indexing module",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ with([
+ ?TDEF(register_index_works),
+ ?TDEF(single_update),
+ ?TDEF(multiple_updates),
+ ?TDEF(skip_db_if_no_ddocs),
+ ?TDEF(ignore_deleted_dbs),
+ ?TDEF(check_gen_server_messages)
+ ])
+ }
+ }.
+
+
+index_process_cleanup_test_() ->
+ {
+ "Test fabric process cleanup in indexing module",
+ {
+ foreach,
+ fun setup/0,
+ fun cleanup/1,
+ [
+ ?TDEF_FE(updater_processes_start),
+ ?TDEF_FE(updater_processes_stop),
+ ?TDEF_FE(indexing_can_be_disabled),
+ ?TDEF_FE(handle_indexer_blowing_up)
+ ]
+ }
+ }.
+
+
+setup() ->
+ meck:new(config, [passthrough]),
+ meck:expect(config, get_integer, fun
+ ("fabric", "index_updater_delay_msec", _) -> 200;
+ ("fabric", "index_updater_resolution_msec", _) -> 100;
+
+ (_, _, Default) -> Default
+ end),
+ meck:expect(config, get_boolean, fun
+ ("fabric", "index_updater_enabled", _) -> true;
+ (_, _, Default) -> Default
+ end),
+
+ Indices = application:get_env(fabric, indices, []),
+
+ Ctx = test_util:start_couch([fabric]),
+
+ % Db1 has a valid design doc, a deleted one and one with "autoupdate":false
+ {ok, Db1} = fabric2_db:create(?tempdb(), [?ADMIN_CTX]),
+ {_, _} = create_doc(Db1, <<"_design/doc1">>),
+
+ DDocId2 = <<"_design/doc2">>,
+ {DDocId2, {Pos, Rev}} = create_doc(Db1, DDocId2),
+ Delete2 = #doc{id = DDocId2, revs = {Pos, [Rev]}, deleted = true},
+ {ok, _} = fabric2_db:update_doc(Db1, Delete2),
+
+ NoAutoUpdate = {[{<<"autoupdate">>, false}]},
+ {_, _} = create_doc(Db1, <<"_design/doc3">>, NoAutoUpdate),
+
+ % Db2 doesn't have any desig documents
+ {ok, Db2} = fabric2_db:create(?tempdb(), [?ADMIN_CTX]),
+
+ #{db1 => Db1, db2 => Db2, ctx => Ctx, indices => Indices}.
+
+
+cleanup(#{db1 := Db1, db2 := Db2, ctx := Ctx, indices := Indices}) ->
+ catch fabric2_db:delete(fabric2_db:name(Db1), []),
+ catch fabric2_db:delete(fabric2_db:name(Db2), []),
+
+ test_util:stop_couch(Ctx),
+ application:set_env(fabric, indices, Indices),
+
+ meck:unload().
+
+
+register_index_works(_) ->
+ reset_callbacks(),
+
+ Mod1 = fabric2_test_callback1,
+ fabric2_index:register_index(Mod1),
+ Indices1 = application:get_env(fabric, indices, []),
+ ?assertEqual([Mod1], Indices1),
+
+ Mod2 = fabric2_test_callback2,
+ fabric2_index:register_index(Mod2),
+ Indices2 = application:get_env(fabric, indices, []),
+ ?assertEqual(lists:sort([Mod1, Mod2]), lists:sort(Indices2)).
+
+
+single_update(#{db1 := Db}) ->
+ reset_callbacks(),
+
+ Mod = fabric2_test_callback3,
+ setup_callback(Mod),
+ create_doc(Db),
+
+ meck:wait(Mod, build_indices, 2, 2000),
+ ?assertEqual(1, meck:num_calls(Mod, build_indices, 2)).
+
+
+multiple_updates(#{db1 := Db}) ->
+ reset_callbacks(),
+
+ Mod = fabric2_test_callback4,
+ setup_callback(Mod),
+ create_docs(Db, 10),
+
+ % should be called at least once
+ meck:wait(Mod, build_indices, 2, 2000),
+
+ % Maybe called another time or two at most
+ timer:sleep(500),
+ ?assert(meck:num_calls(Mod, build_indices, 2) =< 3).
+
+
+skip_db_if_no_ddocs(#{db2 := Db}) ->
+ reset_callbacks(),
+
+ Mod = fabric2_test_callback5,
+ setup_callback(Mod),
+ create_doc(Db),
+
+ timer:sleep(500),
+ ?assertEqual(0, meck:num_calls(Mod, build_indices, 2)).
+
+
+ignore_deleted_dbs(#{}) ->
+ reset_callbacks(),
+
+ Mod = fabric2_test_callback6,
+ setup_callback(Mod),
+ lists:foreach(fun(_) ->
+ RandomDbName = fabric2_util:uuid(),
+ fabric2_index:db_updated(RandomDbName)
+ end, lists:seq(1, 10000)),
+
+ test_util:wait(fun() ->
+ case table_sizes() =:= 0 of
+ true -> ok;
+ false -> wait
+ end
+ end, 5000).
+
+
+check_gen_server_messages(#{}) ->
+ CallExpect = {stop, {bad_call, foo}, {bad_call, foo}, baz},
+ CastExpect = {stop, {bad_cast, foo}, bar},
+ InfoExpect = {stop, {bad_info, foo}, bar},
+ ?assertEqual(CallExpect, fabric2_index:handle_call(foo, bar, baz)),
+ ?assertEqual(CastExpect, fabric2_index:handle_cast(foo, bar)),
+ ?assertEqual(InfoExpect, fabric2_index:handle_info(foo, bar)),
+ ?assertEqual(ok, fabric2_index:terminate(shutdown, nil)),
+ ?assertEqual({ok, nil}, fabric2_index:code_change(v0, nil, extra)).
+
+
+updater_processes_start(#{}) ->
+ Pid = whereis(fabric2_index),
+ ?assert(is_process_alive(Pid)),
+ lists:map(fun(N) ->
+ ?assertEqual(tid(N), ets:info(tid(N), name))
+ end, lists:seq(0, ?SHARDS - 1)).
+
+
+updater_processes_stop(#{}) ->
+ Refs = lists:map(fun(N) ->
+ Pid = ets:info(tid(N), owner),
+ ?assert(is_process_alive(Pid)),
+ monitor(process, Pid)
+ end, lists:seq(0, ?SHARDS - 1)),
+
+ % We stop but don't restart fabric after this as we're running in a foreach
+ % test list where app restart happens after each test.
+ application:stop(fabric),
+
+ lists:foreach(fun(Ref) ->
+ receive
+ {'DOWN', Ref, _, _, _} -> ok
+ after 3000 ->
+ ?assert(false)
+ end
+ end, Refs).
+
+
+indexing_can_be_disabled(#{db1 := Db}) ->
+ Mod = fabric2_test_callback7,
+ setup_callback(Mod),
+
+ meck:expect(config, get_boolean, fun
+ ("fabric", "index_updater_enabled", _) -> false;
+ (_, _, Default) -> Default
+ end),
+
+ create_doc(Db),
+ timer:sleep(500),
+ ?assertEqual(0, meck:num_calls(Mod, build_indices, 2)),
+
+ meck:expect(config, get_boolean, fun
+ ("fabric", "index_updater_enabled", _) -> true;
+ (_, _, Default) -> Default
+ end),
+
+ create_doc(Db),
+ meck:wait(Mod, build_indices, 2, 2000).
+
+
+handle_indexer_blowing_up(#{db1 := Db}) ->
+ Mod = fabric2_test_callback8,
+ setup_callback(Mod),
+ meck:expect(Mod, build_indices, fun(_, _) -> error(bad_index) end),
+
+ MainPid = whereis(fabric2_index),
+ WPids1 = [ets:info(tid(N), owner) || N <- lists:seq(0, ?SHARDS - 1)],
+
+ create_doc(Db),
+ meck:wait(Mod, build_indices, 2, 2000),
+
+ ?assert(is_process_alive(MainPid)),
+
+ WPids2 = [ets:info(tid(N), owner) || N <- lists:seq(0, ?SHARDS - 1)],
+ ?assertEqual(lists:sort(WPids1), lists:sort(WPids2)),
+ ?assert(lists:all(fun(Pid) -> is_process_alive(Pid) end, WPids2)).
+
+
+% Utility functions
+
+setup_callback(Mod) ->
+ catch meck:unload(Mod),
+ meck:new(Mod, [non_strict]),
+ meck:expect(Mod, build_indices, 2, []),
+ fabric2_index:register_index(Mod).
+
+
+reset_callbacks() ->
+ Mods = application:get_env(fabric, indices, []),
+ application:set_env(fabric, indices, []),
+ lists:foreach(fun(M) ->
+ catch meck:reset(M),
+ catch meck:unload(M)
+ end, Mods).
+
+
+tid(Id) when is_integer(Id) ->
+ TableName = "fabric2_index_" ++ integer_to_list(Id),
+ list_to_existing_atom(TableName).
+
+
+table_sizes() ->
+ Sizes = [ets:info(tid(N), size) || N <- lists:seq(0, ?SHARDS - 1)],
+ lists:sum(Sizes).
+
+
+create_docs(Db, Count) ->
+ lists:map(fun(_) ->
+ {DocId, _RevStr} = create_doc(Db),
+ DocId
+ end, lists:seq(1, Count)).
+
+
+create_doc(Db) ->
+ create_doc(Db, fabric2_util:uuid()).
+
+
+create_doc(Db, DocId) ->
+ create_doc(Db, DocId, {[]}).
+
+
+create_doc(Db, DocId, Body) ->
+ Doc = #doc{
+ id = DocId,
+ body = Body
+ },
+ {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc, []),
+ {DocId, {Pos, Rev}}.