summaryrefslogtreecommitdiff
path: root/src/couch_views/test/couch_views_indexer_test.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_views/test/couch_views_indexer_test.erl')
-rw-r--r--src/couch_views/test/couch_views_indexer_test.erl699
1 files changed, 699 insertions, 0 deletions
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
new file mode 100644
index 000000000..75be2459f
--- /dev/null
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -0,0 +1,699 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_views_indexer_test).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("couch_views/include/couch_views.hrl").
+-include_lib("fabric/test/fabric2_test.hrl").
+
+
+-define(MAP_FUN1, <<"map_fun1">>).
+-define(MAP_FUN2, <<"map_fun2">>).
+
+
+indexer_test_() ->
+ {
+ "Test view indexing",
+ {
+ setup,
+ fun setup/0,
+ fun cleanup/1,
+ {
+ foreach,
+ fun foreach_setup/0,
+ fun foreach_teardown/1,
+ [
+ ?TDEF_FE(indexed_empty_db),
+ ?TDEF_FE(indexed_single_doc),
+ ?TDEF_FE(updated_docs_are_reindexed),
+ ?TDEF_FE(updated_docs_without_changes_are_reindexed),
+ ?TDEF_FE(deleted_docs_not_indexed),
+ ?TDEF_FE(deleted_docs_are_unindexed),
+ ?TDEF_FE(multipe_docs_with_same_key),
+ ?TDEF_FE(multipe_keys_from_same_doc),
+ ?TDEF_FE(multipe_identical_keys_from_same_doc),
+ ?TDEF_FE(fewer_multipe_identical_keys_from_same_doc),
+ ?TDEF_FE(multiple_design_docs),
+ ?TDEF_FE(handle_size_key_limits),
+ ?TDEF_FE(handle_size_value_limits),
+ ?TDEF_FE(index_autoupdater_callback),
+ ?TDEF_FE(handle_db_recreated_when_running),
+ ?TDEF_FE(handle_db_recreated_after_finished),
+ ?TDEF_FE(index_can_recover_from_crash, 60)
+ ]
+ }
+ }
+ }.
+
+
+setup() ->
+ Ctx = test_util:start_couch([
+ fabric,
+ couch_jobs,
+ couch_js,
+ couch_views
+ ]),
+ Ctx.
+
+
+cleanup(Ctx) ->
+ test_util:stop_couch(Ctx).
+
+
+foreach_setup() ->
+ {ok, Db} = fabric2_db:create(?tempdb(), [{user_ctx, ?ADMIN_USER}]),
+ Db.
+
+
+foreach_teardown(Db) ->
+ meck:unload(),
+ config:delete("couch_views", "change_limit"),
+ ok = fabric2_db:delete(fabric2_db:name(Db), []).
+
+
+indexed_empty_db(Db) ->
+ DDoc = create_ddoc(),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ ?assertEqual({ok, []}, run_query(Db, DDoc, ?MAP_FUN1)).
+
+
+indexed_single_doc(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, Doc1, []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([row(<<"0">>, 0, 0)], Out).
+
+
+updated_docs_are_reindexed(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([row(<<"0">>, 0, 0)], Out1),
+
+ Doc2 = Doc1#doc{
+ revs = {Pos, [Rev]},
+ body = {[{<<"val">>, 1}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+ {ok, Out2} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([row(<<"0">>, 1, 1)], Out2),
+
+ % Check that our id index is updated properly
+ % as well.
+ DbName = fabric2_db:name(Db),
+ {ok, Mrst0} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{tx := Tx} = TxDb,
+ Mrst1 = couch_views_trees:open(TxDb, Mrst0),
+ IdRow = ebtree:lookup(Tx, Mrst1#mrst.id_btree, <<"0">>),
+ ?assertEqual({<<"0">>, [{1, []}, {0, [1]}]}, IdRow)
+ end).
+
+
+updated_docs_without_changes_are_reindexed(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([row(<<"0">>, 0, 0)], Out1),
+
+ Doc2 = Doc1#doc{
+ revs = {Pos, [Rev]},
+ body = {[{<<"val">>, 0}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+ {ok, Out2} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([row(<<"0">>, 0, 0)], Out2),
+
+ % Check fdb directly to make sure we've also
+ % removed the id idx keys properly.
+ DbName = fabric2_db:name(Db),
+ {ok, Mrst0} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{tx := Tx} = TxDb,
+ Mrst1 = couch_views_trees:open(TxDb, Mrst0),
+ IdRow = ebtree:lookup(Tx, Mrst1#mrst.id_btree, <<"0">>),
+ ?assertEqual({<<"0">>, [{1, []}, {0, [0]}]}, IdRow)
+ end).
+
+
+deleted_docs_not_indexed(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+ Doc2 = Doc1#doc{
+ revs = {Pos, [Rev]},
+ deleted = true,
+ body = {[{<<"val">>, 1}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+ ?assertEqual({ok, []}, run_query(Db, DDoc, ?MAP_FUN1)).
+
+
+deleted_docs_are_unindexed(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc1, []),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+ ?assertEqual([row(<<"0">>, 0, 0)], Out1),
+
+ Doc2 = Doc1#doc{
+ revs = {Pos, [Rev]},
+ deleted = true,
+ body = {[{<<"val">>, 1}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+
+ ?assertEqual({ok, []}, run_query(Db, DDoc, ?MAP_FUN1)),
+
+ % Check fdb directly to make sure we've also
+ % removed the id idx keys properly.
+ DbName = fabric2_db:name(Db),
+ {ok, Mrst0} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ #{tx := Tx} = TxDb,
+ Mrst1 = couch_views_trees:open(TxDb, Mrst0),
+ IdRow = ebtree:lookup(Tx, Mrst1#mrst.id_btree, <<"0">>),
+ ?assertEqual(false, IdRow)
+ end).
+
+
+multipe_docs_with_same_key(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0, 1),
+ Doc2 = doc(1, 1),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_docs(Db, [Doc1, Doc2], []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([
+ row(<<"0">>, 1, 1),
+ row(<<"1">>, 1, 1)
+ ], Out).
+
+
+multipe_keys_from_same_doc(Db) ->
+ DDoc = create_ddoc(multi_emit_different),
+ Doc = doc(0, 1),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, Doc, []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([
+ row(<<"0">>, 1, 1),
+ row(<<"0">>, <<"0">>, <<"0">>)
+ ], Out).
+
+
+multipe_identical_keys_from_same_doc(Db) ->
+ DDoc = create_ddoc(multi_emit_same),
+ Doc = doc(0, 1),
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, Doc, []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([
+ row(<<"0">>, 1, 1),
+ row(<<"0">>, 1, 2)
+ ], Out).
+
+
+fewer_multipe_identical_keys_from_same_doc(Db) ->
+ DDoc = create_ddoc(multi_emit_same),
+ Doc0 = #doc{
+ id = <<"0">>,
+ body = {[{<<"val">>, 1}, {<<"extra">>, 3}]}
+ },
+
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, {Pos, Rev}} = fabric2_db:update_doc(Db, Doc0, []),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([
+ row(<<"0">>, 1, 1),
+ row(<<"0">>, 1, 2),
+ row(<<"0">>, 1, 3)
+ ], Out1),
+
+ Doc1 = #doc{
+ id = <<"0">>,
+ revs = {Pos, [Rev]},
+ body = {[{<<"val">>, 1}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc1, []),
+
+ {ok, Out2} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([
+ row(<<"0">>, 1, 1),
+ row(<<"0">>, 1, 2)
+ ], Out2).
+
+
+handle_size_key_limits(Db) ->
+ ok = meck:new(config, [passthrough]),
+ ok = meck:expect(config, get_integer, fun
+ ("couch_views", "key_size_limit", _Default) -> 15;
+ (_Section, _Key, Default) -> Default
+ end),
+
+ DDoc = create_ddoc(multi_emit_key_limit),
+ Docs = [doc(1, 2)] ++ [doc(2, 1)],
+
+ {ok, _} = fabric2_db:update_docs(Db, [DDoc | Docs], []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([row(<<"1">>, 2, 2)], Out),
+
+ {ok, Doc} = fabric2_db:open_doc(Db, <<"2">>),
+ Doc2 = Doc#doc {
+ body = {[{<<"val">>, 2}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+
+ ?assertEqual([
+ row(<<"1">>, 2, 2),
+ row(<<"2">>, 2, 2)
+ ], Out1).
+
+
+handle_size_value_limits(Db) ->
+ ok = meck:new(config, [passthrough]),
+ ok = meck:expect(config, get_integer, fun
+ ("couch_views", "value_size_limit", _Default) -> 15;
+ (_Section, _Key, Default) -> Default
+ end),
+
+ DDoc = create_ddoc(multi_emit_key_limit),
+ Docs = [doc(1, 2)] ++ [doc(2, 3)],
+
+ {ok, _} = fabric2_db:update_docs(Db, [DDoc | Docs], []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN2),
+
+ ?assertEqual([
+ row(<<"1">>, 2, 2),
+ row(<<"2">>, 3, 3),
+ row(<<"1">>, 22, 2),
+ row(<<"2">>, 23, 3)
+ ], Out),
+
+ {ok, Doc} = fabric2_db:open_doc(Db, <<"1">>),
+ Doc2 = Doc#doc{
+ body = {[{<<"val">>, 1}]}
+ },
+ {ok, _} = fabric2_db:update_doc(Db, Doc2),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN2),
+
+ ?assertEqual([
+ row(<<"2">>, 3, 3),
+ row(<<"2">>, 23, 3)
+ ], Out1).
+
+
+index_autoupdater_callback(Db) ->
+ DDoc = create_ddoc(),
+ Doc1 = doc(0),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, Doc1, []),
+
+ DbSeq = fabric2_db:get_update_seq(Db),
+
+ Result = couch_views:build_indices(Db, [DDoc]),
+ ?assertMatch([{ok, <<_/binary>>}], Result),
+ [{ok, JobId}] = Result,
+
+ ?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DDoc#doc.id, DbSeq)).
+
+
+multiple_design_docs(Db) ->
+ Cleanup = fun() ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ DDocs = fabric2_db:get_design_docs(Db),
+ ok = couch_views:cleanup_indices(TxDb, DDocs)
+ end)
+ end,
+
+ DDoc1 = create_ddoc(simple, <<"_design/bar1">>),
+ DDoc2 = create_ddoc(simple, <<"_design/bar2">>),
+
+ {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+ {ok, {Pos1, Rev1}} = fabric2_db:update_doc(Db, DDoc1, []),
+ ?assertEqual({ok, [row(<<"0">>, 0, 0)]}, run_query(Db, DDoc1, ?MAP_FUN1)),
+
+ % Because run_query/3 can return, and unsubscribe from the job,
+ % before it actually finishes, ensure we wait for the job to
+ % finish so we get a deterministic setup every time.
+ JobId = get_job_id(Db, DDoc1),
+ ?assertEqual(ok, wait_job_finished(JobId, 5000)),
+
+ % Add the second ddoc with same view as first one.
+ {ok, {Pos2, Rev2}} = fabric2_db:update_doc(Db, DDoc2, []),
+
+ DDoc1Del = DDoc1#doc{revs = {Pos1, [Rev1]}, deleted = true},
+ {ok, _} = fabric2_db:update_doc(Db, DDoc1Del, []),
+
+ Cleanup(),
+
+ % Assert that no updates are applied
+ meck:new(couch_views_fdb, [passthrough]),
+ meck:expect(couch_views_trees, update_views, fun(TxDb, Mrst, Docs) ->
+ case Docs of
+ [] -> meck:passthrough([TxDb, Mrst, Docs]);
+ [_ | _] -> erlang:error(update_triggered)
+ end
+ end),
+ ?assertEqual({ok, [row(<<"0">>, 0, 0)]}, run_query(Db, DDoc2, ?MAP_FUN1)),
+ ?assertEqual(ok, wait_job_finished(JobId, 5000)),
+
+ DDoc2Del = DDoc2#doc{revs = {Pos2, [Rev2]}, deleted = true},
+ {ok, _} = fabric2_db:update_doc(Db, DDoc2Del, []),
+
+ Cleanup(),
+
+ % After the last ddoc is deleted we should get an error
+ ?assertError({ddoc_deleted, _}, run_query(Db, DDoc2, ?MAP_FUN1)).
+
+
+handle_db_recreated_when_running(Db) ->
+ DbName = fabric2_db:name(Db),
+
+ DDoc = create_ddoc(),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+ % To intercept job building while it is running ensure updates happen one
+ % row at a time.
+ config:set("couch_views", "batch_initial_size", "1", false),
+
+ meck_intercept_job_update(self()),
+
+ [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]),
+
+ {Indexer, _Job, _Data} = wait_indexer_update(10000),
+
+ {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId),
+ ?assertEqual(running, State),
+
+ {ok, SubId, running, _} = couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId),
+
+ ok = fabric2_db:delete(DbName, []),
+ {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+
+ Indexer ! continue,
+
+ ?assertMatch({
+ ?INDEX_JOB_TYPE,
+ JobId,
+ finished,
+ #{<<"error">> := <<"db_deleted">>}
+ }, couch_jobs:wait(SubId, infinity)),
+
+ {ok, _} = fabric2_db:update_doc(Db1, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(2), []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(3), []),
+
+ reset_intercept_job_update(Indexer),
+
+ {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"2">>, 2, 2),
+ row(<<"3">>, 3, 3)
+ ], Out2).
+
+
+handle_db_recreated_after_finished(Db) ->
+ DbName = fabric2_db:name(Db),
+
+ DDoc = create_ddoc(),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+ {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+ {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"0">>, 0, 0),
+ row(<<"1">>, 1, 1)
+ ], Out1),
+
+ ok = fabric2_db:delete(DbName, []),
+
+ ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)),
+
+ {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+
+ {ok, _} = fabric2_db:update_doc(Db1, DDoc, []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(2), []),
+ {ok, _} = fabric2_db:update_doc(Db1, doc(3), []),
+
+ ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)),
+
+ {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"2">>, 2, 2),
+ row(<<"3">>, 3, 3)
+ ], Out2).
+
+
+index_can_recover_from_crash(Db) ->
+ ok = meck:new(config, [passthrough]),
+ ok = meck:expect(config, get_integer, fun(Section, Key, Default) ->
+ case Section == "couch_views" andalso Key == "change_limit" of
+ true -> 1;
+ _ -> Default
+ end
+ end),
+ meck:new(couch_eval, [passthrough]),
+ meck:expect(couch_eval, map_docs, fun(State, Docs) ->
+ Doc = hd(Docs),
+ case Doc#doc.id == <<"2">> of
+ true ->
+ % remove the mock so that next time the doc is processed
+ % it will work
+ meck:unload(couch_eval),
+ throw({fake_crash, test_jobs_restart});
+ false ->
+ meck:passthrough([State, Docs])
+ end
+ end),
+
+ DDoc = create_ddoc(),
+ Docs = make_docs(3),
+ {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+ {ok, _} = fabric2_db:update_docs(Db, Docs, []),
+
+ {ok, Out} = run_query(Db, DDoc, ?MAP_FUN1),
+ ?assertEqual([
+ row(<<"1">>, 1, 1),
+ row(<<"2">>, 2, 2),
+ row(<<"3">>, 3, 3)
+ ], Out).
+
+
+row(Id, Key, Value) ->
+ {row, [
+ {id, Id},
+ {key, Key},
+ {value, Value}
+ ]}.
+
+
+fold_fun({meta, _Meta}, Acc) ->
+ {ok, Acc};
+fold_fun({row, _} = Row, Acc) ->
+ {ok, [Row | Acc]};
+fold_fun(complete, Acc) ->
+ {ok, lists:reverse(Acc)}.
+
+
+create_ddoc() ->
+ create_ddoc(simple).
+
+
+create_ddoc(Type) ->
+ create_ddoc(Type, <<"_design/bar">>).
+
+
+create_ddoc(simple, DocId) when is_binary(DocId) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, DocId},
+ {<<"views">>, {[
+ {?MAP_FUN1, {[
+ {<<"map">>, <<"function(doc) {emit(doc.val, doc.val);}">>}
+ ]}},
+ {?MAP_FUN2, {[
+ {<<"map">>, <<"function(doc) {}">>}
+ ]}}
+ ]}}
+ ]});
+
+create_ddoc(multi_emit_different, DocId) when is_binary(DocId) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, DocId},
+ {<<"views">>, {[
+ {?MAP_FUN1, {[
+ {<<"map">>, <<"function(doc) { "
+ "emit(doc._id, doc._id); "
+ "emit(doc.val, doc.val); "
+ "}">>}
+ ]}},
+ {?MAP_FUN2, {[
+ {<<"map">>, <<"function(doc) {}">>}
+ ]}}
+ ]}}
+ ]});
+
+create_ddoc(multi_emit_same, DocId) when is_binary(DocId) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, DocId},
+ {<<"views">>, {[
+ {?MAP_FUN1, {[
+ {<<"map">>, <<"function(doc) { "
+ "emit(doc.val, doc.val * 2); "
+ "emit(doc.val, doc.val); "
+ "if(doc.extra) {"
+ " emit(doc.val, doc.extra);"
+ "}"
+ "}">>}
+ ]}},
+ {?MAP_FUN2, {[
+ {<<"map">>, <<"function(doc) {}">>}
+ ]}}
+ ]}}
+ ]});
+
+create_ddoc(multi_emit_key_limit, DocId) when is_binary(DocId) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, DocId},
+ {<<"views">>, {[
+ {?MAP_FUN1, {[
+ {<<"map">>, <<"function(doc) { "
+ "if (doc.val === 1) { "
+ "emit('a very long string to be limited', doc.val);"
+ "} else {"
+ "emit(doc.val, doc.val)"
+ "}"
+ "}">>}
+ ]}},
+ {?MAP_FUN2, {[
+ {<<"map">>, <<"function(doc) { "
+ "emit(doc.val + 20, doc.val);"
+ "if (doc.val === 1) { "
+ "emit(doc.val, 'a very long string to be limited');"
+ "} else {"
+ "emit(doc.val, doc.val)"
+ "}"
+ "}">>}
+ ]}}
+ ]}}
+ ]}).
+
+
+make_docs(Count) ->
+ [doc(I) || I <- lists:seq(1, Count)].
+
+
+doc(Id) ->
+ doc(Id, Id).
+
+
+doc(Id, Val) ->
+ couch_doc:from_json_obj({[
+ {<<"_id">>, list_to_binary(integer_to_list(Id))},
+ {<<"val">>, Val}
+ ]}).
+
+
+run_query(#{} = Db, DDoc, <<_/binary>> = View) ->
+ couch_views:query(Db, DDoc, View, fun fold_fun/2, [], #mrargs{}).
+
+
+get_job_id(#{} = Db, DDoc) ->
+ DbName = fabric2_db:name(Db),
+ {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
+ couch_views_jobs:job_id(Db, Mrst).
+
+
+wait_job_finished(JobId, Timeout) ->
+ case couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId) of
+ {ok, Sub, _, _} ->
+ case couch_jobs:wait(Sub, finished, Timeout) of
+ {?INDEX_JOB_TYPE, _, _, _} -> ok;
+ timeout -> timeout
+ end;
+ {ok, finished, _} ->
+ ok
+ end.
+
+
+meck_intercept_job_update(ParentPid) ->
+ meck:new(couch_jobs, [passthrough]),
+ meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
+ ParentPid ! {self(), Job, Data},
+ receive continue -> ok end,
+ meck:passthrough([Db, Job, Data])
+ end).
+
+
+reset_intercept_job_update(IndexerPid) ->
+ meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
+ meck:passthrough([Db, Job, Data])
+ end),
+ IndexerPid ! continue.
+
+
+wait_indexer_update(Timeout) ->
+ receive
+ {Pid, Job, Data} when is_pid(Pid) -> {Pid, Job, Data}
+ after Timeout ->
+ error(timeout_in_wait_indexer_update)
+ end.