path: root/src/nouveau/src/nouveau_index_updater.erl
diff options
Diffstat (limited to 'src/nouveau/src/nouveau_index_updater.erl')
1 files changed, 138 insertions, 0 deletions
diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl
new file mode 100644
index 000000000..af39faecf
--- /dev/null
+++ b/src/nouveau/src/nouveau_index_updater.erl
@@ -0,0 +1,138 @@
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% public api
+%% callbacks
+-import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]).
+-import(nouveau_util, [index_path/1]).
+outdated(#index{} = Index) ->
+ case open_or_create_index(Index) of
+ {ok, IndexSeq} ->
+ DbSeq = get_db_seq(Index),
+ DbSeq > IndexSeq;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+update(#index{} = Index) ->
+ {ok, Db} = couch_db:open_int(Index#index.dbname, []),
+ try
+ case open_or_create_index(Index) of
+ {error, Reason} ->
+ exit({error, Reason});
+ {ok, CurSeq} ->
+ TotalChanges = couch_db:count_changes_since(Db, CurSeq),
+ couch_task_status:add_task([
+ {type, search_indexer},
+ {database, Index#index.dbname},
+ {design_document, Index#index.ddoc_id},
+ {index,},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ]),
+ %% update status every half second
+ couch_task_status:set_update_frequency(500),
+ Proc = get_os_process(Index#index.def_lang),
+ try
+ true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
+ Acc0 = {Db, Index, Proc, 0, TotalChanges},
+ {ok, _} = couch_db:fold_changes(Db, CurSeq, fun load_docs/2, Acc0, [])
+ after
+ ret_os_process(Proc)
+ end
+ end
+ after
+ couch_db:close(Db)
+ end.
+load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
+ {ok, Acc};
+load_docs(FDI, {Db, Index, Proc, ChangesDone, TotalChanges}) ->
+ couch_task_status:update([
+ {changes_done, ChangesDone}, {progress, (ChangesDone * 100) div TotalChanges}
+ ]),
+ DI = couch_doc:to_doc_info(FDI),
+ #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI,
+ case Del of
+ true ->
+ ok = nouveau_api:delete_doc(Index, Id, Seq);
+ false ->
+ {ok, Doc} = couch_db:open_doc(Db, DI, []),
+ Json = couch_doc:to_json_obj(Doc, []),
+ [Fields | _] = proc_prompt(Proc, [<<"nouveau_index_doc">>, Json]),
+ case Fields of
+ [] ->
+ ok = nouveau_api:delete_doc(Index, Id, Seq);
+ _ ->
+ case nouveau_api:update_doc(Index, Id, Seq, Fields) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ exit({error, Reason})
+ end
+ end
+ end,
+ {ok, {Db, Index, Proc, ChangesDone + 1, TotalChanges}}.
+open_or_create_index(#index{} = Index) ->
+ case get_index_seq(Index) of
+ {ok, UpdateSeq} ->
+ {ok, UpdateSeq};
+ {error, {not_found, _}} ->
+ case nouveau_api:create_index(Index, index_definition(Index)) of
+ ok ->
+ {ok, 0};
+ {error, Reason} ->
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+get_db_seq(#index{} = Index) ->
+ {ok, Db} = couch_db:open_int(Index#index.dbname, []),
+ try
+ couch_db:get_update_seq(Db)
+ after
+ couch_db:close(Db)
+ end.
+get_index_seq(#index{} = Index) ->
+ case nouveau_api:index_info(Index) of
+ {ok, #{<<"update_seq">> := Seq}} ->
+ {ok, Seq};
+ {error, Reason} ->
+ {error, Reason}
+ end.
+index_definition(#index{} = Index) ->
+ #{
+ <<"default_analyzer">> => Index#index.default_analyzer,
+ <<"field_analyzers">> => Index#index.field_analyzers
+ }.