diff options
author | Garren Smith <garren.smith@gmail.com> | 2020-02-05 14:06:18 +0200 |
---|---|---|
committer | Garren Smith <garren.smith@gmail.com> | 2020-02-05 14:06:18 +0200 |
commit | f337fc6df4f7d6123ed496170f62e053320ead59 (patch) | |
tree | c7f17cff85c6bb6bfca2b74bccd7cfb212da4e8a | |
parent | b29c11efb99852352913fb685b54ccffed617239 (diff) | |
download | couchdb-f337fc6df4f7d6123ed496170f62e053320ead59.tar.gz |
background indexing for mango
-rw-r--r-- | src/couch_views/src/couch_views_indexer.erl | 3 | ||||
-rw-r--r-- | src/fabric/src/fabric2_fdb.erl | 12 | ||||
-rw-r--r-- | src/mango/src/mango.hrl | 10 | ||||
-rw-r--r-- | src/mango/src/mango_fdb.erl | 128 | ||||
-rw-r--r-- | src/mango/src/mango_idx.erl | 19 | ||||
-rw-r--r-- | src/mango/src/mango_idx.hrl | 3 | ||||
-rw-r--r-- | src/mango/src/mango_idx_view.erl | 3 | ||||
-rw-r--r-- | src/mango/src/mango_indexer.erl | 24 | ||||
-rw-r--r-- | src/mango/src/mango_indexer_server.erl | 103 | ||||
-rw-r--r-- | src/mango/src/mango_jobs.erl | 53 | ||||
-rw-r--r-- | src/mango/src/mango_jobs_indexer.erl | 358 | ||||
-rw-r--r-- | src/mango/src/mango_sup.erl | 14 | ||||
-rw-r--r-- | src/mango/test/01-index-crud-test.py | 1 | ||||
-rw-r--r-- | src/mango/test/eunit/mango_indexer_test.erl | 5 | ||||
-rw-r--r-- | src/mango/test/eunit/mango_jobs_indexer_test.erl | 189 | ||||
-rw-r--r-- | src/mango/test/mango.py | 8 |
16 files changed, 877 insertions, 56 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl index 75e4b368f..745f7e08b 100644 --- a/src/couch_views/src/couch_views_indexer.erl +++ b/src/couch_views/src/couch_views_indexer.erl @@ -18,7 +18,8 @@ -export([ - init/0 + init/0, + fetch_docs/2 ]). -include("couch_views.hrl"). diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl index a814742eb..6d21ba1c8 100644 --- a/src/fabric/src/fabric2_fdb.erl +++ b/src/fabric/src/fabric2_fdb.erl @@ -60,6 +60,8 @@ seq_to_vs/1, next_vs/1, + new_versionstamp/1, + debug_cluster/0, debug_cluster/2 ]). @@ -872,6 +874,11 @@ next_vs({versionstamp, VS, Batch, TxId}) -> {versionstamp, V, B, T}. +new_versionstamp(Tx) -> + TxId = erlfdb:get_next_tx_id(Tx), + {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. + + debug_cluster() -> debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>). @@ -1476,11 +1483,6 @@ get_transaction_id(Tx, LayerPrefix) -> end. -new_versionstamp(Tx) -> - TxId = erlfdb:get_next_tx_id(Tx), - {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}. - - on_commit(Tx, Fun) when is_function(Fun, 0) -> % Here we rely on Tx objects matching. However they contain a nif resource % object. Before Erlang 20.0 those would have been represented as empty diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl index d3445a857..a1f932582 100644 --- a/src/mango/src/mango.hrl +++ b/src/mango/src/mango.hrl @@ -12,5 +12,11 @@ -define(MANGO_ERROR(R), throw({mango_error, ?MODULE, R})). --define(MANGO_IDX_BUILD_STATUS, 0). --define(MANGO_IDX_RANGE, 1). +-define(MANGO_IDX_BUILD_STATUS, 1). +-define(MANGO_UPDATE_SEQ, 2). +-define(MANGO_IDX_RANGE, 3). + +-define(MANGO_INDEX_JOB_TYPE, <<"mango">>). + +-define(MANGO_INDEX_BUILDING, <<"building">>). +-define(MANGO_INDEX_READY, <<"ready">>). diff --git a/src/mango/src/mango_fdb.erl b/src/mango/src/mango_fdb.erl index def942fa3..a54d6582f 100644 --- a/src/mango/src/mango_fdb.erl +++ b/src/mango/src/mango_fdb.erl @@ -22,13 +22,104 @@ -export([ - query_all_docs/4, + create_build_vs/2, + set_build_vs/4, + get_build_vs/2, + get_build_status/2, + get_update_seq/2, + set_update_seq/3, remove_doc/3, write_doc/3, + query_all_docs/4, query/4 ]). +create_build_vs(TxDb, #idx{} = Idx) -> + #{ + tx := Tx + } = TxDb, + Key = build_vs_key(TxDb, Idx#idx.ddoc), + VS = fabric2_fdb:new_versionstamp(Tx), + Value = erlfdb_tuple:pack_vs({VS, ?MANGO_INDEX_BUILDING}), + erlfdb:set_versionstamped_value(Tx, Key, Value). + + +set_build_vs(TxDb, #idx{} = Idx, VS, State) -> + #{ + tx := Tx + } = TxDb, + + Key = build_vs_key(TxDb, Idx#idx.ddoc), + Value = erlfdb_tuple:pack({VS, State}), + ok = erlfdb:set(Tx, Key, Value). + + +get_build_vs(TxDb, #idx{} = Idx) -> + get_build_vs(TxDb, Idx#idx.ddoc); + +get_build_vs(TxDb, DDoc) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + Key = build_vs_key(TxDb, DDoc), + EV = erlfdb:wait(erlfdb:get(Tx, Key)), + case EV of + not_found -> not_found; + EV -> erlfdb_tuple:unpack(EV) + end. + + +get_build_status(TxDb, DDoc) -> + case get_build_vs(TxDb, DDoc) of + not_found -> ?MANGO_INDEX_BUILDING; + {_, BuildState} -> BuildState + end. + + +get_update_seq(TxDb, #idx{ddoc = DDoc}) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + + case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, DDoc))) of + not_found -> <<>>; + UpdateSeq -> UpdateSeq + end. + + +set_update_seq(TxDb, #idx{ddoc = DDoc}, Seq) -> + #{ + tx := Tx, + db_prefix := DbPrefix + } = TxDb, + ok = erlfdb:set(Tx, seq_key(DbPrefix, DDoc), Seq). + + +remove_doc(TxDb, DocId, IdxResults) -> + lists:foreach(fun (IdxResult) -> + #{ + ddoc_id := DDocId, + results := Results + } = IdxResult, + MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId), + clear_key(TxDb, MangoIdxPrefix, Results, DocId) + end, IdxResults). + + +write_doc(TxDb, DocId, IdxResults) -> + lists:foreach(fun (IdxResult) -> + #{ + ddoc_id := DDocId, + results := Results + } = IdxResult, + MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId), + add_key(TxDb, MangoIdxPrefix, Results, DocId) + end, IdxResults). + + query_all_docs(Db, CallBack, Cursor, Args) -> Opts = args_to_fdb_opts(Args) ++ [{include_docs, true}], fabric2_db:fold_docs(Db, CallBack, Cursor, Opts). @@ -133,7 +224,7 @@ fold_cb({Key, _}, Acc) -> {{_, DocId}} = erlfdb_tuple:unpack(Key, MangoIdxPrefix), {ok, Doc} = fabric2_db:open_doc(Db, DocId), JSONDoc = couch_doc:to_json_obj(Doc, []), - io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]), +%% io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]), case Callback({doc, JSONDoc}, Cursor) of {ok, Cursor1} -> Acc#{ @@ -144,33 +235,24 @@ fold_cb({Key, _}, Acc) -> end. -remove_doc(TxDb, DocId, IdxResults) -> - lists:foreach(fun (IdxResult) -> - #{ - ddoc_id := DDocId, - results := Results - } = IdxResult, - MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId), - clear_key(TxDb, MangoIdxPrefix, Results, DocId) - end, IdxResults). +mango_idx_prefix(TxDb, Id) -> + #{ + db_prefix := DbPrefix + } = TxDb, + Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE}, + erlfdb_tuple:pack(Key, DbPrefix). -write_doc(TxDb, DocId, IdxResults) -> - lists:foreach(fun (IdxResult) -> - #{ - ddoc_id := DDocId, - results := Results - } = IdxResult, - MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId), - add_key(TxDb, MangoIdxPrefix, Results, DocId) - end, IdxResults). +seq_key(DbPrefix, DDoc) -> + Key = {?DB_MANGO, DDoc, ?MANGO_UPDATE_SEQ}, + erlfdb_tuple:pack(Key, DbPrefix). -mango_idx_prefix(TxDb, Id) -> +build_vs_key(Db, DDoc) -> #{ db_prefix := DbPrefix - } = TxDb, - Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE}, + } = Db, + Key = {?DB_MANGO, DDoc, ?MANGO_IDX_BUILD_STATUS}, erlfdb_tuple:pack(Key, DbPrefix). diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl index cf3f5079c..3aadd49d6 100644 --- a/src/mango/src/mango_idx.erl +++ b/src/mango/src/mango_idx.erl @@ -58,7 +58,7 @@ list(Db) -> rows => [] }, {ok, Indexes} = fabric2_db:fold_design_docs(Db, fun ddoc_fold_cb/2, Acc0, []), - io:format("INDEXES ~p ~n", [Indexes]), +%% io:format("INDEXES ~p ~n", [Indexes]), Indexes ++ special(Db). @@ -237,13 +237,16 @@ from_ddoc(Db, {Props}) -> %% [mango_idx_view] %% end, Idxs = lists:flatmap(fun(Mod) -> Mod:from_ddoc({Props}) end, IdxMods), - lists:map(fun(Idx) -> - Idx#idx{ - dbname = DbName, - ddoc = DDoc, - partitioned = get_idx_partitioned(Db, Props) - } - end, Idxs). + fabric2_fdb:transactional(Db, fun(TxDb) -> + lists:map(fun(Idx) -> + Idx#idx{ + dbname = DbName, + ddoc = DDoc, + partitioned = get_idx_partitioned(Db, Props), + build_status = mango_fdb:get_build_status(TxDb, DDoc) + } + end, Idxs) + end). special(Db) -> diff --git a/src/mango/src/mango_idx.hrl b/src/mango/src/mango_idx.hrl index 97259500b..f5f827b22 100644 --- a/src/mango/src/mango_idx.hrl +++ b/src/mango/src/mango_idx.hrl @@ -17,5 +17,6 @@ type, def, partitioned, - opts + opts, + build_status }). diff --git a/src/mango/src/mango_idx_view.erl b/src/mango/src/mango_idx_view.erl index 5ec2a1020..949c69bdf 100644 --- a/src/mango/src/mango_idx_view.erl +++ b/src/mango/src/mango_idx_view.erl @@ -105,7 +105,8 @@ to_json(Idx) -> {name, Idx#idx.name}, {type, Idx#idx.type}, {partitioned, Idx#idx.partitioned}, - {def, {def_to_json(Idx#idx.def)}} + {def, {def_to_json(Idx#idx.def)}}, + {build_status, Idx#idx.build_status} ]}. diff --git a/src/mango/src/mango_indexer.erl b/src/mango/src/mango_indexer.erl index c22b9cfeb..c7632a7f1 100644 --- a/src/mango/src/mango_indexer.erl +++ b/src/mango/src/mango_indexer.erl @@ -17,11 +17,14 @@ -export([ create_doc/2, update_doc/3, - delete_doc/2 + delete_doc/2, + + write_doc/3 ]). -include_lib("couch/include/couch_db.hrl"). +-include("mango.hrl"). -include("mango_idx.hrl"). @@ -42,7 +45,7 @@ modify(Db, Change, Doc, PrevDoc) -> modify_int(Db, Change, Doc, PrevDoc) catch Error:Reason -> - io:format("ERROR ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]), + io:format("ERROR INDEXER ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]), #{ name := DbName } = Db, @@ -66,9 +69,16 @@ doc_id(#doc{id = DocId}, _) -> % Design doc % Todo: Check if design doc is mango index and kick off background worker % to build new index -modify_int(_Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, +modify_int(Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _PrevDoc) -> - ok; + {Props} = JSONDoc = couch_doc:to_json_obj(Doc, []), + case proplists:get_value(<<"language">>, Props) of + <<"query">> -> + [Idx] = mango_idx:from_ddoc(Db, JSONDoc), + {ok, _} = mango_jobs:build_index(Db, Idx); + _ -> + ok + end; modify_int(Db, delete, _, PrevDoc) -> remove_doc(Db, PrevDoc, json_indexes(Db)); @@ -138,15 +148,13 @@ get_index_entries(IdxDef, Doc) -> get_index_values(Fields, Doc) -> - Out1 = lists:map(fun({Field, _Dir}) -> + lists:map(fun({Field, _Dir}) -> case mango_doc:get_field(Doc, Field) of not_found -> not_found; bad_path -> not_found; Value -> Value end - end, Fields), - io:format("OUT ~p ~p ~n", [Fields, Out1]), - Out1. + end, Fields). get_index_partial_filter_selector(IdxDef) -> diff --git a/src/mango/src/mango_indexer_server.erl b/src/mango/src/mango_indexer_server.erl new file mode 100644 index 000000000..29530bb9a --- /dev/null +++ b/src/mango/src/mango_indexer_server.erl @@ -0,0 +1,103 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mango_indexer_server). + + +-behaviour(gen_server). + + +-export([ + start_link/0 +]). + + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + + +-define(MAX_WORKERS, 1). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +init(_) -> + process_flag(trap_exit, true), + mango_jobs:set_timeout(), + St = #{ + workers => #{}, + max_workers => max_workers() + }, + {ok, spawn_workers(St)}. + + +terminate(_, _St) -> + ok. + + +handle_call(Msg, _From, St) -> + {stop, {bad_call, Msg}, {bad_call, Msg}, St}. + + +handle_cast(Msg, St) -> + {stop, {bad_cast, Msg}, St}. + + +handle_info({'EXIT', Pid, Reason}, St) -> + #{workers := Workers} = St, + case maps:is_key(Pid, Workers) of + true -> + if Reason == normal -> ok; true -> + LogMsg = "~p : indexer process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]) + end, + NewWorkers = maps:remove(Pid, Workers), + {noreply, spawn_workers(St#{workers := NewWorkers})}; + false -> + LogMsg = "~p : unknown process ~p exited with ~p", + couch_log:error(LogMsg, [?MODULE, Pid, Reason]), + {stop, {unknown_pid_exit, Pid}, St} + end; + +handle_info(Msg, St) -> + {stop, {bad_info, Msg}, St}. + + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + + +spawn_workers(St) -> + #{ + workers := Workers, + max_workers := MaxWorkers + } = St, + case maps:size(Workers) < MaxWorkers of + true -> + Pid = mango_jobs_indexer:spawn_link(), + NewSt = St#{workers := Workers#{Pid => true}}, + spawn_workers(NewSt); + false -> + St + end. + + +max_workers() -> + config:get_integer("mango", "max_workers", ?MAX_WORKERS). diff --git a/src/mango/src/mango_jobs.erl b/src/mango/src/mango_jobs.erl new file mode 100644 index 000000000..6739d620e --- /dev/null +++ b/src/mango/src/mango_jobs.erl @@ -0,0 +1,53 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under + + +-module(mango_jobs). + +-include("mango_idx.hrl"). +-include("mango.hrl"). + + +-export([ + set_timeout/0, + build_index/2 +]). + + +set_timeout() -> + couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 6). + + +build_index(TxDb, #idx{} = Idx) -> + #{ + tx := Tx + } = TxDb, + + mango_fdb:create_build_vs(TxDb, Idx), + + JobId = job_id(TxDb, Idx), + JobData = job_data(TxDb, Idx), + ok = couch_jobs:add(undefined, ?MANGO_INDEX_JOB_TYPE, JobId, JobData), + {ok, JobId}. + + +job_id(#{name := DbName}, #idx{ddoc = DDoc}) -> + <<DbName/binary, "-",DDoc/binary>>. + + +job_data(Db, Idx) -> + #{ + db_name => fabric2_db:name(Db), + ddoc_id => mango_idx:ddoc(Idx), + columns => mango_idx:columns(Idx), + retries => 0 + }. + diff --git a/src/mango/src/mango_jobs_indexer.erl b/src/mango/src/mango_jobs_indexer.erl new file mode 100644 index 000000000..ce6b8506c --- /dev/null +++ b/src/mango/src/mango_jobs_indexer.erl @@ -0,0 +1,358 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% Todo: this is a copy-pasta of couch_views_indexer +% We need to make the indexing generic and have only the specific mango +% logic here +-module(mango_jobs_indexer). + +-export([ + spawn_link/0 +]). + + +-export([ + init/0 +]). + +-include("mango.hrl"). +-include("mango_idx.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("fabric/include/fabric2.hrl"). + + +spawn_link() -> + proc_lib:spawn_link(?MODULE, init, []). + + +init() -> + {ok, Job, Data} = couch_jobs:accept(?MANGO_INDEX_JOB_TYPE, #{}), + #{ + <<"db_name">> := DbName, + <<"ddoc_id">> := DDocId, + <<"columns">> := JobColumns, + <<"retries">> := Retries + } = Data, + + {ok, Db} = try + fabric2_db:open(DbName, [?ADMIN_CTX]) + catch error:database_does_not_exist -> + couch_jobs:finish(undefined, Job, Data#{ + error => db_deleted, + reason => "Database was deleted" + }), + exit(normal) + end, + + [Idx] = case fabric2_db:open_doc(Db, DDocId) of + {ok, DDoc} -> + JSONDDoc = couch_doc:to_json_obj(DDoc, []), + mango_idx:from_ddoc(Db, JSONDDoc); + {not_found, _} -> + couch_jobs:finish(undefined, Job, Data#{ + error => ddoc_deleted, + reason => "Design document was deleted" + }), + exit(normal) + end, + + Columns = mango_idx:columns(Idx), + + if JobColumns == Columns -> ok; true -> + couch_jobs:finish(undefined, Job, Data#{ + error => index_changed, + reason => <<"Design document was modified">> + }), + exit(normal) + end, + + + State = #{ + tx_db => undefined, + idx_vs => undefined, + idx_seq => undefined, + last_seq => undefined, + job => Job, + job_data => Data, + count => 0, + limit => num_changes(), + doc_acc => [] + }, + + try + update(Db, Idx, State) + catch + exit:normal -> + ok; + Error:Reason -> + io:format("ERROR in index worker ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]), + NewRetry = Retries + 1, + RetryLimit = retry_limit(), + + case should_retry(NewRetry, RetryLimit, Reason) of + true -> + DataErr = Data#{<<"retries">> := NewRetry}, + StateErr = State#{job_data := DataErr}, + report_progress(StateErr, update); + false -> + NewData = add_error(Error, Reason, Data), + couch_jobs:finish(undefined, Job, NewData), + exit(normal) + end + end. + + +% Transaction limit exceeded don't retry +should_retry(_, _, {erlfdb_error, 2101}) -> + false; + +should_retry(Retries, RetryLimit, _) when Retries < RetryLimit -> + true; + +should_retry(_, _, _) -> + false. + + +add_error(error, {erlfdb_error, Code}, Data) -> + CodeBin = couch_util:to_binary(Code), + CodeString = erlfdb:get_error_string(Code), + Data#{ + error => foundationdb_error, + reason => list_to_binary([CodeBin, <<"-">>, CodeString]) + }; + +add_error(Error, Reason, Data) -> + Data#{ + error => couch_util:to_binary(Error), + reason => couch_util:to_binary(Reason) + }. + + +update(#{} = Db, #idx{} = Idx, State0) -> + {Idx2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) -> + % In the first iteration of update we need + % to populate our db and view sequences + State1 = case State0 of + #{idx_vs := undefined} -> + #{ + job := Job, + job_data := Data + } = State0, + + {IdxVS, BuildState} = mango_fdb:get_build_vs(TxDb, Idx), + if BuildState == ?MANGO_INDEX_BUILDING -> ok; true -> + couch_jobs:finish(undefined, Job, Data#{ + error => index_built, + reason => <<"Index is already built">> + }), + exit(normal) + end, + + IdxSeq = mango_fdb:get_update_seq(TxDb, Idx), + + State0#{ + tx_db := TxDb, + idx_vs := IdxVS, + idx_seq := IdxSeq + }; + _ -> + State0#{ + tx_db := TxDb + } + end, + + {ok, State2} = fold_changes(State1), + + #{ + idx_vs := IdxVS1, + count := Count, + limit := Limit, + doc_acc := DocAcc, + idx_seq := IdxSeq1 + } = State2, + + DocAcc1 = couch_views_indexer:fetch_docs(TxDb, DocAcc), + index_docs(TxDb, Idx, DocAcc1), + mango_fdb:set_update_seq(TxDb, Idx, IdxSeq1), + case Count < Limit of + true -> + mango_fdb:set_build_vs(TxDb, Idx, IdxVS1, ?MANGO_INDEX_READY), + report_progress(State2, finished), + {Idx, finished}; + false -> + State3 = report_progress(State2, update), + {Idx, State3#{ + tx_db := undefined, + count := 0, + doc_acc := [], + idx_seq := IdxSeq1 + }} + end + end), + + case State4 of + finished -> + ok; + _ -> + update(Db, Idx2, State4) + end. + + +fold_changes(State) -> + #{ + idx_seq := SinceSeq, + limit := Limit, + tx_db := TxDb + } = State, + + Fun = fun process_changes/2, + fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]). + + +process_changes(Change, Acc) -> + #{ + doc_acc := DocAcc, + count := Count, + idx_vs := IdxVS + } = Acc, + + #{ + id := Id, + sequence := LastSeq + } = Change, + + DocVS = fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(LastSeq)), + + case IdxVS =< DocVS of + true -> + {stop, Acc}; + false -> + Acc1 = case Id of + <<?DESIGN_DOC_PREFIX, _/binary>> -> + maps:merge(Acc, #{ + count => Count + 1, + idx_seq => LastSeq + }); + _ -> + Acc#{ + doc_acc := DocAcc ++ [Change], + count := Count + 1, + idx_seq := LastSeq + } + end, + {ok, Acc1} + end. + + +index_docs(Db, Idx, Docs) -> + lists:foreach(fun (Doc) -> + index_doc(Db, Idx, Doc) + end, Docs). + + +index_doc(_Db, _Idx, #{deleted := true}) -> + ok; + +index_doc(Db, Idx, #{doc := Doc}) -> + mango_indexer:write_doc(Db, Doc, [Idx]). + + +%%fetch_docs(Db, Changes) -> +%% {Deleted, NotDeleted} = lists:partition(fun(Doc) -> +%% #{deleted := Deleted} = Doc, +%% Deleted +%% end, Changes), +%% +%% RevState = lists:foldl(fun(Change, Acc) -> +%% #{id := Id} = Change, +%% RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1), +%% Acc#{ +%% RevFuture => {Id, Change} +%% } +%% end, #{}, NotDeleted), +%% +%% RevFutures = maps:keys(RevState), +%% BodyState = lists:foldl(fun(RevFuture, Acc) -> +%% {Id, Change} = maps:get(RevFuture, RevState), +%% Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture), +%% +%% % I'm assuming that in this changes transaction that the winning +%% % doc body exists since it is listed in the changes feed as not deleted +%% #{winner := true} = RevInfo = lists:last(Revs), +%% BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo), +%% Acc#{ +%% BodyFuture => {Id, RevInfo, Change} +%% } +%% end, #{}, erlfdb:wait_for_all(RevFutures)), +%% +%% BodyFutures = maps:keys(BodyState), +%% ChangesWithDocs = lists:map(fun (BodyFuture) -> +%% {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState), +%% Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture), +%% Change#{doc => Doc} +%% end, erlfdb:wait_for_all(BodyFutures)), +%% +%% % This combines the deleted changes with the changes that contain docs +%% % Important to note that this is now unsorted. Which is fine for now +%% % But later could be an issue if we split this across transactions +%% Deleted ++ ChangesWithDocs. + + +report_progress(State, UpdateType) -> + #{ + tx_db := TxDb, + job := Job1, + job_data := JobData + } = State, + + #{ + <<"db_name">> := DbName, + <<"ddoc_id">> := DDocId, + <<"columns">> := Columns, + <<"retries">> := Retries + } = JobData, + + % Reconstruct from scratch to remove any + % possible existing error state. + NewData = #{ + <<"db_name">> => DbName, + <<"ddoc_id">> => DDocId, + <<"columns">> => Columns, + <<"retries">> => Retries + }, + + case UpdateType of + update -> + case couch_jobs:update(TxDb, Job1, NewData) of + {ok, Job2} -> + State#{job := Job2}; + {error, halt} -> + couch_log:error("~s job halted :: ~w", [?MODULE, Job1]), + exit(normal) + end; + finished -> + case couch_jobs:finish(TxDb, Job1, NewData) of + ok -> + State; + {error, halt} -> + couch_log:error("~s job halted :: ~w", [?MODULE, Job1]), + exit(normal) + end + end. + + +num_changes() -> + config:get_integer("mango", "change_limit", 100). + + +retry_limit() -> + config:get_integer("mango", "retry_limit", 3). diff --git a/src/mango/src/mango_sup.erl b/src/mango/src/mango_sup.erl index b0dedf125..fc12dfe8e 100644 --- a/src/mango/src/mango_sup.erl +++ b/src/mango/src/mango_sup.erl @@ -21,4 +21,16 @@ start_link(Args) -> supervisor:start_link({local,?MODULE}, ?MODULE, Args). init([]) -> - {ok, {{one_for_one, 3, 10}, couch_epi:register_service(mango_epi, [])}}. + Flags = #{ + strategy => one_for_one, + intensity => 3, + period => 10 + }, + + Children = [ + #{ + id => mango_indexer_server, + start => {mango_indexer_server, start_link, []} + } + ] ++ couch_epi:register_service(mango_epi, []), + {ok, {Flags, Children}}. diff --git a/src/mango/test/01-index-crud-test.py b/src/mango/test/01-index-crud-test.py index dd9ab1ade..3434c6611 100644 --- a/src/mango/test/01-index-crud-test.py +++ b/src/mango/test/01-index-crud-test.py @@ -91,6 +91,7 @@ class IndexCrudTests(mango.DbPerClass): for idx in self.db.list_indexes(): if idx["name"] != "idx_01": continue + print(idx) self.assertEqual(idx["def"]["fields"], [{"foo": "asc"}, {"bar": "asc"}]) return raise AssertionError("index not created") diff --git a/src/mango/test/eunit/mango_indexer_test.erl b/src/mango/test/eunit/mango_indexer_test.erl index 778caea50..ee24b21c5 100644 --- a/src/mango/test/eunit/mango_indexer_test.erl +++ b/src/mango/test/eunit/mango_indexer_test.erl @@ -41,10 +41,7 @@ indexer_test_() -> setup() -> Ctx = test_util:start_couch([ - fabric, - couch_jobs, - couch_js, - couch_views + fabric ]), Ctx. diff --git a/src/mango/test/eunit/mango_jobs_indexer_test.erl b/src/mango/test/eunit/mango_jobs_indexer_test.erl new file mode 100644 index 000000000..7a8cb248a --- /dev/null +++ b/src/mango/test/eunit/mango_jobs_indexer_test.erl @@ -0,0 +1,189 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(mango_jobs_indexer_test). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("mango/src/mango.hrl"). +-include_lib("mango/src/mango_cursor.hrl"). +-include_lib("mango/src/mango_idx.hrl"). + +-include_lib("fabric/test/fabric2_test.hrl"). + +indexer_test_() -> + { + "Test indexing", + { + setup, + fun setup/0, + fun cleanup/1, + { + foreach, + fun foreach_setup/0, + fun foreach_teardown/1, + [ + with([?TDEF(index_docs)]), + with([?TDEF(index_lots_of_docs, 10)]), + with([?TDEF(index_can_recover_from_crash, 60)]) + ] + } + } + }. + + +setup() -> + Ctx = test_util:start_couch([ + fabric, + couch_jobs, + mango + ]), +%% couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 1), + Ctx. + + +cleanup(Ctx) -> + test_util:stop_couch(Ctx). + + +foreach_setup() -> + DbName = ?tempdb(), + {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]), + Db. + + +foreach_teardown(Db) -> + meck:unload(), + ok = fabric2_db:delete(fabric2_db:name(Db), []). + + +index_docs(Db) -> + DDoc = generate_docs(Db, 5), + wait_while_ddoc_builds(Db), + Docs = run_query(Db, DDoc), + ?assertEqual([ + [{id, <<"1">>}, {value, 1}], + [{id, <<"2">>}, {value, 2}], + [{id, <<"3">>}, {value, 3}], + [{id, <<"4">>}, {value, 4}], + [{id, <<"5">>}, {value, 5}] +], Docs). + + +index_lots_of_docs(Db) -> + DDoc = generate_docs(Db, 150), + wait_while_ddoc_builds(Db), + Docs = run_query(Db, DDoc), + ?assertEqual(length(Docs), 150). + + +index_can_recover_from_crash(Db) -> + meck:new(mango_indexer, [passthrough]), + meck:expect(mango_indexer, write_doc, fun (Db, Doc, Idxs) -> + ?debugFmt("doc ~p ~p ~n", [Doc, Idxs]), + Id = Doc#doc.id, + case Id == <<"2">> of + true -> + meck:unload(mango_indexer), + throw({fake_crash, test_jobs_restart}); + false -> + meck:passthrough([Db, Doc, Idxs]) + end + end), + DDoc = generate_docs(Db, 3), + wait_while_ddoc_builds(Db), + Docs = run_query(Db, DDoc), + ?assertEqual([ + [{id, <<"1">>}, {value, 1}], + [{id, <<"2">>}, {value, 2}], + [{id, <<"3">>}, {value, 3}] + ], Docs). + + +wait_while_ddoc_builds(Db) -> + fabric2_fdb:transactional(Db, fun(TxDb) -> + Idxs = mango_idx:list(TxDb), + [Idx] = lists:filter(fun (Idx) -> Idx#idx.type == <<"json">> end, Idxs), + if Idx#idx.build_status == ?MANGO_INDEX_READY -> ok; true -> + timer:sleep(100), + wait_while_ddoc_builds(Db) + end + end). + + +run_query(Db, DDoc) -> + Args = #{ + start_key => [], + start_key_docid => <<>>, + end_key => [], + end_key_docid => <<255>>, + dir => fwd, + skip => 0 + }, + [Idx] = mango_idx:from_ddoc(Db, DDoc), + Cursor = #cursor{ + db = Db, + index = Idx, + user_acc = [] + }, + {ok, Cursor1} = mango_fdb:query(Db, fun query_cb/2, Cursor, Args), + Acc = Cursor1#cursor.user_acc, + lists:map(fun ({Props}) -> + [ + {id, couch_util:get_value(<<"_id">>, Props)}, + {value, couch_util:get_value(<<"value">>, Props)} + ] + + end, Acc). + + +generate_docs(Db, Count) -> + Docs = make_docs(Count), + fabric2_db:update_docs(Db, Docs), + + + DDoc = create_idx_ddoc(Db), + fabric2_db:update_docs(Db, [DDoc]), + couch_doc:to_json_obj(DDoc, []). + + +create_idx_ddoc(Db) -> + Opts = [ + {def, {[{<<"fields">>,{[{<<"value">>,<<"asc">>}]}}]}}, + {type, <<"json">>}, + {name, <<"idx_01">>}, + {ddoc, auto_name}, + {w, 3}, + {partitioned, db_default} + ], + + {ok, Idx} = mango_idx:new(Db, Opts), + {ok, DDoc} = mango_util:load_ddoc(Db, mango_idx:ddoc(Idx), []), + {ok, NewDDoc} = mango_idx:add(DDoc, Idx), + NewDDoc. + + +make_docs(Count) -> + [doc(I) || I <- lists:seq(1, Count)]. + + +doc(Id) -> + couch_doc:from_json_obj({[ + {<<"_id">>, list_to_binary(integer_to_list(Id))}, + {<<"value">>, Id} + ]}). + + +query_cb({doc, Doc}, #cursor{user_acc = Acc} = Cursor) -> + {ok, Cursor#cursor{ + user_acc = Acc ++ [Doc] + }}. diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py index a39476dec..92cf2113a 100644 --- a/src/mango/test/mango.py +++ b/src/mango/test/mango.py @@ -161,8 +161,12 @@ class Database(object): created = r.json()["result"] == "created" if created: - # wait until the database reports the index as available - while len(self.get_index(r.json()["id"], r.json()["name"])) < 1: + # wait until the database reports the index as available and build + while len([ + i + for i in self.get_index(r.json()["id"], r.json()["name"]) + if i["build_status"] == "ready" + ]) < 1: delay(t=0.1) return created |