diff options
Diffstat (limited to 'src/dreyfus/src/dreyfus_index_updater.erl')
-rw-r--r-- | src/dreyfus/src/dreyfus_index_updater.erl | 184 |
1 files changed, 0 insertions, 184 deletions
diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl deleted file mode 100644 index 6edc5a257..000000000 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ /dev/null @@ -1,184 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - -%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- - --module(dreyfus_index_updater). --include_lib("couch/include/couch_db.hrl"). --include("dreyfus.hrl"). - --export([update/2, load_docs/2]). - --import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]). - -update(IndexPid, Index) -> - #index{ - current_seq = CurSeq, - dbname = DbName, - ddoc_id = DDocId, - name = IndexName - } = Index, - erlang:put(io_priority, {search, DbName, IndexName}), - {ok, Db} = couch_db:open_int(DbName, []), - try - TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq), - TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid), - TotalChanges = TotalUpdateChanges + TotalPurgeChanges, - - couch_task_status:add_task([ - {type, search_indexer}, - {database, DbName}, - {design_document, DDocId}, - {index, IndexName}, - {progress, 0}, - {changes_done, 0}, - {total_changes, TotalChanges} - ]), - - %% update status every half second - couch_task_status:set_update_frequency(500), - - %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...] - %The Rev is the final Rev, not purged Rev. - {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index), - %% compute on all docs modified since we last computed. - - NewCurSeq = couch_db:get_update_seq(Db), - Proc = get_os_process(Index#index.def_lang), - try - true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), - EnumFun = fun ?MODULE:load_docs/2, - [Changes] = couch_task_status:get([changes_done]), - Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, - {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []), - ok = clouseau_rpc:commit(IndexPid, NewCurSeq) - after - ret_os_process(Proc) - end, - exit({updated, NewCurSeq}) - after - couch_db:close(Db) - end. - -load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = Acc) -> - couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]), - DI = couch_doc:to_doc_info(FDI), - #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{rev = Rev} | _]} = DI, - %check if it is processed in purge_index to avoid update the index again. - case lists:member({Id, Rev}, ExcludeIdRevs) of - true -> ok; - false -> update_or_delete_index(IndexPid, Db, DI, Proc) - end, - %% Force a commit every minute - case timer:now_diff(Now = erlang:timestamp(), LastCommitTime) >= 60000000 of - true -> - ok = clouseau_rpc:commit(IndexPid, Seq), - {ok, {I + 1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}}; - false -> - {ok, setelement(1, Acc, I + 1)} - end. - -purge_index(Db, IndexPid, Index) -> - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), - Proc = get_os_process(Index#index.def_lang), - try - true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), - FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) -> - Acc0 = - case couch_db:get_full_doc_info(Db, Id) of - not_found -> - ok = clouseau_rpc:delete(IndexPid, Id), - Acc; - FDI -> - DI = couch_doc:to_doc_info(FDI), - #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI, - case lists:member({Id, Rev}, Acc) of - true -> - Acc; - false -> - update_or_delete_index(IndexPid, Db, DI, Proc), - [{Id, Rev} | Acc] - end - end, - update_task(1), - {ok, {Acc0, PurgeSeq}} - end, - - {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos( - Db, IdxPurgeSeq, FoldFun, {[], 0}, [] - ), - clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq), - update_local_doc(Db, Index, NewPurgeSeq), - {ok, ExcludeList} - after - ret_os_process(Proc) - end. - -count_pending_purged_docs_since(Db, IndexPid) -> - DbPurgeSeq = couch_db:get_purge_seq(Db), - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), - DbPurgeSeq - IdxPurgeSeq. - -update_or_delete_index(IndexPid, Db, DI, Proc) -> - #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI, - case Del of - true -> - ok = clouseau_rpc:delete(IndexPid, Id); - false -> - case maybe_skip_doc(Db, Id) of - true -> - ok; - false -> - {ok, Doc} = couch_db:open_doc(Db, DI, []), - Json = couch_doc:to_json_obj(Doc, []), - [Fields | _] = proc_prompt(Proc, [<<"index_doc">>, Json]), - Fields1 = [list_to_tuple(Field) || Field <- Fields], - Fields2 = maybe_add_partition(Db, Id, Fields1), - case Fields2 of - [] -> ok = clouseau_rpc:delete(IndexPid, Id); - _ -> ok = clouseau_rpc:update(IndexPid, Id, Fields2) - end - end - end. - -update_local_doc(Db, Index, PurgeSeq) -> - DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig), - DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index), - couch_db:update_doc(Db, DocContent, []). - -update_task(NumChanges) -> - [Changes, Total] = couch_task_status:get([changes_done, total_changes]), - Changes2 = Changes + NumChanges, - Progress = - case Total of - 0 -> - 0; - _ -> - (Changes2 * 100) div Total - end, - couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]). - -maybe_skip_doc(Db, <<"_design/", _/binary>>) -> - couch_db:is_partitioned(Db); -maybe_skip_doc(_Db, _Id) -> - false. - -maybe_add_partition(_Db, _Id, []) -> - []; -maybe_add_partition(Db, Id, Fields) -> - case couch_db:is_partitioned(Db) of - true -> - Partition = couch_partition:from_docid(Id), - [{<<"_partition">>, Partition, {[]}} | Fields]; - false -> - Fields - end. |