diff options
Diffstat (limited to 'src/couch/test/eunit/couch_changes_tests.erl')
-rw-r--r-- | src/couch/test/eunit/couch_changes_tests.erl | 1001 |
1 files changed, 1001 insertions, 0 deletions
diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl new file mode 100644 index 000000000..0c2f5f91f --- /dev/null +++ b/src/couch/test/eunit/couch_changes_tests.erl @@ -0,0 +1,1001 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_changes_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(TIMEOUT, 6000). +-define(TEST_TIMEOUT, 10000). + +-record(row, { + id, + seq, + deleted = false, + doc = nil +}). + +setup() -> + DbName = ?tempdb(), + {ok, Db} = create_db(DbName), + Revs = [R || {ok, R} <- [ + save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), + save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}) + ]], + Rev = lists:nth(3, Revs), + couch_db:ensure_full_commit(Db), + {ok, Db1} = couch_db:reopen(Db), + + {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}), + Revs1 = Revs ++ [Rev1], + Revs2 = Revs1 ++ [R || {ok, R} <- [ + save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}), + save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]}) + ]], + config:set("native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist=false), + {DbName, list_to_tuple(Revs2)}. + +teardown({DbName, _}) -> + config:delete("native_query_servers", "erlang", _Persist=false), + delete_db(DbName), + ok. + + +changes_test_() -> + { + "Changes feed", + { + setup, + fun test_util:start_couch/0, fun test_util:stop_couch/1, + [ + filter_by_selector(), + filter_by_doc_id(), + filter_by_design(), + continuous_feed(), + %%filter_by_custom_function() + filter_by_filter_function(), + filter_by_view() + ] + } + }. + +filter_by_doc_id() -> + { + "Filter _doc_id", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_filter_by_specific_doc_ids/1, + fun should_filter_by_specific_doc_ids_descending/1, + fun should_filter_by_specific_doc_ids_with_since/1, + fun should_filter_by_specific_doc_ids_no_result/1, + fun should_handle_deleted_docs/1 + ] + } + }. + +filter_by_selector() -> + { + "Filter _selector", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_select_basic/1, + fun should_select_with_since/1, + fun should_select_when_no_result/1, + fun should_select_with_deleted_docs/1, + fun should_select_with_continuous/1, + fun should_stop_selector_when_db_deleted/1, + fun should_select_with_empty_fields/1, + fun should_select_with_fields/1 + ] + } + }. + + +filter_by_design() -> + { + "Filter _design", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_emit_only_design_documents/1 + ] + } + }. + +%% filter_by_custom_function() -> +%% { +%% "Filter function", +%% { +%% foreach, +%% fun setup/0, fun teardown/1, +%% [ +%% fun should_receive_heartbeats/1 +%% ] +%% } +%% }. + +filter_by_filter_function() -> + { + "Filter by filters", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_filter_by_doc_attribute/1, + fun should_filter_by_user_ctx/1 + ] + } + }. + +filter_by_view() -> + { + "Filter _view", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_filter_by_view/1, + fun should_filter_by_fast_view/1, + fun should_filter_by_erlang_view/1 + ] + } + }. + +continuous_feed() -> + { + "Continuous Feed", + { + foreach, + fun setup/0, fun teardown/1, + [ + fun should_filter_continuous_feed_by_specific_doc_ids/1, + fun should_end_changes_when_db_deleted/1 + ] + } + }. + + +should_filter_by_specific_doc_ids({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{ + filter = "_doc_ids" + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + + ?assertEqual(2, length(Rows)), + [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, + ?assertEqual(<<"doc4">>, Id1), + ?assertEqual(4, Seq1), + ?assertEqual(<<"doc3">>, Id2), + ?assertEqual(6, Seq2), + ?assertEqual(UpSeq, LastSeq) + end). + +should_filter_by_specific_doc_ids_descending({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{ + filter = "_doc_ids", + dir = rev + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), + + ?assertEqual(2, length(Rows)), + [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, + ?assertEqual(<<"doc3">>, Id1), + ?assertEqual(6, Seq1), + ?assertEqual(<<"doc4">>, Id2), + ?assertEqual(4, Seq2), + ?assertEqual(4, LastSeq) + end). + +should_filter_by_specific_doc_ids_with_since({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{ + filter = "_doc_ids", + since = 5 + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + + ?assertEqual(1, length(Rows)), + [#row{seq = Seq1, id = Id1}] = Rows, + ?assertEqual(<<"doc3">>, Id1), + ?assertEqual(6, Seq1), + ?assertEqual(UpSeq, LastSeq) + end). + +should_filter_by_specific_doc_ids_no_result({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{ + filter = "_doc_ids", + since = 6 + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + + ?assertEqual(0, length(Rows)), + ?assertEqual(UpSeq, LastSeq) + end). + +should_handle_deleted_docs({DbName, Revs}) -> + ?_test( + begin + Rev3_2 = element(6, Revs), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, _} = save_doc( + Db, + {[{<<"_id">>, <<"doc3">>}, + {<<"_deleted">>, true}, + {<<"_rev">>, Rev3_2}]}), + + ChArgs = #changes_args{ + filter = "_doc_ids", + since = 9 + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), + + ?assertEqual(1, length(Rows)), + ?assertMatch( + [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], + Rows + ), + ?assertEqual(11, LastSeq) + end). + +should_filter_continuous_feed_by_specific_doc_ids({DbName, Revs}) -> + ?_test( + begin + {ok, Db} = couch_db:open_int(DbName, []), + ChangesArgs = #changes_args{ + filter = "_doc_ids", + feed = "continuous" + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + reset_row_notifications(), + Consumer = spawn_consumer(DbName, ChangesArgs, Req), + ?assertEqual(ok, wait_row_notifications(2)), + ok = pause(Consumer), + + Rows = get_rows(Consumer), + ?assertEqual(2, length(Rows)), + [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, + ?assertEqual(<<"doc4">>, Id1), + ?assertEqual(4, Seq1), + ?assertEqual(<<"doc3">>, Id2), + ?assertEqual(6, Seq2), + + clear_rows(Consumer), + {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), + {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), + ok = unpause(Consumer), + timer:sleep(100), + ok = pause(Consumer), + ?assertEqual([], get_rows(Consumer)), + + Rev4 = element(4, Revs), + Rev3_2 = element(6, Revs), + {ok, Rev4_2} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, + {<<"_rev">>, Rev4}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, + {<<"_rev">>, Rev4_2}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), + {ok, Rev3_3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, + {<<"_rev">>, Rev3_2}]}), + reset_row_notifications(), + ok = unpause(Consumer), + ?assertEqual(ok, wait_row_notifications(2)), + ok = pause(Consumer), + + NewRows = get_rows(Consumer), + ?assertEqual(2, length(NewRows)), + [Row14, Row16] = NewRows, + ?assertEqual(<<"doc4">>, Row14#row.id), + ?assertEqual(15, Row14#row.seq), + ?assertEqual(<<"doc3">>, Row16#row.id), + ?assertEqual(17, Row16#row.seq), + + clear_rows(Consumer), + {ok, _Rev3_4} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, + {<<"_rev">>, Rev3_3}]}), + reset_row_notifications(), + ok = unpause(Consumer), + ?assertEqual(ok, wait_row_notifications(1)), + ok = pause(Consumer), + + FinalRows = get_rows(Consumer), + + ok = unpause(Consumer), + stop_consumer(Consumer), + + ?assertMatch([#row{seq = 18, id = <<"doc3">>}], FinalRows) + end). + + +should_end_changes_when_db_deleted({DbName, _Revs}) -> + ?_test(begin + {ok, _Db} = couch_db:open_int(DbName, []), + ChangesArgs = #changes_args{ + filter = "_doc_ids", + feed = "continuous" + }, + DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], + Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, + Consumer = spawn_consumer(DbName, ChangesArgs, Req), + ok = pause(Consumer), + ok = couch_server:delete(DbName, [?ADMIN_CTX]), + ok = unpause(Consumer), + {_Rows, _LastSeq} = wait_finished(Consumer), + stop_consumer(Consumer), + ok + end). + + +should_select_basic({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_select_with_since({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector", since = 9}, + GteDoc2 = {[{<<"$gte">>, <<"doc1">>}]}, + Selector = {[{<<"_id">>, GteDoc2}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc8">>, Id), + ?assertEqual(10, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_select_when_no_result({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"nopers">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(0, length(Rows)), + ?assertEqual(UpSeq, LastSeq) + end). + +should_select_with_deleted_docs({DbName, Revs}) -> + ?_test( + begin + Rev3_2 = element(6, Revs), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, _} = save_doc( + Db, + {[{<<"_id">>, <<"doc3">>}, + {<<"_deleted">>, true}, + {<<"_rev">>, Rev3_2}]}), + ChArgs = #changes_args{filter = "_selector"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req), + ?assertMatch( + [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}], + Rows + ), + ?assertEqual(11, LastSeq) + end). + +should_select_with_continuous({DbName, Revs}) -> + ?_test( + begin + {ok, Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, + GteDoc8 = {[{<<"$gte">>, <<"doc8">>}]}, + Selector = {[{<<"_id">>, GteDoc8}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + reset_row_notifications(), + Consumer = spawn_consumer(DbName, ChArgs, Req), + ?assertEqual(ok, wait_row_notifications(1)), + ok = pause(Consumer), + Rows = get_rows(Consumer), + ?assertMatch( + [#row{seq = 10, id = <<"doc8">>, deleted = false}], + Rows + ), + clear_rows(Consumer), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc01">>}]}), + ok = unpause(Consumer), + timer:sleep(100), + ok = pause(Consumer), + ?assertEqual([], get_rows(Consumer)), + Rev4 = element(4, Revs), + Rev8 = element(10, Revs), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}, + {<<"_rev">>, Rev8}]}), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, + {<<"_rev">>, Rev4}]}), + reset_row_notifications(), + ok = unpause(Consumer), + ?assertEqual(ok, wait_row_notifications(1)), + ok = pause(Consumer), + NewRows = get_rows(Consumer), + ?assertMatch( + [#row{seq = _, id = <<"doc8">>, deleted = false}], + NewRows + ) + end). + +should_stop_selector_when_db_deleted({DbName, _Revs}) -> + ?_test( + begin + {ok, _Db} = couch_db:open_int(DbName, []), + ChArgs = #changes_args{filter = "_selector", feed = "continuous"}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}]}}, + Consumer = spawn_consumer(DbName, ChArgs, Req), + ok = pause(Consumer), + ok = couch_server:delete(DbName, [?ADMIN_CTX]), + ok = unpause(Consumer), + {_Rows, _LastSeq} = wait_finished(Consumer), + stop_consumer(Consumer), + ok + end). + + +should_select_with_empty_fields({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector", include_docs=true}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}, + {<<"fields">>, []}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id, doc = Doc}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq), + ?assertMatch({[{_K1, _V1}, {_K2, _V2}]}, Doc) + end). + +should_select_with_fields({DbName, _}) -> + ?_test( + begin + ChArgs = #changes_args{filter = "_selector", include_docs=true}, + Selector = {[{<<"_id">>, <<"doc3">>}]}, + Req = {json_req, {[{<<"selector">>, Selector}, + {<<"fields">>, [<<"_id">>, <<"nope">>]}]}}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id, doc = Doc}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq), + ?assertMatch(Doc, {[{<<"_id">>, <<"doc3">>}]}) + end). + + +should_emit_only_design_documents({DbName, Revs}) -> + ?_test( + begin + ChArgs = #changes_args{ + filter = "_design" + }, + Req = {json_req, null}, + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + + ?assertEqual(1, length(Rows)), + ?assertEqual(UpSeq, LastSeq), + ?assertEqual([#row{seq = 8, id = <<"_design/foo">>}], Rows), + + + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + {ok, _} = save_doc(Db, {[{<<"_id">>, <<"_design/foo">>}, + {<<"_rev">>, element(8, Revs)}, + {<<"_deleted">>, true}]}), + + couch_db:close(Db), + {Rows2, LastSeq2, _} = run_changes_query(DbName, ChArgs, Req), + + UpSeq2 = UpSeq + 1, + + ?assertEqual(1, length(Rows2)), + ?assertEqual(UpSeq2, LastSeq2), + ?assertEqual([#row{seq = 11, + id = <<"_design/foo">>, + deleted = true}], + Rows2) + end). + +%% should_receive_heartbeats(_) -> +%% {timeout, ?TEST_TIMEOUT div 1000, +%% ?_test( +%% begin +%% DbName = ?tempdb(), +%% Timeout = 100, +%% {ok, Db} = create_db(DbName), + +%% {ok, _} = save_doc(Db, {[ +%% {<<"_id">>, <<"_design/filtered">>}, +%% {<<"language">>, <<"javascript">>}, +%% {<<"filters">>, {[ +%% {<<"foo">>, <<"function(doc) { +%% return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}">> +%% }]}} +%% ]}), + +%% ChangesArgs = #changes_args{ +%% filter = "filtered/foo", +%% feed = "continuous", +%% timeout = 10000, +%% heartbeat = 1000 +%% }, +%% Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}), + +%% {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), + +%% Heartbeats = get_heartbeats(Consumer), +%% ?assert(Heartbeats > 0), + +%% {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), + +%% Heartbeats2 = get_heartbeats(Consumer), +%% ?assert(Heartbeats2 > Heartbeats), + +%% Rows = get_rows(Consumer), +%% ?assertEqual(3, length(Rows)), + +%% {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}), +%% timer:sleep(Timeout), +%% {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}), +%% timer:sleep(Timeout), + +%% Heartbeats3 = get_heartbeats(Consumer), +%% ?assert(Heartbeats3 > Heartbeats2) +%% end)}. + +should_filter_by_doc_attribute({DbName, _}) -> + ?_test( + begin + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"filters">>, {[ + {<<"valid">>, <<"function(doc, req) {" + " if (doc._id == 'doc3') {" + " return true; " + "} }">>} + ]}} + ]}), + ChArgs = #changes_args{filter = "app/valid"}, + Req = {json_req, null}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_filter_by_user_ctx({DbName, _}) -> + ?_test( + begin + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"filters">>, {[ + {<<"valid">>, <<"function(doc, req) {" + " if (req.userCtx.name == doc._id) {" + " return true; " + "} }">>} + ]}} + ]}), + ChArgs = #changes_args{filter = "app/valid"}, + UserCtx = #user_ctx{name = <<"doc3">>, roles = []}, + {ok, DbRec} = couch_db:clustered_db(DbName, UserCtx), + Req = {json_req, {[{ + <<"userCtx">>, couch_util:json_user_ctx(DbRec) + }]}}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_filter_by_view({DbName, _}) -> + ?_test( + begin + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"views">>, {[ + {<<"valid">>, {[ + {<<"map">>, <<"function(doc) {" + " if (doc._id == 'doc3') {" + " emit(doc); " + "} }">>} + ]}} + ]}} + ]}), + ChArgs = #changes_args{filter = "_view"}, + Req = {json_req, {[{ + <<"query">>, {[ + {<<"view">>, <<"app/valid">>} + ]} + }]}}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +should_filter_by_fast_view({DbName, _}) -> + ?_test( + begin + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"javascript">>}, + {<<"options">>, {[{<<"seq_indexed">>, true}]}}, + {<<"views">>, {[ + {<<"valid">>, {[ + {<<"map">>, <<"function(doc) {" + " if (doc._id == 'doc3') {" + " emit(doc); " + "} }">>} + ]}} + ]}} + ]}), + ChArgs = #changes_args{filter = "_view"}, + Req = {json_req, {[{ + <<"query">>, {[ + {<<"view">>, <<"app/valid">>} + ]} + }]}}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + {ok, Db} = couch_db:open_int(DbName, []), + {ok, ViewInfo} = couch_mrview:get_view_info(Db, DDoc, <<"valid">>), + {update_seq, ViewUpSeq} = lists:keyfind(update_seq, 1, ViewInfo), + couch_db:close(Db), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(LastSeq, Seq), + ?assertEqual(UpSeq, ViewUpSeq) + end). + +should_filter_by_erlang_view({DbName, _}) -> + ?_test( + begin + DDocId = <<"_design/app">>, + DDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DDocId}, + {<<"language">>, <<"erlang">>}, + {<<"views">>, {[ + {<<"valid">>, {[ + {<<"map">>, <<"fun({Doc}) ->" + " case lists:keyfind(<<\"_id\">>, 1, Doc) of" + " {<<\"_id\">>, <<\"doc3\">>} -> Emit(Doc, null); " + " false -> ok" + " end " + "end.">>} + ]}} + ]}} + ]}), + ChArgs = #changes_args{filter = "_view"}, + Req = {json_req, {[{ + <<"query">>, {[ + {<<"view">>, <<"app/valid">>} + ]} + }]}}, + ok = update_ddoc(DbName, DDoc), + {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), + ?assertEqual(1, length(Rows)), + [#row{seq = Seq, id = Id}] = Rows, + ?assertEqual(<<"doc3">>, Id), + ?assertEqual(6, Seq), + ?assertEqual(UpSeq, LastSeq) + end). + +update_ddoc(DbName, DDoc) -> + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), + {ok, _} = couch_db:update_doc(Db, DDoc, []), + couch_db:close(Db). + +run_changes_query(DbName, ChangesArgs, Opts) -> + Consumer = spawn_consumer(DbName, ChangesArgs, Opts), + {Rows, LastSeq} = wait_finished(Consumer), + {ok, Db} = couch_db:open_int(DbName, []), + UpSeq = couch_db:get_update_seq(Db), + couch_db:close(Db), + stop_consumer(Consumer), + {Rows, LastSeq, UpSeq}. + +save_doc(Db, Json) -> + Doc = couch_doc:from_json_obj(Json), + {ok, Rev} = couch_db:update_doc(Db, Doc, []), + {ok, couch_doc:rev_to_str(Rev)}. + +get_rows({Consumer, _}) -> + Ref = make_ref(), + Consumer ! {get_rows, Ref}, + Resp = receive + {rows, Ref, Rows} -> + Rows + after ?TIMEOUT -> + timeout + end, + ?assertNotEqual(timeout, Resp), + Resp. + +%% get_heartbeats({Consumer, _}) -> +%% Ref = make_ref(), +%% Consumer ! {get_heartbeats, Ref}, +%% Resp = receive +%% {hearthbeats, Ref, HeartBeats} -> +%% HeartBeats +%% after ?TIMEOUT -> +%% timeout +%% end, +%% ?assertNotEqual(timeout, Resp), +%% Resp. + +clear_rows({Consumer, _}) -> + Ref = make_ref(), + Consumer ! {reset, Ref}, + Resp = receive + {ok, Ref} -> + ok + after ?TIMEOUT -> + timeout + end, + ?assertNotEqual(timeout, Resp), + Resp. + +stop_consumer({Consumer, _}) -> + Ref = make_ref(), + Consumer ! {stop, Ref}, + Resp = receive + {ok, Ref} -> + ok + after ?TIMEOUT -> + timeout + end, + ?assertNotEqual(timeout, Resp), + Resp. + +pause({Consumer, _}) -> + Ref = make_ref(), + Consumer ! {pause, Ref}, + Resp = receive + {paused, Ref} -> + ok + after ?TIMEOUT -> + timeout + end, + ?assertNotEqual(timeout, Resp), + Resp. + +unpause({Consumer, _}) -> + Ref = make_ref(), + Consumer ! {continue, Ref}, + Resp = receive + {ok, Ref} -> + ok + after ?TIMEOUT -> + timeout + end, + ?assertNotEqual(timeout, Resp), + Resp. + +wait_finished({_, ConsumerRef}) -> + receive + {consumer_finished, Rows, LastSeq} -> + {Rows, LastSeq}; + {'DOWN', ConsumerRef, _, _, Msg} when Msg == normal; Msg == ok -> + ok; + {'DOWN', ConsumerRef, _, _, Msg} -> + erlang:error({consumer_died, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, Msg} + ]}) + after ?TIMEOUT -> + erlang:error({consumer_died, [ + {module, ?MODULE}, + {line, ?LINE}, + {value, timeout} + ]}) + end. + + +reset_row_notifications() -> + receive + row -> + reset_row_notifications() + after 0 -> + ok + end. + + +wait_row_notifications(N) -> + receive + row when N == 1 -> + ok; + row when N > 1 -> + wait_row_notifications(N - 1) + after ?TIMEOUT -> + timeout + end. + + +spawn_consumer(DbName, ChangesArgs0, Req) -> + Parent = self(), + spawn_monitor(fun() -> + put(heartbeat_count, 0), + Callback = fun + ({change, {Change}, _}, _, Acc) -> + Id = couch_util:get_value(<<"id">>, Change), + Seq = couch_util:get_value(<<"seq">>, Change), + Del = couch_util:get_value(<<"deleted">>, Change, false), + Doc = couch_util:get_value(doc, Change, nil), + Parent ! row, + [#row{id = Id, seq = Seq, deleted = Del, doc = Doc} | Acc]; + ({stop, LastSeq}, _, Acc) -> + Parent ! {consumer_finished, lists:reverse(Acc), LastSeq}, + stop_loop(Parent, Acc); + (timeout, _, Acc) -> + put(heartbeat_count, get(heartbeat_count) + 1), + maybe_pause(Parent, Acc); + (_, _, Acc) -> + maybe_pause(Parent, Acc) + end, + {ok, Db} = couch_db:open_int(DbName, []), + ChangesArgs = case (ChangesArgs0#changes_args.timeout =:= undefined) + andalso (ChangesArgs0#changes_args.heartbeat =:= undefined) of + true -> + ChangesArgs0#changes_args{timeout = 1000, heartbeat = 100}; + false -> + ChangesArgs0 + end, + FeedFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), + try + FeedFun({Callback, []}) + catch + throw:{stop, _} -> ok; + _:Error -> exit(Error) + after + couch_db:close(Db) + end + end). + +maybe_pause(Parent, Acc) -> + receive + {get_rows, Ref} -> + Parent ! {rows, Ref, lists:reverse(Acc)}, + maybe_pause(Parent, Acc); + {get_heartbeats, Ref} -> + Parent ! {hearthbeats, Ref, get(heartbeat_count)}, + maybe_pause(Parent, Acc); + {reset, Ref} -> + Parent ! {ok, Ref}, + maybe_pause(Parent, []); + {pause, Ref} -> + Parent ! {paused, Ref}, + pause_loop(Parent, Acc); + {stop, Ref} -> + Parent ! {ok, Ref}, + throw({stop, Acc}); + V when V /= updated -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {value, V}, + {reason, "Received unexpected message"}]}) + after 0 -> + Acc + end. + +pause_loop(Parent, Acc) -> + receive + {stop, Ref} -> + Parent ! {ok, Ref}, + throw({stop, Acc}); + {reset, Ref} -> + Parent ! {ok, Ref}, + pause_loop(Parent, []); + {continue, Ref} -> + Parent ! {ok, Ref}, + Acc; + {get_rows, Ref} -> + Parent ! {rows, Ref, lists:reverse(Acc)}, + pause_loop(Parent, Acc) + end. + +stop_loop(Parent, Acc) -> + receive + {get_rows, Ref} -> + Parent ! {rows, Ref, lists:reverse(Acc)}, + stop_loop(Parent, Acc); + {stop, Ref} -> + Parent ! {ok, Ref}, + Acc + end. + +create_db(DbName) -> + couch_db:create(DbName, [?ADMIN_CTX, overwrite]). + +delete_db(DbName) -> + couch_server:delete(DbName, [?ADMIN_CTX]). |