summary refs log tree commit diff
diff options
context:
space:
mode:
author Garren Smith <garren.smith@gmail.com> 2020-02-05 14:06:18 +0200
committer Garren Smith <garren.smith@gmail.com> 2020-02-05 14:06:18 +0200
commit f337fc6df4f7d6123ed496170f62e053320ead59 (patch)
tree c7f17cff85c6bb6bfca2b74bccd7cfb212da4e8a
parent b29c11efb99852352913fb685b54ccffed617239 (diff)
download couchdb-f337fc6df4f7d6123ed496170f62e053320ead59.tar.gz
background indexing for mango
-rw-r--r-- src/couch_views/src/couch_views_indexer.erl 3
-rw-r--r-- src/fabric/src/fabric2_fdb.erl 12
-rw-r--r-- src/mango/src/mango.hrl 10
-rw-r--r-- src/mango/src/mango_fdb.erl 128
-rw-r--r-- src/mango/src/mango_idx.erl 19
-rw-r--r-- src/mango/src/mango_idx.hrl 3
-rw-r--r-- src/mango/src/mango_idx_view.erl 3
-rw-r--r-- src/mango/src/mango_indexer.erl 24
-rw-r--r-- src/mango/src/mango_indexer_server.erl 103
-rw-r--r-- src/mango/src/mango_jobs.erl 53
-rw-r--r-- src/mango/src/mango_jobs_indexer.erl 358
-rw-r--r-- src/mango/src/mango_sup.erl 14
-rw-r--r-- src/mango/test/01-index-crud-test.py 1
-rw-r--r-- src/mango/test/eunit/mango_indexer_test.erl 5
-rw-r--r-- src/mango/test/eunit/mango_jobs_indexer_test.erl 189
-rw-r--r-- src/mango/test/mango.py 8
16 files changed, 877 insertions, 56 deletions
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 75e4b368f..745f7e08b 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -18,7 +18,8 @@
-export([
- init/0
+ init/0,
+ fetch_docs/2
]).
-include("couch_views.hrl").
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index a814742eb..6d21ba1c8 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -60,6 +60,8 @@
seq_to_vs/1,
next_vs/1,
+ new_versionstamp/1,
+
debug_cluster/0,
debug_cluster/2
]).
@@ -872,6 +874,11 @@ next_vs({versionstamp, VS, Batch, TxId}) ->
{versionstamp, V, B, T}.
+new_versionstamp(Tx) ->
+ TxId = erlfdb:get_next_tx_id(Tx),
+ {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -1476,11 +1483,6 @@ get_transaction_id(Tx, LayerPrefix) ->
end.
-new_versionstamp(Tx) ->
- TxId = erlfdb:get_next_tx_id(Tx),
- {versionstamp, 16#FFFFFFFFFFFFFFFF, 16#FFFF, TxId}.
-
-
on_commit(Tx, Fun) when is_function(Fun, 0) ->
% Here we rely on Tx objects matching. However they contain a nif resource
% object. Before Erlang 20.0 those would have been represented as empty
diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl
index d3445a857..a1f932582 100644
--- a/src/mango/src/mango.hrl
+++ b/src/mango/src/mango.hrl
@@ -12,5 +12,11 @@
-define(MANGO_ERROR(R), throw({mango_error, ?MODULE, R})).
--define(MANGO_IDX_BUILD_STATUS, 0).
--define(MANGO_IDX_RANGE, 1).
+-define(MANGO_IDX_BUILD_STATUS, 1).
+-define(MANGO_UPDATE_SEQ, 2).
+-define(MANGO_IDX_RANGE, 3).
+
+-define(MANGO_INDEX_JOB_TYPE, <<"mango">>).
+
+-define(MANGO_INDEX_BUILDING, <<"building">>).
+-define(MANGO_INDEX_READY, <<"ready">>).
diff --git a/src/mango/src/mango_fdb.erl b/src/mango/src/mango_fdb.erl
index def942fa3..a54d6582f 100644
--- a/src/mango/src/mango_fdb.erl
+++ b/src/mango/src/mango_fdb.erl
@@ -22,13 +22,104 @@
-export([
- query_all_docs/4,
+ create_build_vs/2,
+ set_build_vs/4,
+ get_build_vs/2,
+ get_build_status/2,
+ get_update_seq/2,
+ set_update_seq/3,
remove_doc/3,
write_doc/3,
+ query_all_docs/4,
query/4
]).
+create_build_vs(TxDb, #idx{} = Idx) ->
+ #{
+ tx := Tx
+ } = TxDb,
+ Key = build_vs_key(TxDb, Idx#idx.ddoc),
+ VS = fabric2_fdb:new_versionstamp(Tx),
+ Value = erlfdb_tuple:pack_vs({VS, ?MANGO_INDEX_BUILDING}),
+ erlfdb:set_versionstamped_value(Tx, Key, Value).
+
+
+set_build_vs(TxDb, #idx{} = Idx, VS, State) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ Key = build_vs_key(TxDb, Idx#idx.ddoc),
+ Value = erlfdb_tuple:pack({VS, State}),
+ ok = erlfdb:set(Tx, Key, Value).
+
+
+get_build_vs(TxDb, #idx{} = Idx) ->
+ get_build_vs(TxDb, Idx#idx.ddoc);
+
+get_build_vs(TxDb, DDoc) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+ Key = build_vs_key(TxDb, DDoc),
+ EV = erlfdb:wait(erlfdb:get(Tx, Key)),
+ case EV of
+ not_found -> not_found;
+ EV -> erlfdb_tuple:unpack(EV)
+ end.
+
+
+get_build_status(TxDb, DDoc) ->
+ case get_build_vs(TxDb, DDoc) of
+ not_found -> ?MANGO_INDEX_BUILDING;
+ {_, BuildState} -> BuildState
+ end.
+
+
+get_update_seq(TxDb, #idx{ddoc = DDoc}) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+
+ case erlfdb:wait(erlfdb:get(Tx, seq_key(DbPrefix, DDoc))) of
+ not_found -> <<>>;
+ UpdateSeq -> UpdateSeq
+ end.
+
+
+set_update_seq(TxDb, #idx{ddoc = DDoc}, Seq) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = TxDb,
+ ok = erlfdb:set(Tx, seq_key(DbPrefix, DDoc), Seq).
+
+
+remove_doc(TxDb, DocId, IdxResults) ->
+ lists:foreach(fun (IdxResult) ->
+ #{
+ ddoc_id := DDocId,
+ results := Results
+ } = IdxResult,
+ MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
+ clear_key(TxDb, MangoIdxPrefix, Results, DocId)
+ end, IdxResults).
+
+
+write_doc(TxDb, DocId, IdxResults) ->
+ lists:foreach(fun (IdxResult) ->
+ #{
+ ddoc_id := DDocId,
+ results := Results
+ } = IdxResult,
+ MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
+ add_key(TxDb, MangoIdxPrefix, Results, DocId)
+ end, IdxResults).
+
+
query_all_docs(Db, CallBack, Cursor, Args) ->
Opts = args_to_fdb_opts(Args) ++ [{include_docs, true}],
fabric2_db:fold_docs(Db, CallBack, Cursor, Opts).
@@ -133,7 +224,7 @@ fold_cb({Key, _}, Acc) ->
{{_, DocId}} = erlfdb_tuple:unpack(Key, MangoIdxPrefix),
{ok, Doc} = fabric2_db:open_doc(Db, DocId),
JSONDoc = couch_doc:to_json_obj(Doc, []),
- io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]),
+%% io:format("PRINT ~p ~p ~n", [DocId, JSONDoc]),
case Callback({doc, JSONDoc}, Cursor) of
{ok, Cursor1} ->
Acc#{
@@ -144,33 +235,24 @@ fold_cb({Key, _}, Acc) ->
end.
-remove_doc(TxDb, DocId, IdxResults) ->
- lists:foreach(fun (IdxResult) ->
- #{
- ddoc_id := DDocId,
- results := Results
- } = IdxResult,
- MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
- clear_key(TxDb, MangoIdxPrefix, Results, DocId)
- end, IdxResults).
+mango_idx_prefix(TxDb, Id) ->
+ #{
+ db_prefix := DbPrefix
+ } = TxDb,
+ Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE},
+ erlfdb_tuple:pack(Key, DbPrefix).
-write_doc(TxDb, DocId, IdxResults) ->
- lists:foreach(fun (IdxResult) ->
- #{
- ddoc_id := DDocId,
- results := Results
- } = IdxResult,
- MangoIdxPrefix = mango_idx_prefix(TxDb, DDocId),
- add_key(TxDb, MangoIdxPrefix, Results, DocId)
- end, IdxResults).
+seq_key(DbPrefix, DDoc) ->
+ Key = {?DB_MANGO, DDoc, ?MANGO_UPDATE_SEQ},
+ erlfdb_tuple:pack(Key, DbPrefix).
-mango_idx_prefix(TxDb, Id) ->
+build_vs_key(Db, DDoc) ->
#{
db_prefix := DbPrefix
- } = TxDb,
- Key = {?DB_MANGO, Id, ?MANGO_IDX_RANGE},
+ } = Db,
+ Key = {?DB_MANGO, DDoc, ?MANGO_IDX_BUILD_STATUS},
erlfdb_tuple:pack(Key, DbPrefix).
diff --git a/src/mango/src/mango_idx.erl b/src/mango/src/mango_idx.erl
index cf3f5079c..3aadd49d6 100644
--- a/src/mango/src/mango_idx.erl
+++ b/src/mango/src/mango_idx.erl
@@ -58,7 +58,7 @@ list(Db) ->
rows => []
},
{ok, Indexes} = fabric2_db:fold_design_docs(Db, fun ddoc_fold_cb/2, Acc0, []),
- io:format("INDEXES ~p ~n", [Indexes]),
+%% io:format("INDEXES ~p ~n", [Indexes]),
Indexes ++ special(Db).
@@ -237,13 +237,16 @@ from_ddoc(Db, {Props}) ->
%% [mango_idx_view]
%% end,
Idxs = lists:flatmap(fun(Mod) -> Mod:from_ddoc({Props}) end, IdxMods),
- lists:map(fun(Idx) ->
- Idx#idx{
- dbname = DbName,
- ddoc = DDoc,
- partitioned = get_idx_partitioned(Db, Props)
- }
- end, Idxs).
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:map(fun(Idx) ->
+ Idx#idx{
+ dbname = DbName,
+ ddoc = DDoc,
+ partitioned = get_idx_partitioned(Db, Props),
+ build_status = mango_fdb:get_build_status(TxDb, DDoc)
+ }
+ end, Idxs)
+ end).
special(Db) ->
diff --git a/src/mango/src/mango_idx.hrl b/src/mango/src/mango_idx.hrl
index 97259500b..f5f827b22 100644
--- a/src/mango/src/mango_idx.hrl
+++ b/src/mango/src/mango_idx.hrl
@@ -17,5 +17,6 @@
type,
def,
partitioned,
- opts
+ opts,
+ build_status
}).
diff --git a/src/mango/src/mango_idx_view.erl b/src/mango/src/mango_idx_view.erl
index 5ec2a1020..949c69bdf 100644
--- a/src/mango/src/mango_idx_view.erl
+++ b/src/mango/src/mango_idx_view.erl
@@ -105,7 +105,8 @@ to_json(Idx) ->
{name, Idx#idx.name},
{type, Idx#idx.type},
{partitioned, Idx#idx.partitioned},
- {def, {def_to_json(Idx#idx.def)}}
+ {def, {def_to_json(Idx#idx.def)}},
+ {build_status, Idx#idx.build_status}
]}.
diff --git a/src/mango/src/mango_indexer.erl b/src/mango/src/mango_indexer.erl
index c22b9cfeb..c7632a7f1 100644
--- a/src/mango/src/mango_indexer.erl
+++ b/src/mango/src/mango_indexer.erl
@@ -17,11 +17,14 @@
-export([
create_doc/2,
update_doc/3,
- delete_doc/2
+ delete_doc/2,
+
+ write_doc/3
]).
-include_lib("couch/include/couch_db.hrl").
+-include("mango.hrl").
-include("mango_idx.hrl").
@@ -42,7 +45,7 @@ modify(Db, Change, Doc, PrevDoc) ->
modify_int(Db, Change, Doc, PrevDoc)
catch
Error:Reason ->
- io:format("ERROR ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
+ io:format("ERROR INDEXER ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
#{
name := DbName
} = Db,
@@ -66,9 +69,16 @@ doc_id(#doc{id = DocId}, _) ->
% Design doc
% Todo: Check if design doc is mango index and kick off background worker
% to build new index
-modify_int(_Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc,
+modify_int(Db, _Change, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc,
_PrevDoc) ->
- ok;
+ {Props} = JSONDoc = couch_doc:to_json_obj(Doc, []),
+ case proplists:get_value(<<"language">>, Props) of
+ <<"query">> ->
+ [Idx] = mango_idx:from_ddoc(Db, JSONDoc),
+ {ok, _} = mango_jobs:build_index(Db, Idx);
+ _ ->
+ ok
+ end;
modify_int(Db, delete, _, PrevDoc) ->
remove_doc(Db, PrevDoc, json_indexes(Db));
@@ -138,15 +148,13 @@ get_index_entries(IdxDef, Doc) ->
get_index_values(Fields, Doc) ->
- Out1 = lists:map(fun({Field, _Dir}) ->
+ lists:map(fun({Field, _Dir}) ->
case mango_doc:get_field(Doc, Field) of
not_found -> not_found;
bad_path -> not_found;
Value -> Value
end
- end, Fields),
- io:format("OUT ~p ~p ~n", [Fields, Out1]),
- Out1.
+ end, Fields).
get_index_partial_filter_selector(IdxDef) ->
diff --git a/src/mango/src/mango_indexer_server.erl b/src/mango/src/mango_indexer_server.erl
new file mode 100644
index 000000000..29530bb9a
--- /dev/null
+++ b/src/mango/src/mango_indexer_server.erl
@@ -0,0 +1,103 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mango_indexer_server).
+
+
+-behaviour(gen_server).
+
+
+-export([
+ start_link/0
+]).
+
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+
+-define(MAX_WORKERS, 1).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+init(_) ->
+ process_flag(trap_exit, true),
+ mango_jobs:set_timeout(),
+ St = #{
+ workers => #{},
+ max_workers => max_workers()
+ },
+ {ok, spawn_workers(St)}.
+
+
+terminate(_, _St) ->
+ ok.
+
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'EXIT', Pid, Reason}, St) ->
+ #{workers := Workers} = St,
+ case maps:is_key(Pid, Workers) of
+ true ->
+ if Reason == normal -> ok; true ->
+ LogMsg = "~p : indexer process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason])
+ end,
+ NewWorkers = maps:remove(Pid, Workers),
+ {noreply, spawn_workers(St#{workers := NewWorkers})};
+ false ->
+ LogMsg = "~p : unknown process ~p exited with ~p",
+ couch_log:error(LogMsg, [?MODULE, Pid, Reason]),
+ {stop, {unknown_pid_exit, Pid}, St}
+ end;
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+
+spawn_workers(St) ->
+ #{
+ workers := Workers,
+ max_workers := MaxWorkers
+ } = St,
+ case maps:size(Workers) < MaxWorkers of
+ true ->
+ Pid = mango_jobs_indexer:spawn_link(),
+ NewSt = St#{workers := Workers#{Pid => true}},
+ spawn_workers(NewSt);
+ false ->
+ St
+ end.
+
+
+max_workers() ->
+ config:get_integer("mango", "max_workers", ?MAX_WORKERS).
diff --git a/src/mango/src/mango_jobs.erl b/src/mango/src/mango_jobs.erl
new file mode 100644
index 000000000..6739d620e
--- /dev/null
+++ b/src/mango/src/mango_jobs.erl
@@ -0,0 +1,53 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under the License.
+
+
+-module(mango_jobs).
+
+-include("mango_idx.hrl").
+-include("mango.hrl").
+
+
+-export([
+ set_timeout/0,
+ build_index/2
+]).
+
+
+set_timeout() ->
+ couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 6).
+
+
+build_index(TxDb, #idx{} = Idx) ->
+ #{
+ tx := Tx
+ } = TxDb,
+
+ mango_fdb:create_build_vs(TxDb, Idx),
+
+ JobId = job_id(TxDb, Idx),
+ JobData = job_data(TxDb, Idx),
+ ok = couch_jobs:add(undefined, ?MANGO_INDEX_JOB_TYPE, JobId, JobData),
+ {ok, JobId}.
+
+
+job_id(#{name := DbName}, #idx{ddoc = DDoc}) ->
+ <<DbName/binary, "-",DDoc/binary>>.
+
+
+job_data(Db, Idx) ->
+ #{
+ db_name => fabric2_db:name(Db),
+ ddoc_id => mango_idx:ddoc(Idx),
+ columns => mango_idx:columns(Idx),
+ retries => 0
+ }.
+
diff --git a/src/mango/src/mango_jobs_indexer.erl b/src/mango/src/mango_jobs_indexer.erl
new file mode 100644
index 000000000..ce6b8506c
--- /dev/null
+++ b/src/mango/src/mango_jobs_indexer.erl
@@ -0,0 +1,358 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Todo: this is a copy-paste of couch_views_indexer.
+% We need to make the indexing generic and keep only the mango-specific
+% logic here.
+-module(mango_jobs_indexer).
+
+-export([
+ spawn_link/0
+]).
+
+
+-export([
+ init/0
+]).
+
+-include("mango.hrl").
+-include("mango_idx.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("fabric/include/fabric2.hrl").
+
+
+spawn_link() ->
+ proc_lib:spawn_link(?MODULE, init, []).
+
+
+init() ->
+ {ok, Job, Data} = couch_jobs:accept(?MANGO_INDEX_JOB_TYPE, #{}),
+ #{
+ <<"db_name">> := DbName,
+ <<"ddoc_id">> := DDocId,
+ <<"columns">> := JobColumns,
+ <<"retries">> := Retries
+ } = Data,
+
+ {ok, Db} = try
+ fabric2_db:open(DbName, [?ADMIN_CTX])
+ catch error:database_does_not_exist ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => db_deleted,
+ reason => "Database was deleted"
+ }),
+ exit(normal)
+ end,
+
+ [Idx] = case fabric2_db:open_doc(Db, DDocId) of
+ {ok, DDoc} ->
+ JSONDDoc = couch_doc:to_json_obj(DDoc, []),
+ mango_idx:from_ddoc(Db, JSONDDoc);
+ {not_found, _} ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => ddoc_deleted,
+ reason => "Design document was deleted"
+ }),
+ exit(normal)
+ end,
+
+ Columns = mango_idx:columns(Idx),
+
+ if JobColumns == Columns -> ok; true ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => index_changed,
+ reason => <<"Design document was modified">>
+ }),
+ exit(normal)
+ end,
+
+
+ State = #{
+ tx_db => undefined,
+ idx_vs => undefined,
+ idx_seq => undefined,
+ last_seq => undefined,
+ job => Job,
+ job_data => Data,
+ count => 0,
+ limit => num_changes(),
+ doc_acc => []
+ },
+
+ try
+ update(Db, Idx, State)
+ catch
+ exit:normal ->
+ ok;
+ Error:Reason ->
+ io:format("ERROR in index worker ~p ~p ~p ~n", [Error, Reason, erlang:display(erlang:get_stacktrace())]),
+ NewRetry = Retries + 1,
+ RetryLimit = retry_limit(),
+
+ case should_retry(NewRetry, RetryLimit, Reason) of
+ true ->
+ DataErr = Data#{<<"retries">> := NewRetry},
+ StateErr = State#{job_data := DataErr},
+ report_progress(StateErr, update);
+ false ->
+ NewData = add_error(Error, Reason, Data),
+ couch_jobs:finish(undefined, Job, NewData),
+ exit(normal)
+ end
+ end.
+
+
+% Transaction limit exceeded: don't retry
+should_retry(_, _, {erlfdb_error, 2101}) ->
+ false;
+
+should_retry(Retries, RetryLimit, _) when Retries < RetryLimit ->
+ true;
+
+should_retry(_, _, _) ->
+ false.
+
+
+add_error(error, {erlfdb_error, Code}, Data) ->
+ CodeBin = couch_util:to_binary(Code),
+ CodeString = erlfdb:get_error_string(Code),
+ Data#{
+ error => foundationdb_error,
+ reason => list_to_binary([CodeBin, <<"-">>, CodeString])
+ };
+
+add_error(Error, Reason, Data) ->
+ Data#{
+ error => couch_util:to_binary(Error),
+ reason => couch_util:to_binary(Reason)
+ }.
+
+
+update(#{} = Db, #idx{} = Idx, State0) ->
+ {Idx2, State4} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ % In the first iteration of update we need
+ % to populate the index build versionstamp and update sequence
+ State1 = case State0 of
+ #{idx_vs := undefined} ->
+ #{
+ job := Job,
+ job_data := Data
+ } = State0,
+
+ {IdxVS, BuildState} = mango_fdb:get_build_vs(TxDb, Idx),
+ if BuildState == ?MANGO_INDEX_BUILDING -> ok; true ->
+ couch_jobs:finish(undefined, Job, Data#{
+ error => index_built,
+ reason => <<"Index is already built">>
+ }),
+ exit(normal)
+ end,
+
+ IdxSeq = mango_fdb:get_update_seq(TxDb, Idx),
+
+ State0#{
+ tx_db := TxDb,
+ idx_vs := IdxVS,
+ idx_seq := IdxSeq
+ };
+ _ ->
+ State0#{
+ tx_db := TxDb
+ }
+ end,
+
+ {ok, State2} = fold_changes(State1),
+
+ #{
+ idx_vs := IdxVS1,
+ count := Count,
+ limit := Limit,
+ doc_acc := DocAcc,
+ idx_seq := IdxSeq1
+ } = State2,
+
+ DocAcc1 = couch_views_indexer:fetch_docs(TxDb, DocAcc),
+ index_docs(TxDb, Idx, DocAcc1),
+ mango_fdb:set_update_seq(TxDb, Idx, IdxSeq1),
+ case Count < Limit of
+ true ->
+ mango_fdb:set_build_vs(TxDb, Idx, IdxVS1, ?MANGO_INDEX_READY),
+ report_progress(State2, finished),
+ {Idx, finished};
+ false ->
+ State3 = report_progress(State2, update),
+ {Idx, State3#{
+ tx_db := undefined,
+ count := 0,
+ doc_acc := [],
+ idx_seq := IdxSeq1
+ }}
+ end
+ end),
+
+ case State4 of
+ finished ->
+ ok;
+ _ ->
+ update(Db, Idx2, State4)
+ end.
+
+
+fold_changes(State) ->
+ #{
+ idx_seq := SinceSeq,
+ limit := Limit,
+ tx_db := TxDb
+ } = State,
+
+ Fun = fun process_changes/2,
+ fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, [{limit, Limit}]).
+
+
+process_changes(Change, Acc) ->
+ #{
+ doc_acc := DocAcc,
+ count := Count,
+ idx_vs := IdxVS
+ } = Acc,
+
+ #{
+ id := Id,
+ sequence := LastSeq
+ } = Change,
+
+ DocVS = fabric2_fdb:next_vs(fabric2_fdb:seq_to_vs(LastSeq)),
+
+ case IdxVS =< DocVS of
+ true ->
+ {stop, Acc};
+ false ->
+ Acc1 = case Id of
+ <<?DESIGN_DOC_PREFIX, _/binary>> ->
+ maps:merge(Acc, #{
+ count => Count + 1,
+ idx_seq => LastSeq
+ });
+ _ ->
+ Acc#{
+ doc_acc := DocAcc ++ [Change],
+ count := Count + 1,
+ idx_seq := LastSeq
+ }
+ end,
+ {ok, Acc1}
+ end.
+
+
+index_docs(Db, Idx, Docs) ->
+ lists:foreach(fun (Doc) ->
+ index_doc(Db, Idx, Doc)
+ end, Docs).
+
+
+index_doc(_Db, _Idx, #{deleted := true}) ->
+ ok;
+
+index_doc(Db, Idx, #{doc := Doc}) ->
+ mango_indexer:write_doc(Db, Doc, [Idx]).
+
+
+%%fetch_docs(Db, Changes) ->
+%% {Deleted, NotDeleted} = lists:partition(fun(Doc) ->
+%% #{deleted := Deleted} = Doc,
+%% Deleted
+%% end, Changes),
+%%
+%% RevState = lists:foldl(fun(Change, Acc) ->
+%% #{id := Id} = Change,
+%% RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
+%% Acc#{
+%% RevFuture => {Id, Change}
+%% }
+%% end, #{}, NotDeleted),
+%%
+%% RevFutures = maps:keys(RevState),
+%% BodyState = lists:foldl(fun(RevFuture, Acc) ->
+%% {Id, Change} = maps:get(RevFuture, RevState),
+%% Revs = fabric2_fdb:get_winning_revs_wait(Db, RevFuture),
+%%
+%% % I'm assuming that in this changes transaction that the winning
+%% % doc body exists since it is listed in the changes feed as not deleted
+%% #{winner := true} = RevInfo = lists:last(Revs),
+%% BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
+%% Acc#{
+%% BodyFuture => {Id, RevInfo, Change}
+%% }
+%% end, #{}, erlfdb:wait_for_all(RevFutures)),
+%%
+%% BodyFutures = maps:keys(BodyState),
+%% ChangesWithDocs = lists:map(fun (BodyFuture) ->
+%% {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
+%% Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
+%% Change#{doc => Doc}
+%% end, erlfdb:wait_for_all(BodyFutures)),
+%%
+%% % This combines the deleted changes with the changes that contain docs
+%% % Important to note that this is now unsorted. Which is fine for now
+%% % But later could be an issue if we split this across transactions
+%% Deleted ++ ChangesWithDocs.
+
+
+report_progress(State, UpdateType) ->
+ #{
+ tx_db := TxDb,
+ job := Job1,
+ job_data := JobData
+ } = State,
+
+ #{
+ <<"db_name">> := DbName,
+ <<"ddoc_id">> := DDocId,
+ <<"columns">> := Columns,
+ <<"retries">> := Retries
+ } = JobData,
+
+ % Reconstruct from scratch to remove any
+ % possible existing error state.
+ NewData = #{
+ <<"db_name">> => DbName,
+ <<"ddoc_id">> => DDocId,
+ <<"columns">> => Columns,
+ <<"retries">> => Retries
+ },
+
+ case UpdateType of
+ update ->
+ case couch_jobs:update(TxDb, Job1, NewData) of
+ {ok, Job2} ->
+ State#{job := Job2};
+ {error, halt} ->
+ couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
+ exit(normal)
+ end;
+ finished ->
+ case couch_jobs:finish(TxDb, Job1, NewData) of
+ ok ->
+ State;
+ {error, halt} ->
+ couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
+ exit(normal)
+ end
+ end.
+
+
+num_changes() ->
+ config:get_integer("mango", "change_limit", 100).
+
+
+retry_limit() ->
+ config:get_integer("mango", "retry_limit", 3).
diff --git a/src/mango/src/mango_sup.erl b/src/mango/src/mango_sup.erl
index b0dedf125..fc12dfe8e 100644
--- a/src/mango/src/mango_sup.erl
+++ b/src/mango/src/mango_sup.erl
@@ -21,4 +21,16 @@ start_link(Args) ->
supervisor:start_link({local,?MODULE}, ?MODULE, Args).
init([]) ->
- {ok, {{one_for_one, 3, 10}, couch_epi:register_service(mango_epi, [])}}.
+ Flags = #{
+ strategy => one_for_one,
+ intensity => 3,
+ period => 10
+ },
+
+ Children = [
+ #{
+ id => mango_indexer_server,
+ start => {mango_indexer_server, start_link, []}
+ }
+ ] ++ couch_epi:register_service(mango_epi, []),
+ {ok, {Flags, Children}}.
diff --git a/src/mango/test/01-index-crud-test.py b/src/mango/test/01-index-crud-test.py
index dd9ab1ade..3434c6611 100644
--- a/src/mango/test/01-index-crud-test.py
+++ b/src/mango/test/01-index-crud-test.py
@@ -91,6 +91,7 @@ class IndexCrudTests(mango.DbPerClass):
for idx in self.db.list_indexes():
if idx["name"] != "idx_01":
continue
+ print(idx)
self.assertEqual(idx["def"]["fields"], [{"foo": "asc"}, {"bar": "asc"}])
return
raise AssertionError("index not created")
diff --git a/src/mango/test/eunit/mango_indexer_test.erl b/src/mango/test/eunit/mango_indexer_test.erl
index 778caea50..ee24b21c5 100644
--- a/src/mango/test/eunit/mango_indexer_test.erl
+++ b/src/mango/test/eunit/mango_indexer_test.erl
@@ -41,10 +41,7 @@ indexer_test_() ->
setup() ->
Ctx = test_util:start_couch([
- fabric,
- couch_jobs,
- couch_js,
- couch_views
+ fabric
]),
Ctx.
diff --git a/src/mango/test/eunit/mango_jobs_indexer_test.erl b/src/mango/test/eunit/mango_jobs_indexer_test.erl
new file mode 100644
index 000000000..7a8cb248a
--- /dev/null
+++ b/src/mango/test/eunit/mango_jobs_indexer_test.erl
@@ -0,0 +1,189 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mango_jobs_indexer_test).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("mango/src/mango.hrl").
+-include_lib("mango/src/mango_cursor.hrl").
+-include_lib("mango/src/mango_idx.hrl").
+
+-include_lib("fabric/test/fabric2_test.hrl").
+
+indexer_test_() ->
+ {
+ "Test indexing",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ {
+ foreach,
+ fun foreach_setup/0,
+ fun foreach_teardown/1,
+ [
+ with([?TDEF(index_docs)]),
+ with([?TDEF(index_lots_of_docs, 10)]),
+ with([?TDEF(index_can_recover_from_crash, 60)])
+ ]
+ }
+ }
+ }.
+
+
+setup() ->
+ Ctx = test_util:start_couch([
+ fabric,
+ couch_jobs,
+ mango
+ ]),
+%% couch_jobs:set_type_timeout(?MANGO_INDEX_JOB_TYPE, 1),
+ Ctx.
+
+
+cleanup(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+ DbName = ?tempdb(),
+ {ok, Db} = fabric2_db:create(DbName, [{user_ctx, ?ADMIN_USER}]),
+ Db.
+
+
+foreach_teardown(Db) ->
+ meck:unload(),
+ ok = fabric2_db:delete(fabric2_db:name(Db), []).
+
+
+index_docs(Db) ->
+ DDoc = generate_docs(Db, 5),
+ wait_while_ddoc_builds(Db),
+ Docs = run_query(Db, DDoc),
+ ?assertEqual([
+ [{id, <<"1">>}, {value, 1}],
+ [{id, <<"2">>}, {value, 2}],
+ [{id, <<"3">>}, {value, 3}],
+ [{id, <<"4">>}, {value, 4}],
+ [{id, <<"5">>}, {value, 5}]
+], Docs).
+
+
+index_lots_of_docs(Db) ->
+ DDoc = generate_docs(Db, 150),
+ wait_while_ddoc_builds(Db),
+ Docs = run_query(Db, DDoc),
+ ?assertEqual(length(Docs), 150).
+
+
+index_can_recover_from_crash(Db) ->
+ meck:new(mango_indexer, [passthrough]),
+ meck:expect(mango_indexer, write_doc, fun (Db, Doc, Idxs) ->
+ ?debugFmt("doc ~p ~p ~n", [Doc, Idxs]),
+ Id = Doc#doc.id,
+ case Id == <<"2">> of
+ true ->
+ meck:unload(mango_indexer),
+ throw({fake_crash, test_jobs_restart});
+ false ->
+ meck:passthrough([Db, Doc, Idxs])
+ end
+ end),
+ DDoc = generate_docs(Db, 3),
+ wait_while_ddoc_builds(Db),
+ Docs = run_query(Db, DDoc),
+ ?assertEqual([
+ [{id, <<"1">>}, {value, 1}],
+ [{id, <<"2">>}, {value, 2}],
+ [{id, <<"3">>}, {value, 3}]
+ ], Docs).
+
+
+wait_while_ddoc_builds(Db) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ Idxs = mango_idx:list(TxDb),
+ [Idx] = lists:filter(fun (Idx) -> Idx#idx.type == <<"json">> end, Idxs),
+ if Idx#idx.build_status == ?MANGO_INDEX_READY -> ok; true ->
+ timer:sleep(100),
+ wait_while_ddoc_builds(Db)
+ end
+ end).
+
+
+run_query(Db, DDoc) ->
+ Args = #{
+ start_key => [],
+ start_key_docid => <<>>,
+ end_key => [],
+ end_key_docid => <<255>>,
+ dir => fwd,
+ skip => 0
+ },
+ [Idx] = mango_idx:from_ddoc(Db, DDoc),
+ Cursor = #cursor{
+ db = Db,
+ index = Idx,
+ user_acc = []
+ },
+ {ok, Cursor1} = mango_fdb:query(Db, fun query_cb/2, Cursor, Args),
+ Acc = Cursor1#cursor.user_acc,
+ lists:map(fun ({Props}) ->
+ [
+ {id, couch_util:get_value(<<"_id">>, Props)},
+ {value, couch_util:get_value(<<"value">>, Props)}
+ ]
+
+ end, Acc).
+
+
+generate_docs(Db, Count) ->
+ Docs = make_docs(Count),
+ fabric2_db:update_docs(Db, Docs),
+
+
+ DDoc = create_idx_ddoc(Db),
+ fabric2_db:update_docs(Db, [DDoc]),
+ couch_doc:to_json_obj(DDoc, []).
+
+
+create_idx_ddoc(Db) ->
+ Opts = [
+ {def, {[{<<"fields">>,{[{<<"value">>,<<"asc">>}]}}]}},
+ {type, <<"json">>},
+ {name, <<"idx_01">>},
+ {ddoc, auto_name},
+ {w, 3},
+ {partitioned, db_default}
+ ],
+
+ {ok, Idx} = mango_idx:new(Db, Opts),
+ {ok, DDoc} = mango_util:load_ddoc(Db, mango_idx:ddoc(Idx), []),
+ {ok, NewDDoc} = mango_idx:add(DDoc, Idx),
+ NewDDoc.
+
+
+make_docs(Count) ->
+ [doc(I) || I <- lists:seq(1, Count)].
+
+
+doc(Id) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, list_to_binary(integer_to_list(Id))},
+ {<<"value">>, Id}
+ ]}).
+
+
+query_cb({doc, Doc}, #cursor{user_acc = Acc} = Cursor) ->
+ {ok, Cursor#cursor{
+ user_acc = Acc ++ [Doc]
+ }}.
diff --git a/src/mango/test/mango.py b/src/mango/test/mango.py
index a39476dec..92cf2113a 100644
--- a/src/mango/test/mango.py
+++ b/src/mango/test/mango.py
@@ -161,8 +161,12 @@ class Database(object):
created = r.json()["result"] == "created"
if created:
- # wait until the database reports the index as available
- while len(self.get_index(r.json()["id"], r.json()["name"])) < 1:
+ # wait until the database reports the index as available and built
+ while len([
+ i
+ for i in self.get_index(r.json()["id"], r.json()["name"])
+ if i["build_status"] == "ready"
+ ]) < 1:
delay(t=0.1)
return created