path: root/src/dreyfus/src/dreyfus_index_updater.erl
diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_index_updater.erl')
1 files changed, 0 insertions, 184 deletions
diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl
deleted file mode 100644
index 6edc5a257..000000000
--- a/src/dreyfus/src/dreyfus_index_updater.erl
+++ /dev/null
@@ -1,184 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
--export([update/2, load_docs/2]).
--import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]).
-update(IndexPid, Index) ->
- #index{
- current_seq = CurSeq,
- dbname = DbName,
- ddoc_id = DDocId,
- name = IndexName
- } = Index,
- erlang:put(io_priority, {search, DbName, IndexName}),
- {ok, Db} = couch_db:open_int(DbName, []),
- try
- TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq),
- TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid),
- TotalChanges = TotalUpdateChanges + TotalPurgeChanges,
- couch_task_status:add_task([
- {type, search_indexer},
- {database, DbName},
- {design_document, DDocId},
- {index, IndexName},
- {progress, 0},
- {changes_done, 0},
- {total_changes, TotalChanges}
- ]),
- %% update status every half second
- couch_task_status:set_update_frequency(500),
- %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...]
- %The Rev is the final Rev, not purged Rev.
- {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index),
- %% compute on all docs modified since we last computed.
- NewCurSeq = couch_db:get_update_seq(Db),
- Proc = get_os_process(Index#index.def_lang),
- try
- true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
- EnumFun = fun ?MODULE:load_docs/2,
- [Changes] = couch_task_status:get([changes_done]),
- Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs},
- {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []),
- ok = clouseau_rpc:commit(IndexPid, NewCurSeq)
- after
- ret_os_process(Proc)
- end,
- exit({updated, NewCurSeq})
- after
- couch_db:close(Db)
- end.
-load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = Acc) ->
- couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]),
- DI = couch_doc:to_doc_info(FDI),
- #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{rev = Rev} | _]} = DI,
- %check if it is processed in purge_index to avoid update the index again.
- case lists:member({Id, Rev}, ExcludeIdRevs) of
- true -> ok;
- false -> update_or_delete_index(IndexPid, Db, DI, Proc)
- end,
- %% Force a commit every minute
- case timer:now_diff(Now = erlang:timestamp(), LastCommitTime) >= 60000000 of
- true ->
- ok = clouseau_rpc:commit(IndexPid, Seq),
- {ok, {I + 1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}};
- false ->
- {ok, setelement(1, Acc, I + 1)}
- end.
-purge_index(Db, IndexPid, Index) ->
- {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
- Proc = get_os_process(Index#index.def_lang),
- try
- true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
- FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
- Acc0 =
- case couch_db:get_full_doc_info(Db, Id) of
- not_found ->
- ok = clouseau_rpc:delete(IndexPid, Id),
- Acc;
- FDI ->
- DI = couch_doc:to_doc_info(FDI),
- #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
- case lists:member({Id, Rev}, Acc) of
- true ->
- Acc;
- false ->
- update_or_delete_index(IndexPid, Db, DI, Proc),
- [{Id, Rev} | Acc]
- end
- end,
- update_task(1),
- {ok, {Acc0, PurgeSeq}}
- end,
- {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
- Db, IdxPurgeSeq, FoldFun, {[], 0}, []
- ),
- clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq),
- update_local_doc(Db, Index, NewPurgeSeq),
- {ok, ExcludeList}
- after
- ret_os_process(Proc)
- end.
-count_pending_purged_docs_since(Db, IndexPid) ->
- DbPurgeSeq = couch_db:get_purge_seq(Db),
- {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
- DbPurgeSeq - IdxPurgeSeq.
-update_or_delete_index(IndexPid, Db, DI, Proc) ->
- #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI,
- case Del of
- true ->
- ok = clouseau_rpc:delete(IndexPid, Id);
- false ->
- case maybe_skip_doc(Db, Id) of
- true ->
- ok;
- false ->
- {ok, Doc} = couch_db:open_doc(Db, DI, []),
- Json = couch_doc:to_json_obj(Doc, []),
- [Fields | _] = proc_prompt(Proc, [<<"index_doc">>, Json]),
- Fields1 = [list_to_tuple(Field) || Field <- Fields],
- Fields2 = maybe_add_partition(Db, Id, Fields1),
- case Fields2 of
- [] -> ok = clouseau_rpc:delete(IndexPid, Id);
- _ -> ok = clouseau_rpc:update(IndexPid, Id, Fields2)
- end
- end
- end.
-update_local_doc(Db, Index, PurgeSeq) ->
- DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig),
- DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index),
- couch_db:update_doc(Db, DocContent, []).
-update_task(NumChanges) ->
- [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
- Changes2 = Changes + NumChanges,
- Progress =
- case Total of
- 0 ->
- 0;
- _ ->
- (Changes2 * 100) div Total
- end,
- couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).
-maybe_skip_doc(Db, <<"_design/", _/binary>>) ->
- couch_db:is_partitioned(Db);
-maybe_skip_doc(_Db, _Id) ->
- false.
-maybe_add_partition(_Db, _Id, []) ->
- [];
-maybe_add_partition(Db, Id, Fields) ->
- case couch_db:is_partitioned(Db) of
- true ->
- Partition = couch_partition:from_docid(Id),
- [{<<"_partition">>, Partition, {[]}} | Fields];
- false ->
- Fields
- end.