summaryrefslogtreecommitdiff
path: root/src/couch/test/eunit/couch_changes_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/test/eunit/couch_changes_tests.erl')
-rw-r--r--src/couch/test/eunit/couch_changes_tests.erl1001
1 files changed, 1001 insertions, 0 deletions
diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl
new file mode 100644
index 000000000..0c2f5f91f
--- /dev/null
+++ b/src/couch/test/eunit/couch_changes_tests.erl
@@ -0,0 +1,1001 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_changes_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TIMEOUT, 6000).
+-define(TEST_TIMEOUT, 10000).
+
+-record(row, {
+ id,
+ seq,
+ deleted = false,
+ doc = nil
+}).
+
+setup() ->
+ DbName = ?tempdb(),
+ {ok, Db} = create_db(DbName),
+ Revs = [R || {ok, R} <- [
+ save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
+ save_doc(Db, {[{<<"_id">>, <<"doc5">>}]})
+ ]],
+ Rev = lists:nth(3, Revs),
+ couch_db:ensure_full_commit(Db),
+ {ok, Db1} = couch_db:reopen(Db),
+
+ {ok, Rev1} = save_doc(Db1, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev}]}),
+ Revs1 = Revs ++ [Rev1],
+ Revs2 = Revs1 ++ [R || {ok, R} <- [
+ save_doc(Db1, {[{<<"_id">>, <<"doc6">>}]}),
+ save_doc(Db1, {[{<<"_id">>, <<"_design/foo">>}]}),
+ save_doc(Db1, {[{<<"_id">>, <<"doc7">>}]}),
+ save_doc(Db1, {[{<<"_id">>, <<"doc8">>}]})
+ ]],
+ config:set("native_query_servers", "erlang", "{couch_native_process, start_link, []}", _Persist=false),
+ {DbName, list_to_tuple(Revs2)}.
+
+teardown({DbName, _}) ->
+ config:delete("native_query_servers", "erlang", _Persist=false),
+ delete_db(DbName),
+ ok.
+
+
+changes_test_() ->
+ {
+ "Changes feed",
+ {
+ setup,
+ fun test_util:start_couch/0, fun test_util:stop_couch/1,
+ [
+ filter_by_selector(),
+ filter_by_doc_id(),
+ filter_by_design(),
+ continuous_feed(),
+ %%filter_by_custom_function()
+ filter_by_filter_function(),
+ filter_by_view()
+ ]
+ }
+ }.
+
+filter_by_doc_id() ->
+ {
+ "Filter _doc_id",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_filter_by_specific_doc_ids/1,
+ fun should_filter_by_specific_doc_ids_descending/1,
+ fun should_filter_by_specific_doc_ids_with_since/1,
+ fun should_filter_by_specific_doc_ids_no_result/1,
+ fun should_handle_deleted_docs/1
+ ]
+ }
+ }.
+
+filter_by_selector() ->
+ {
+ "Filter _selector",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_select_basic/1,
+ fun should_select_with_since/1,
+ fun should_select_when_no_result/1,
+ fun should_select_with_deleted_docs/1,
+ fun should_select_with_continuous/1,
+ fun should_stop_selector_when_db_deleted/1,
+ fun should_select_with_empty_fields/1,
+ fun should_select_with_fields/1
+ ]
+ }
+ }.
+
+
+filter_by_design() ->
+ {
+ "Filter _design",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_emit_only_design_documents/1
+ ]
+ }
+ }.
+
+%% filter_by_custom_function() ->
+%% {
+%% "Filter function",
+%% {
+%% foreach,
+%% fun setup/0, fun teardown/1,
+%% [
+%% fun should_receive_heartbeats/1
+%% ]
+%% }
+%% }.
+
+filter_by_filter_function() ->
+ {
+ "Filter by filters",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_filter_by_doc_attribute/1,
+ fun should_filter_by_user_ctx/1
+ ]
+ }
+ }.
+
+filter_by_view() ->
+ {
+ "Filter _view",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_filter_by_view/1,
+ fun should_filter_by_fast_view/1,
+ fun should_filter_by_erlang_view/1
+ ]
+ }
+ }.
+
+continuous_feed() ->
+ {
+ "Continuous Feed",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_filter_continuous_feed_by_specific_doc_ids/1,
+ fun should_end_changes_when_db_deleted/1
+ ]
+ }
+ }.
+
+
+should_filter_by_specific_doc_ids({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{
+ filter = "_doc_ids"
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+
+ ?assertEqual(2, length(Rows)),
+ [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows,
+ ?assertEqual(<<"doc4">>, Id1),
+ ?assertEqual(4, Seq1),
+ ?assertEqual(<<"doc3">>, Id2),
+ ?assertEqual(6, Seq2),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_filter_by_specific_doc_ids_descending({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{
+ filter = "_doc_ids",
+ dir = rev
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req),
+
+ ?assertEqual(2, length(Rows)),
+ [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows,
+ ?assertEqual(<<"doc3">>, Id1),
+ ?assertEqual(6, Seq1),
+ ?assertEqual(<<"doc4">>, Id2),
+ ?assertEqual(4, Seq2),
+ ?assertEqual(4, LastSeq)
+ end).
+
+should_filter_by_specific_doc_ids_with_since({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{
+ filter = "_doc_ids",
+ since = 5
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq1, id = Id1}] = Rows,
+ ?assertEqual(<<"doc3">>, Id1),
+ ?assertEqual(6, Seq1),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_filter_by_specific_doc_ids_no_result({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{
+ filter = "_doc_ids",
+ since = 6
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+
+ ?assertEqual(0, length(Rows)),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_handle_deleted_docs({DbName, Revs}) ->
+ ?_test(
+ begin
+ Rev3_2 = element(6, Revs),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, _} = save_doc(
+ Db,
+ {[{<<"_id">>, <<"doc3">>},
+ {<<"_deleted">>, true},
+ {<<"_rev">>, Rev3_2}]}),
+
+ ChArgs = #changes_args{
+ filter = "_doc_ids",
+ since = 9
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req),
+
+ ?assertEqual(1, length(Rows)),
+ ?assertMatch(
+ [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}],
+ Rows
+ ),
+ ?assertEqual(11, LastSeq)
+ end).
+
+should_filter_continuous_feed_by_specific_doc_ids({DbName, Revs}) ->
+ ?_test(
+ begin
+ {ok, Db} = couch_db:open_int(DbName, []),
+ ChangesArgs = #changes_args{
+ filter = "_doc_ids",
+ feed = "continuous"
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ reset_row_notifications(),
+ Consumer = spawn_consumer(DbName, ChangesArgs, Req),
+ ?assertEqual(ok, wait_row_notifications(2)),
+ ok = pause(Consumer),
+
+ Rows = get_rows(Consumer),
+ ?assertEqual(2, length(Rows)),
+ [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows,
+ ?assertEqual(<<"doc4">>, Id1),
+ ?assertEqual(4, Seq1),
+ ?assertEqual(<<"doc3">>, Id2),
+ ?assertEqual(6, Seq2),
+
+ clear_rows(Consumer),
+ {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}),
+ {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}),
+ ok = unpause(Consumer),
+ timer:sleep(100),
+ ok = pause(Consumer),
+ ?assertEqual([], get_rows(Consumer)),
+
+ Rev4 = element(4, Revs),
+ Rev3_2 = element(6, Revs),
+ {ok, Rev4_2} = save_doc(Db, {[{<<"_id">>, <<"doc4">>},
+ {<<"_rev">>, Rev4}]}),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>},
+ {<<"_rev">>, Rev4_2}]}),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}),
+ {ok, Rev3_3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>},
+ {<<"_rev">>, Rev3_2}]}),
+ reset_row_notifications(),
+ ok = unpause(Consumer),
+ ?assertEqual(ok, wait_row_notifications(2)),
+ ok = pause(Consumer),
+
+ NewRows = get_rows(Consumer),
+ ?assertEqual(2, length(NewRows)),
+ [Row14, Row16] = NewRows,
+ ?assertEqual(<<"doc4">>, Row14#row.id),
+ ?assertEqual(15, Row14#row.seq),
+ ?assertEqual(<<"doc3">>, Row16#row.id),
+ ?assertEqual(17, Row16#row.seq),
+
+ clear_rows(Consumer),
+ {ok, _Rev3_4} = save_doc(Db, {[{<<"_id">>, <<"doc3">>},
+ {<<"_rev">>, Rev3_3}]}),
+ reset_row_notifications(),
+ ok = unpause(Consumer),
+ ?assertEqual(ok, wait_row_notifications(1)),
+ ok = pause(Consumer),
+
+ FinalRows = get_rows(Consumer),
+
+ ok = unpause(Consumer),
+ stop_consumer(Consumer),
+
+ ?assertMatch([#row{seq = 18, id = <<"doc3">>}], FinalRows)
+ end).
+
+
+should_end_changes_when_db_deleted({DbName, _Revs}) ->
+ ?_test(begin
+ {ok, _Db} = couch_db:open_int(DbName, []),
+ ChangesArgs = #changes_args{
+ filter = "_doc_ids",
+ feed = "continuous"
+ },
+ DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
+ Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
+ Consumer = spawn_consumer(DbName, ChangesArgs, Req),
+ ok = pause(Consumer),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+ ok = unpause(Consumer),
+ {_Rows, _LastSeq} = wait_finished(Consumer),
+ stop_consumer(Consumer),
+ ok
+ end).
+
+
+should_select_basic({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector"},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_select_with_since({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector", since = 9},
+ GteDoc2 = {[{<<"$gte">>, <<"doc1">>}]},
+ Selector = {[{<<"_id">>, GteDoc2}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc8">>, Id),
+ ?assertEqual(10, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_select_when_no_result({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector"},
+ Selector = {[{<<"_id">>, <<"nopers">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(0, length(Rows)),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_select_with_deleted_docs({DbName, Revs}) ->
+ ?_test(
+ begin
+ Rev3_2 = element(6, Revs),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, _} = save_doc(
+ Db,
+ {[{<<"_id">>, <<"doc3">>},
+ {<<"_deleted">>, true},
+ {<<"_rev">>, Rev3_2}]}),
+ ChArgs = #changes_args{filter = "_selector"},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ {Rows, LastSeq, _} = run_changes_query(DbName, ChArgs, Req),
+ ?assertMatch(
+ [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}],
+ Rows
+ ),
+ ?assertEqual(11, LastSeq)
+ end).
+
+should_select_with_continuous({DbName, Revs}) ->
+ ?_test(
+ begin
+ {ok, Db} = couch_db:open_int(DbName, []),
+ ChArgs = #changes_args{filter = "_selector", feed = "continuous"},
+ GteDoc8 = {[{<<"$gte">>, <<"doc8">>}]},
+ Selector = {[{<<"_id">>, GteDoc8}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ reset_row_notifications(),
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ ?assertEqual(ok, wait_row_notifications(1)),
+ ok = pause(Consumer),
+ Rows = get_rows(Consumer),
+ ?assertMatch(
+ [#row{seq = 10, id = <<"doc8">>, deleted = false}],
+ Rows
+ ),
+ clear_rows(Consumer),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc01">>}]}),
+ ok = unpause(Consumer),
+ timer:sleep(100),
+ ok = pause(Consumer),
+ ?assertEqual([], get_rows(Consumer)),
+ Rev4 = element(4, Revs),
+ Rev8 = element(10, Revs),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc8">>},
+ {<<"_rev">>, Rev8}]}),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>},
+ {<<"_rev">>, Rev4}]}),
+ reset_row_notifications(),
+ ok = unpause(Consumer),
+ ?assertEqual(ok, wait_row_notifications(1)),
+ ok = pause(Consumer),
+ NewRows = get_rows(Consumer),
+ ?assertMatch(
+ [#row{seq = _, id = <<"doc8">>, deleted = false}],
+ NewRows
+ )
+ end).
+
+should_stop_selector_when_db_deleted({DbName, _Revs}) ->
+ ?_test(
+ begin
+ {ok, _Db} = couch_db:open_int(DbName, []),
+ ChArgs = #changes_args{filter = "_selector", feed = "continuous"},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ ok = pause(Consumer),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+ ok = unpause(Consumer),
+ {_Rows, _LastSeq} = wait_finished(Consumer),
+ stop_consumer(Consumer),
+ ok
+ end).
+
+
+should_select_with_empty_fields({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector", include_docs=true},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector},
+ {<<"fields">>, []}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id, doc = Doc}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq),
+ ?assertMatch({[{_K1, _V1}, {_K2, _V2}]}, Doc)
+ end).
+
+should_select_with_fields({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector", include_docs=true},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector},
+ {<<"fields">>, [<<"_id">>, <<"nope">>]}]}},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id, doc = Doc}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq),
+ ?assertMatch(Doc, {[{<<"_id">>, <<"doc3">>}]})
+ end).
+
+
+should_emit_only_design_documents({DbName, Revs}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{
+ filter = "_design"
+ },
+ Req = {json_req, null},
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+
+ ?assertEqual(1, length(Rows)),
+ ?assertEqual(UpSeq, LastSeq),
+ ?assertEqual([#row{seq = 8, id = <<"_design/foo">>}], Rows),
+
+
+ {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"_design/foo">>},
+ {<<"_rev">>, element(8, Revs)},
+ {<<"_deleted">>, true}]}),
+
+ couch_db:close(Db),
+ {Rows2, LastSeq2, _} = run_changes_query(DbName, ChArgs, Req),
+
+ UpSeq2 = UpSeq + 1,
+
+ ?assertEqual(1, length(Rows2)),
+ ?assertEqual(UpSeq2, LastSeq2),
+ ?assertEqual([#row{seq = 11,
+ id = <<"_design/foo">>,
+ deleted = true}],
+ Rows2)
+ end).
+
+%% should_receive_heartbeats(_) ->
+%% {timeout, ?TEST_TIMEOUT div 1000,
+%% ?_test(
+%% begin
+%% DbName = ?tempdb(),
+%% Timeout = 100,
+%% {ok, Db} = create_db(DbName),
+
+%% {ok, _} = save_doc(Db, {[
+%% {<<"_id">>, <<"_design/filtered">>},
+%% {<<"language">>, <<"javascript">>},
+%% {<<"filters">>, {[
+%% {<<"foo">>, <<"function(doc) {
+%% return ['doc10', 'doc11', 'doc12'].indexOf(doc._id) != -1;}">>
+%% }]}}
+%% ]}),
+
+%% ChangesArgs = #changes_args{
+%% filter = "filtered/foo",
+%% feed = "continuous",
+%% timeout = 10000,
+%% heartbeat = 1000
+%% },
+%% Consumer = spawn_consumer(DbName, ChangesArgs, {json_req, null}),
+
+%% {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}),
+
+%% Heartbeats = get_heartbeats(Consumer),
+%% ?assert(Heartbeats > 0),
+
+%% {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}),
+
+%% Heartbeats2 = get_heartbeats(Consumer),
+%% ?assert(Heartbeats2 > Heartbeats),
+
+%% Rows = get_rows(Consumer),
+%% ?assertEqual(3, length(Rows)),
+
+%% {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}),
+%% timer:sleep(Timeout),
+%% {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}),
+%% timer:sleep(Timeout),
+
+%% Heartbeats3 = get_heartbeats(Consumer),
+%% ?assert(Heartbeats3 > Heartbeats2)
+%% end)}.
+
+should_filter_by_doc_attribute({DbName, _}) ->
+ ?_test(
+ begin
+ DDocId = <<"_design/app">>,
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"filters">>, {[
+ {<<"valid">>, <<"function(doc, req) {"
+ " if (doc._id == 'doc3') {"
+ " return true; "
+ "} }">>}
+ ]}}
+ ]}),
+ ChArgs = #changes_args{filter = "app/valid"},
+ Req = {json_req, null},
+ ok = update_ddoc(DbName, DDoc),
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_filter_by_user_ctx({DbName, _}) ->
+ ?_test(
+ begin
+ DDocId = <<"_design/app">>,
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"filters">>, {[
+ {<<"valid">>, <<"function(doc, req) {"
+ " if (req.userCtx.name == doc._id) {"
+ " return true; "
+ "} }">>}
+ ]}}
+ ]}),
+ ChArgs = #changes_args{filter = "app/valid"},
+ UserCtx = #user_ctx{name = <<"doc3">>, roles = []},
+ {ok, DbRec} = couch_db:clustered_db(DbName, UserCtx),
+ Req = {json_req, {[{
+ <<"userCtx">>, couch_util:json_user_ctx(DbRec)
+ }]}},
+ ok = update_ddoc(DbName, DDoc),
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_filter_by_view({DbName, _}) ->
+ ?_test(
+ begin
+ DDocId = <<"_design/app">>,
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"views">>, {[
+ {<<"valid">>, {[
+ {<<"map">>, <<"function(doc) {"
+ " if (doc._id == 'doc3') {"
+ " emit(doc); "
+ "} }">>}
+ ]}}
+ ]}}
+ ]}),
+ ChArgs = #changes_args{filter = "_view"},
+ Req = {json_req, {[{
+ <<"query">>, {[
+ {<<"view">>, <<"app/valid">>}
+ ]}
+ }]}},
+ ok = update_ddoc(DbName, DDoc),
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_filter_by_fast_view({DbName, _}) ->
+ ?_test(
+ begin
+ DDocId = <<"_design/app">>,
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"javascript">>},
+ {<<"options">>, {[{<<"seq_indexed">>, true}]}},
+ {<<"views">>, {[
+ {<<"valid">>, {[
+ {<<"map">>, <<"function(doc) {"
+ " if (doc._id == 'doc3') {"
+ " emit(doc); "
+ "} }">>}
+ ]}}
+ ]}}
+ ]}),
+ ChArgs = #changes_args{filter = "_view"},
+ Req = {json_req, {[{
+ <<"query">>, {[
+ {<<"view">>, <<"app/valid">>}
+ ]}
+ }]}},
+ ok = update_ddoc(DbName, DDoc),
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, ViewInfo} = couch_mrview:get_view_info(Db, DDoc, <<"valid">>),
+ {update_seq, ViewUpSeq} = lists:keyfind(update_seq, 1, ViewInfo),
+ couch_db:close(Db),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(LastSeq, Seq),
+ ?assertEqual(UpSeq, ViewUpSeq)
+ end).
+
+should_filter_by_erlang_view({DbName, _}) ->
+ ?_test(
+ begin
+ DDocId = <<"_design/app">>,
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocId},
+ {<<"language">>, <<"erlang">>},
+ {<<"views">>, {[
+ {<<"valid">>, {[
+ {<<"map">>, <<"fun({Doc}) ->"
+ " case lists:keyfind(<<\"_id\">>, 1, Doc) of"
+ " {<<\"_id\">>, <<\"doc3\">>} -> Emit(Doc, null); "
+ " false -> ok"
+ " end "
+ "end.">>}
+ ]}}
+ ]}}
+ ]}),
+ ChArgs = #changes_args{filter = "_view"},
+ Req = {json_req, {[{
+ <<"query">>, {[
+ {<<"view">>, <<"app/valid">>}
+ ]}
+ }]}},
+ ok = update_ddoc(DbName, DDoc),
+ {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+update_ddoc(DbName, DDoc) ->
+ {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
+ {ok, _} = couch_db:update_doc(Db, DDoc, []),
+ couch_db:close(Db).
+
+run_changes_query(DbName, ChangesArgs, Opts) ->
+ Consumer = spawn_consumer(DbName, ChangesArgs, Opts),
+ {Rows, LastSeq} = wait_finished(Consumer),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ UpSeq = couch_db:get_update_seq(Db),
+ couch_db:close(Db),
+ stop_consumer(Consumer),
+ {Rows, LastSeq, UpSeq}.
+
+save_doc(Db, Json) ->
+ Doc = couch_doc:from_json_obj(Json),
+ {ok, Rev} = couch_db:update_doc(Db, Doc, []),
+ {ok, couch_doc:rev_to_str(Rev)}.
+
+get_rows({Consumer, _}) ->
+ Ref = make_ref(),
+ Consumer ! {get_rows, Ref},
+ Resp = receive
+ {rows, Ref, Rows} ->
+ Rows
+ after ?TIMEOUT ->
+ timeout
+ end,
+ ?assertNotEqual(timeout, Resp),
+ Resp.
+
+%% get_heartbeats({Consumer, _}) ->
+%% Ref = make_ref(),
+%% Consumer ! {get_heartbeats, Ref},
+%% Resp = receive
+%% {hearthbeats, Ref, HeartBeats} ->
+%% HeartBeats
+%% after ?TIMEOUT ->
+%% timeout
+%% end,
+%% ?assertNotEqual(timeout, Resp),
+%% Resp.
+
+clear_rows({Consumer, _}) ->
+ Ref = make_ref(),
+ Consumer ! {reset, Ref},
+ Resp = receive
+ {ok, Ref} ->
+ ok
+ after ?TIMEOUT ->
+ timeout
+ end,
+ ?assertNotEqual(timeout, Resp),
+ Resp.
+
+stop_consumer({Consumer, _}) ->
+ Ref = make_ref(),
+ Consumer ! {stop, Ref},
+ Resp = receive
+ {ok, Ref} ->
+ ok
+ after ?TIMEOUT ->
+ timeout
+ end,
+ ?assertNotEqual(timeout, Resp),
+ Resp.
+
+pause({Consumer, _}) ->
+ Ref = make_ref(),
+ Consumer ! {pause, Ref},
+ Resp = receive
+ {paused, Ref} ->
+ ok
+ after ?TIMEOUT ->
+ timeout
+ end,
+ ?assertNotEqual(timeout, Resp),
+ Resp.
+
+unpause({Consumer, _}) ->
+ Ref = make_ref(),
+ Consumer ! {continue, Ref},
+ Resp = receive
+ {ok, Ref} ->
+ ok
+ after ?TIMEOUT ->
+ timeout
+ end,
+ ?assertNotEqual(timeout, Resp),
+ Resp.
+
+wait_finished({_, ConsumerRef}) ->
+ receive
+ {consumer_finished, Rows, LastSeq} ->
+ {Rows, LastSeq};
+ {'DOWN', ConsumerRef, _, _, Msg} when Msg == normal; Msg == ok ->
+ ok;
+ {'DOWN', ConsumerRef, _, _, Msg} ->
+ erlang:error({consumer_died, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, Msg}
+ ]})
+ after ?TIMEOUT ->
+ erlang:error({consumer_died, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {value, timeout}
+ ]})
+ end.
+
+
+reset_row_notifications() ->
+ receive
+ row ->
+ reset_row_notifications()
+ after 0 ->
+ ok
+ end.
+
+
+wait_row_notifications(N) ->
+ receive
+ row when N == 1 ->
+ ok;
+ row when N > 1 ->
+ wait_row_notifications(N - 1)
+ after ?TIMEOUT ->
+ timeout
+ end.
+
+
+spawn_consumer(DbName, ChangesArgs0, Req) ->
+ Parent = self(),
+ spawn_monitor(fun() ->
+ put(heartbeat_count, 0),
+ Callback = fun
+ ({change, {Change}, _}, _, Acc) ->
+ Id = couch_util:get_value(<<"id">>, Change),
+ Seq = couch_util:get_value(<<"seq">>, Change),
+ Del = couch_util:get_value(<<"deleted">>, Change, false),
+ Doc = couch_util:get_value(doc, Change, nil),
+ Parent ! row,
+ [#row{id = Id, seq = Seq, deleted = Del, doc = Doc} | Acc];
+ ({stop, LastSeq}, _, Acc) ->
+ Parent ! {consumer_finished, lists:reverse(Acc), LastSeq},
+ stop_loop(Parent, Acc);
+ (timeout, _, Acc) ->
+ put(heartbeat_count, get(heartbeat_count) + 1),
+ maybe_pause(Parent, Acc);
+ (_, _, Acc) ->
+ maybe_pause(Parent, Acc)
+ end,
+ {ok, Db} = couch_db:open_int(DbName, []),
+ ChangesArgs = case (ChangesArgs0#changes_args.timeout =:= undefined)
+ andalso (ChangesArgs0#changes_args.heartbeat =:= undefined) of
+ true ->
+ ChangesArgs0#changes_args{timeout = 1000, heartbeat = 100};
+ false ->
+ ChangesArgs0
+ end,
+ FeedFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db),
+ try
+ FeedFun({Callback, []})
+ catch
+ throw:{stop, _} -> ok;
+ _:Error -> exit(Error)
+ after
+ couch_db:close(Db)
+ end
+ end).
+
+maybe_pause(Parent, Acc) ->
+ receive
+ {get_rows, Ref} ->
+ Parent ! {rows, Ref, lists:reverse(Acc)},
+ maybe_pause(Parent, Acc);
+ {get_heartbeats, Ref} ->
+ Parent ! {hearthbeats, Ref, get(heartbeat_count)},
+ maybe_pause(Parent, Acc);
+ {reset, Ref} ->
+ Parent ! {ok, Ref},
+ maybe_pause(Parent, []);
+ {pause, Ref} ->
+ Parent ! {paused, Ref},
+ pause_loop(Parent, Acc);
+ {stop, Ref} ->
+ Parent ! {ok, Ref},
+ throw({stop, Acc});
+ V when V /= updated ->
+ erlang:error({assertion_failed,
+ [{module, ?MODULE},
+ {line, ?LINE},
+ {value, V},
+ {reason, "Received unexpected message"}]})
+ after 0 ->
+ Acc
+ end.
+
+pause_loop(Parent, Acc) ->
+ receive
+ {stop, Ref} ->
+ Parent ! {ok, Ref},
+ throw({stop, Acc});
+ {reset, Ref} ->
+ Parent ! {ok, Ref},
+ pause_loop(Parent, []);
+ {continue, Ref} ->
+ Parent ! {ok, Ref},
+ Acc;
+ {get_rows, Ref} ->
+ Parent ! {rows, Ref, lists:reverse(Acc)},
+ pause_loop(Parent, Acc)
+ end.
+
+stop_loop(Parent, Acc) ->
+ receive
+ {get_rows, Ref} ->
+ Parent ! {rows, Ref, lists:reverse(Acc)},
+ stop_loop(Parent, Acc);
+ {stop, Ref} ->
+ Parent ! {ok, Ref},
+ Acc
+ end.
+
+create_db(DbName) ->
+ couch_db:create(DbName, [?ADMIN_CTX, overwrite]).
+
+delete_db(DbName) ->
+ couch_server:delete(DbName, [?ADMIN_CTX]).