author    | Eric Avdey <eiri@eiri.ca>                        | 2019-12-03 19:47:08 -0400
committer | GitHub <noreply@github.com>                      | 2019-12-03 19:47:08 -0400
commit    | 87edbaecd2cdb7f9266364e5fec65e31fe1fc251 (patch)
tree      | 26a73b91892baed89f40b9f4b9f91df0868555f7
parent    | 08a9a5ffe6b2dcfeb480c4cdb15d60ececea8442 (diff)
parent    | e1c66f2c0f5d5473df2727cf1a4a000b773ee11a (diff)
download  | couchdb-87edbaecd2cdb7f9266364e5fec65e31fe1fc251.tar.gz
Merge pull request #2324 from apache/2167-no-view-changes
Remove vestiges of view-based `_changes` feed from codebase
19 files changed, 206 insertions, 1490 deletions
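Caller-side summary before the diff: the 4-arity `couch_changes:handle_changes/4` (which took a feed `Type`) is removed, and callers switch to `couch_changes:handle_db_changes/3`, exactly as `couch_multidb_changes:scan_all_dbs/2` is updated below. The Erlang sketch that follows is illustrative only and not part of the commit; `follow_changes/3` is a hypothetical helper and the include path is an assumption.

```erlang
%% Illustration only (not part of this commit). follow_changes/3 is a
%% hypothetical helper; #changes_args{} is the record callers already pass
%% to the changes API (assumed to be defined in couch's couch_db.hrl).
-include_lib("couch/include/couch_db.hrl").

%% Old call shape, removed by this PR:
%%   ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, db),

follow_changes(Db, CallbackFun, UserAcc) ->
    %% New 3-arity entry point; the view/fast_view feed types no longer exist.
    ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db),
    %% The returned fun drives the feed, invoking CallbackFun for each event
    %% and threading UserAcc through, as in couch_multidb_changes:scan_all_dbs/2.
    ChangesFun({CallbackFun, UserAcc}).
```

Filtered `_view` changes themselves remain supported through the regular database sequence tree; only the `seq_indexed`/`fast_view` optimization and the `_view_changes` endpoint are removed, as the diff below shows.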
diff --git a/src/couch/src/couch.app.src b/src/couch/src/couch.app.src index 2b642c085..12ec29e12 100644 --- a/src/couch/src/couch.app.src +++ b/src/couch/src/couch.app.src @@ -79,8 +79,7 @@ {"_rewrite", "{couch_httpd_rewrite, handle_rewrite_req}"}, {"_show", "{couch_mrview_show, handle_doc_show_req}"}, {"_update", "{couch_mrview_show, handle_doc_update_req}"}, - {"_view", "{couch_mrview_http, handle_view_req}"}, - {"_view_changes", "{couch_mrview_http, handle_view_changes_req}"} + {"_view", "{couch_mrview_http, handle_view_req}"} ]} ]} ]}. diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index c5b5edf9f..6e9294a56 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -16,7 +16,6 @@ -export([ handle_db_changes/3, - handle_changes/4, get_changes_timeout/2, wait_updated/3, get_rest_updated/1, @@ -24,7 +23,6 @@ filter/3, handle_db_event/3, handle_view_event/3, - view_filter/3, send_changes_doc_ids/6, send_changes_design_docs/6 ]). @@ -38,9 +36,6 @@ -record(changes_acc, { db, - view_name, - ddoc_name, - view, seq, prepend, filter, @@ -57,53 +52,21 @@ aggregation_results }). -handle_db_changes(Args, Req, Db) -> - handle_changes(Args, Req, Db, db). - -handle_changes(Args1, Req, Db0, Type) -> +handle_db_changes(Args0, Req, Db0) -> #changes_args{ style = Style, filter = FilterName, feed = Feed, dir = Dir, since = Since - } = Args1, + } = Args0, Filter = configure_filter(FilterName, Style, Req, Db0), - Args = Args1#changes_args{filter_fun = Filter}, - % The type of changes feed depends on the supplied filter. If the query is - % for an optimized view-filtered db changes, we need to use the view - % sequence tree. - {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of - {{view, DDocName0, ViewName0}, _} -> - {true, DDocName0, ViewName0}; - {_, {fast_view, _, DDoc, ViewName0}} -> - {true, DDoc#doc.id, ViewName0}; - _ -> - {false, undefined, undefined} - end, + Args = Args0#changes_args{filter_fun = Filter}, DbName = couch_db:name(Db0), - {StartListenerFun, View} = if UseViewChanges -> - {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( - DbName, DDocName, ViewName, #mrargs{}), - case View0#mrview.seq_btree of - #btree{} -> - ok; - _ -> - throw({bad_request, "view changes not enabled"}) - end, - SNFun = fun() -> - couch_event:link_listener( - ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}] - ) - end, - {SNFun, View0}; - true -> - SNFun = fun() -> - couch_event:link_listener( - ?MODULE, handle_db_event, self(), [{dbname, DbName}] - ) - end, - {SNFun, undefined} + StartListenerFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_db_event, self(), [{dbname, DbName}] + ) end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), @@ -113,14 +76,7 @@ handle_changes(Args1, Req, Db0, Type) -> fwd -> Since end, - View2 = if UseViewChanges -> - {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view( - DbName, DDocName, ViewName, #mrargs{}), - View1; - true -> - undefined - end, - {Db, View2, StartSeq} + {Db, StartSeq} end, % begin timer to deal with heartbeat when filter function fails case Args#changes_args.heartbeat of @@ -136,12 +92,11 @@ handle_changes(Args1, Req, Db0, Type) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), {ok, Listener} = StartListenerFun(), - {Db, View, StartSeq} = Start(), + {Db, StartSeq} = Start(), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, - 
<<"">>, Timeout, TimeoutFun, DDocName, ViewName, - View), + <<"">>, Timeout, TimeoutFun), try keep_sending_changes( Args#changes_args{dir=fwd}, @@ -157,10 +112,10 @@ handle_changes(Args1, Req, Db0, Type) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), - {Db, View, StartSeq} = Start(), + {Db, StartSeq} = Start(), Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, - UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun, - DDocName, ViewName, View), + UserAcc2, Db, StartSeq, <<>>, + Timeout, TimeoutFun), {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = send_changes( Acc0, @@ -214,21 +169,12 @@ configure_filter("_view", Style, Req, Db) -> [DName, VName] -> {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), check_member_exists(DDoc, [<<"views">>, VName]), - FilterType = try - true = couch_util:get_nested_json_value( - DDoc#doc.body, - [<<"options">>, <<"seq_indexed">>] - ), - fast_view - catch _:_ -> - view - end, case couch_db:is_clustered(Db) of true -> DIR = fabric_util:doc_id_and_rev(DDoc), - {fetch, FilterType, Style, DIR, VName}; + {fetch, view, Style, DIR, VName}; false -> - {FilterType, Style, DDoc, VName} + {view, Style, DDoc, VName} end; [] -> Msg = "`view` must be of the form `designname/viewname`", @@ -285,8 +231,7 @@ filter(_Db, DocInfo, {design_docs, Style}) -> _ -> [] end; -filter(Db, DocInfo, {FilterType, Style, DDoc, VName}) - when FilterType == view; FilterType == fast_view -> +filter(Db, DocInfo, {view, Style, DDoc, VName}) -> Docs = open_revs(Db, DocInfo, Style), {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs), filter_revs(Passes, Docs); @@ -299,35 +244,6 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). -fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) -> - case couch_db:get_doc_info(Db, ID) of - {ok, #doc_info{high_seq=Seq}=DocInfo} -> - Docs = open_revs(Db, DocInfo, Style), - Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) -> - RevStr = couch_doc:rev_to_str({RevPos, RevId}), - {[{<<"rev">>, RevStr}]} - end, Docs), - {DocInfo, Changes}; - {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq -> - % If the view seq tree is out of date (or if the view seq tree - % was opened before the db) seqs may come by from the seq tree - % which correspond to the not-most-current revision of a document. - % The proper thing to do is to not send this old revision, but wait - % until we reopen the up-to-date view seq tree and continue the - % fold. - % I left the Seq > HighSeq guard in so if (for some godforsaken - % reason) the seq in the view is more current than the database, - % we'll throw an error. - {undefined, []}; - {error, not_found} -> - {undefined, []} - end. - - - -view_filter(Db, KV, {default, Style}) -> - apply_view_style(Db, KV, Style). - get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), @@ -425,16 +341,6 @@ apply_style(#doc_info{revs=Revs}, main_only) -> apply_style(#doc_info{revs=Revs}, all_docs) -> [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs]. 
-apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; -apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) -> - case couch_db:get_doc_info(Db, ID) of - {ok, DocInfo} -> - apply_style(DocInfo, all_docs); - {error, not_found} -> - [] - end. - open_revs(Db, DocInfo, Style) -> DocInfos = case Style of @@ -493,7 +399,7 @@ start_sending_changes(_Callback, UserAcc, ResponseType) start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). -build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) -> +build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ include_docs = IncludeDocs, doc_options = DocOpts, @@ -516,9 +422,6 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D conflicts = Conflicts, timeout = Timeout, timeout_fun = TimeoutFun, - ddoc_name = DDocName, - view_name = ViewName, - view = View, aggregation_results=[], aggregation_kvs=[] }. @@ -527,41 +430,15 @@ send_changes(Acc, Dir, FirstRound) -> #changes_acc{ db = Db, seq = StartSeq, - filter = Filter, - view = View - } = Acc, + filter = Filter + } = maybe_upgrade_changes_acc(Acc), DbEnumFun = fun changes_enumerator/2, case can_optimize(FirstRound, Filter) of {true, Fun} -> Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter); _ -> - case {View, Filter} of - {#mrview{}, {fast_view, _, _, _}} -> - couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc); - {undefined, _} -> - Opts = [{dir, Dir}], - couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts); - {#mrview{}, _} -> - ViewEnumFun = fun view_changes_enumerator/2, - {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc), - case Acc0 of - #changes_acc{aggregation_results=[]} -> - {Go, Acc0}; - _ -> - #changes_acc{ - aggregation_results = AggResults, - aggregation_kvs = AggKVs, - user_acc = UserAcc, - callback = Callback, - resp_type = ResponseType, - prepend = Prepend - } = Acc0, - ChangesRow = view_changes_row(AggResults, AggKVs, Acc0), - UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc0#changes_acc{user_acc=UserAcc0}} - end - end + Opts = [{dir, Dir}], + couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts) end. @@ -653,9 +530,8 @@ keep_sending_changes(Args, Acc0, FirstRound) -> #changes_acc{ db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, - prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit, - ddoc_name = DDocName, view_name = ViewName - } = ChangesAcc, + prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit + } = maybe_upgrade_changes_acc(ChangesAcc), couch_db:close(Db), if Limit > NewLimit, ResponseType == "longpoll" -> @@ -670,7 +546,6 @@ keep_sending_changes(Args, Acc0, FirstRound) -> Args#changes_args{limit=NewLimit}, ChangesAcc#changes_acc{ db = Db2, - view = maybe_refresh_view(Db2, DDocName, ViewName), user_acc = UserAcc4, seq = EndSeq, prepend = Prepend2, @@ -685,104 +560,22 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end end. -maybe_refresh_view(_, undefined, undefined) -> - undefined; -maybe_refresh_view(Db, DDocName, ViewName) -> - DbName = couch_db:name(Db), - {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}), - View. 
- end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -view_changes_enumerator(Value, Acc) -> - #changes_acc{ - filter = Filter, callback = Callback, prepend = Prepend, - user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq, - aggregation_kvs=AggKVs, aggregation_results=AggResults - } = Acc, - - Results0 = view_filter(Db, Value, Filter), - Results = [Result || Result <- Results0, Result /= null], - {{Seq, _}, _} = Value, - - Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, - - if CurrentSeq =:= Seq -> - NewAggKVs = case Results of - [] -> AggKVs; - _ -> [Value|AggKVs] - end, - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), - Acc0 = Acc#changes_acc{ - seq = Seq, - user_acc = UserAcc2, - aggregation_kvs=NewAggKVs - }, - case Done of - stop -> {stop, Acc0}; - ok -> {Go, Acc0} - end; - AggResults =/= [] -> - {NewAggKVs, NewAggResults} = case Results of - [] -> {[], []}; - _ -> {[Value], Results} - end, - if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> - ChangesRow = view_changes_row(AggResults, AggKVs, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{ - seq = Seq, user_acc = UserAcc2, limit = Limit - 1, - aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}}; - true -> - ChangesRow = view_changes_row(AggResults, AggKVs, Acc), - UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{ - seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2, - limit = Limit - 1, aggregation_kvs=[Value], - aggregation_results=Results}} - end; - true -> - {NewAggKVs, NewAggResults} = case Results of - [] -> {[], []}; - _ -> {[Value], Results} - end, - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), - Acc0 = Acc#changes_acc{ - seq = Seq, - user_acc = UserAcc2, - aggregation_kvs=NewAggKVs, - aggregation_results=NewAggResults - }, - case Done of - stop -> {stop, Acc0}; - ok -> {Go, Acc0} - end - end. 
- -changes_enumerator(Value0, Acc) -> +changes_enumerator(Value, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, timeout = Timeout, timeout_fun = TimeoutFun - } = Acc, - {Value, Results0} = case Filter of - {fast_view, _, _, _} -> - fast_view_filter(Db, Value0, Filter); - _ -> - {Value0, filter(Db, Value0, Filter)} - end, + } = maybe_upgrade_changes_acc(Acc), + Results0 = filter(Db, Value, Filter), Results = [Result || Result <- Results0, Result /= null], Seq = case Value of #full_doc_info{} -> Value#full_doc_info.update_seq; #doc_info{} -> - Value#doc_info.high_seq; - {{Seq0, _}, _} -> - Seq0 + Value#doc_info.high_seq end, Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of @@ -812,35 +605,10 @@ changes_enumerator(Value0, Acc) -> -view_changes_row(Results, KVs, Acc) -> - {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) -> - {{_Seq, Key}, {_Id, Value, _Rev}} = Row, - case Value of - removed -> - {AddAcc, [Key|RemAcc]}; - {dups, DupValues} -> - AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) -> - [[Key, DupValue]|AddAcc0] - end, AddAcc, DupValues), - {AddAcc1, RemAcc}; - _ -> - {[[Key, Value]|AddAcc], RemAcc} - end - end, {[], []}, KVs), - - % Seq, Id, and Rev should be the same for all KVs, since we're aggregating - % by seq. - [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs, - - {[ - {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add}, - {<<"remove">>, Remove}, {<<"changes">>, Results} - ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}. - - changes_row(Results, #full_doc_info{} = FDI, Acc) -> changes_row(Results, couch_doc:to_doc_info(FDI), Acc); -changes_row(Results, DocInfo, Acc) -> +changes_row(Results, DocInfo, Acc0) -> + Acc = maybe_upgrade_changes_acc(Acc0), #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo, @@ -932,3 +700,25 @@ maybe_heartbeat(Timeout, TimeoutFun, Acc) -> {ok, Acc} end end. + + +maybe_upgrade_changes_acc(#changes_acc{} = Acc) -> + Acc; +maybe_upgrade_changes_acc(Acc) when tuple_size(Acc) == 19 -> + #changes_acc{ + db = element(2, Acc), + seq = element(6, Acc), + prepend = element(7, Acc), + filter = element(8, Acc), + callback = element(9, Acc), + user_acc = element(10, Acc), + resp_type = element(11, Acc), + limit = element(12, Acc), + include_docs = element(13, Acc), + doc_options = element(14, Acc), + conflicts = element(15, Acc), + timeout = element(16, Acc), + timeout_fun = element(17, Acc), + aggregation_kvs = element(18, Acc), + aggregation_results = element(19, Acc) + }. diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index 7c0f9679b..dad363695 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -257,7 +257,7 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) -> ok = scan_local_db(Server, DbSuffix), {ok, Db} = mem3_util:ensure_exists( config:get("mem3", "shards_db", "_dbs")), - ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil), + ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db), ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}), couch_db:close(Db). 
@@ -383,7 +383,7 @@ setup() -> meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"), meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}), ChangesFun = meck:val(fun(_) -> ok end), - meck:expect(couch_changes, handle_changes, 4, ChangesFun), + meck:expect(couch_changes, handle_db_changes, 3, ChangesFun), meck:expect(couch_db, open_int, fun(?DBNAME, [?CTX, sys_db]) -> {ok, db}; (_, _) -> {not_found, no_db_file} diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl index 10cd56cee..848b471f9 100644 --- a/src/couch/test/eunit/couch_changes_tests.erl +++ b/src/couch/test/eunit/couch_changes_tests.erl @@ -154,7 +154,6 @@ filter_by_view() -> fun setup/0, fun teardown/1, [ fun should_filter_by_view/1, - fun should_filter_by_fast_view/1, fun should_filter_by_erlang_view/1 ] } @@ -698,43 +697,6 @@ should_filter_by_view({DbName, _}) -> ?assertEqual(UpSeq, LastSeq) end). -should_filter_by_fast_view({DbName, _}) -> - ?_test( - begin - DDocId = <<"_design/app">>, - DDoc = couch_doc:from_json_obj({[ - {<<"_id">>, DDocId}, - {<<"language">>, <<"javascript">>}, - {<<"options">>, {[{<<"seq_indexed">>, true}]}}, - {<<"views">>, {[ - {<<"valid">>, {[ - {<<"map">>, <<"function(doc) {" - " if (doc._id == 'doc3') {" - " emit(doc); " - "} }">>} - ]}} - ]}} - ]}), - ChArgs = #changes_args{filter = "_view"}, - Req = {json_req, {[{ - <<"query">>, {[ - {<<"view">>, <<"app/valid">>} - ]} - }]}}, - ok = update_ddoc(DbName, DDoc), - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - {ok, Db} = couch_db:open_int(DbName, []), - {ok, ViewInfo} = couch_mrview:get_view_info(Db, DDoc, <<"valid">>), - {update_seq, ViewUpSeq} = lists:keyfind(update_seq, 1, ViewInfo), - couch_db:close(Db), - ?assertEqual(1, length(Rows)), - [#row{seq = Seq, id = Id}] = Rows, - ?assertEqual(<<"doc3">>, Id), - ?assertEqual(6, Seq), - ?assertEqual(LastSeq, Seq), - ?assertEqual(UpSeq, ViewUpSeq) - end). 
- should_filter_by_erlang_view({DbName, _}) -> ?_test( begin diff --git a/src/couch/test/eunit/couchdb_views_tests.erl b/src/couch/test/eunit/couchdb_views_tests.erl index d1aba8fd5..06e2f03eb 100644 --- a/src/couch/test/eunit/couchdb_views_tests.erl +++ b/src/couch/test/eunit/couchdb_views_tests.erl @@ -42,13 +42,14 @@ setup_legacy() -> DbName = <<"test">>, DbFileName = "test.couch", OldDbFilePath = filename:join([?FIXTURESDIR, DbFileName]), - OldViewName = "3b835456c235b1827e012e25666152f3.view", + OldViewName = "6cf2c2f766f87b618edf6630b00f8736.view", FixtureViewFilePath = filename:join([?FIXTURESDIR, OldViewName]), - NewViewName = "6cf2c2f766f87b618edf6630b00f8736.view", + NewViewName = "a1c5929f912aca32f13446122cc6ce50.view", DbDir = config:get("couchdb", "database_dir"), ViewDir = config:get("couchdb", "view_index_dir"), - OldViewFilePath = filename:join([ViewDir, ".test_design", OldViewName]), + OldViewFilePath = filename:join([ViewDir, ".test_design", "mrview", + OldViewName]), NewViewFilePath = filename:join([ViewDir, ".test_design", "mrview", NewViewName]), @@ -192,28 +193,31 @@ should_upgrade_legacy_view_files({DbName, Files}) -> % ensure old header OldHeader = read_header(OldViewFilePath), - ?assertMatch(#index_header{}, OldHeader), + ?assertEqual(6, tuple_size(OldHeader)), + ?assertMatch(mrheader, element(1, OldHeader)), % query view for expected results Rows0 = query_view(DbName, "test", "test"), - ?assertEqual(2, length(Rows0)), + ?assertEqual(3, length(Rows0)), % ensure old file gone ?assertNot(filelib:is_regular(OldViewFilePath)), % add doc to trigger update - DocUrl = db_url(DbName) ++ "/boo", + DocUrl = db_url(DbName) ++ "/bar", {ok, _, _, _} = test_request:put( - DocUrl, [{"Content-Type", "application/json"}], <<"{\"a\":3}">>), + DocUrl, [{"Content-Type", "application/json"}], <<"{\"a\":4}">>), % query view for expected results Rows1 = query_view(DbName, "test", "test"), - ?assertEqual(3, length(Rows1)), + ?assertEqual(4, length(Rows1)), % ensure new header timer:sleep(2000), % have to wait for awhile to upgrade the index NewHeader = read_header(NewViewFilePath), - ?assertMatch(#mrheader{}, NewHeader) + ?assertMatch(#mrheader{}, NewHeader), + NewViewStatus = hd(NewHeader#mrheader.view_states), + ?assertEqual(3, tuple_size(NewViewStatus)) end). diff --git a/src/couch/test/eunit/fixtures/3b835456c235b1827e012e25666152f3.view b/src/couch/test/eunit/fixtures/6cf2c2f766f87b618edf6630b00f8736.view Binary files differindex 9c67648be..a5668eeaa 100644 --- a/src/couch/test/eunit/fixtures/3b835456c235b1827e012e25666152f3.view +++ b/src/couch/test/eunit/fixtures/6cf2c2f766f87b618edf6630b00f8736.view diff --git a/src/couch/test/eunit/fixtures/test.couch b/src/couch/test/eunit/fixtures/test.couch Binary files differindex 32c79af32..5347a222f 100644 --- a/src/couch/test/eunit/fixtures/test.couch +++ b/src/couch/test/eunit/fixtures/test.couch diff --git a/src/couch_mrview/include/couch_mrview.hrl b/src/couch_mrview/include/couch_mrview.hrl index 29fe52bdb..bb0ab0b46 100644 --- a/src/couch_mrview/include/couch_mrview.hrl +++ b/src/couch_mrview/include/couch_mrview.hrl @@ -18,13 +18,10 @@ idx_name, language, design_opts=[], - seq_indexed=false, - keyseq_indexed=false, partitioned=false, lib, views, id_btree=nil, - log_btree=nil, update_seq=0, purge_seq=0, first_build, @@ -44,10 +41,6 @@ reduce_funs=[], def, btree=nil, - seq_btree=nil, - key_byseq_btree=nil, - seq_indexed=false, - keyseq_indexed=false, options=[] }). 
@@ -56,7 +49,6 @@ seq=0, purge_seq=0, id_btree_state=nil, - log_btree_state=nil, view_states=nil }). diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index d549568ec..1cdc91809 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -15,9 +15,6 @@ -export([validate/2]). -export([query_all_docs/2, query_all_docs/4]). -export([query_view/3, query_view/4, query_view/6, get_view_index_pid/4]). --export([view_changes_since/5]). --export([view_changes_since/6, view_changes_since/7]). --export([count_view_changes_since/4, count_view_changes_since/5]). -export([get_info/2]). -export([trigger_update/2, trigger_update/3]). -export([get_view_info/3]). @@ -288,68 +285,6 @@ query_view(Db, {Type, View, Ref}, Args, Callback, Acc) -> erlang:demonitor(Ref, [flush]) end. -view_changes_since(View, StartSeq, Fun, Opts0, Acc) -> - Wrapper = fun(KV, _, Acc1) -> - Fun(KV, Acc1) - end, - Opts = [{start_key, {StartSeq + 1, <<>>}}] ++ Opts0, - {ok, _LastRed, AccOut} = couch_btree:fold(View#mrview.seq_btree, Wrapper, Acc, Opts), - {ok, AccOut}. - -view_changes_since(Db, DDoc, VName, StartSeq, Fun, Acc) -> - view_changes_since(Db, DDoc, VName, StartSeq, Fun, [], Acc). - -view_changes_since(Db, DDoc, VName, StartSeq, Fun, Options, Acc) -> - Args0 = make_view_changes_args(Options), - {ok, {_, View, _}, _, Args} = couch_mrview_util:get_view(Db, DDoc, VName, - Args0), - #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View, - IsKSQuery = is_key_byseq(Options), - if (SIndexed andalso not IsKSQuery) orelse (KSIndexed andalso IsKSQuery) -> - OptList = make_view_changes_opts(StartSeq, Options, Args), - Btree = case IsKSQuery of - true -> View#mrview.key_byseq_btree; - _ -> View#mrview.seq_btree - end, - AccOut = lists:foldl(fun(Opts, Acc0) -> - {ok, _R, A} = couch_mrview_util:fold_changes( - Btree, Fun, Acc0, Opts), - A - end, Acc, OptList), - {ok, AccOut}; - true -> - {error, seqs_not_indexed} - end. - -count_view_changes_since(Db, DDoc, VName, SinceSeq) -> - count_view_changes_since(Db, DDoc, VName, SinceSeq, []). - -count_view_changes_since(Db, DDoc, VName, SinceSeq, Options) -> - Args0 = make_view_changes_args(Options), - {ok, {_Type, View, _Ref}, _, Args} = couch_mrview_util:get_view( - Db, DDoc, VName, Args0), - case View#mrview.seq_indexed of - true -> - OptList = make_view_changes_opts(SinceSeq, Options, Args), - Btree = case is_key_byseq(Options) of - true -> View#mrview.key_byseq_btree; - _ -> View#mrview.seq_btree - end, - RedFun = fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(Btree, PartialReds)} - end, - lists:foldl(fun(Opts, Acc0) -> - case couch_btree:fold_reduce(Btree, RedFun, 0, Opts) of - {ok, N} when is_integer(N) -> - Acc0 + N; - {ok, N} when is_tuple(N) -> - Acc0 + element(1, N) - end - end, 0, OptList); - _ -> - {error, seqs_not_indexed} - end. 
- get_info(Db, DDoc) -> {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), @@ -371,19 +306,9 @@ get_view_info(Db, DDoc, VName) -> %% get the total number of rows {ok, TotalRows} = couch_mrview_util:get_row_count(View), - %% get the total number of sequence logged in this view - SeqBtree = View#mrview.seq_btree, - {ok, TotalSeqs} = case SeqBtree of - nil -> {ok, 0}; - _ -> - couch_btree:full_reduce(SeqBtree) - end, - - {ok, [{seq_indexed, View#mrview.seq_indexed}, - {update_seq, View#mrview.update_seq}, + {ok, [{update_seq, View#mrview.update_seq}, {purge_seq, View#mrview.purge_seq}, - {total_rows, TotalRows}, - {total_seqs, TotalSeqs}]}. + {total_rows, TotalRows}]}. %% @doc refresh a view index @@ -774,26 +699,3 @@ lookup_index(Key) -> record_info(fields, mrargs), lists:seq(2, record_info(size, mrargs)) ), couch_util:get_value(Key, Index). - - -is_key_byseq(Options) -> - lists:any(fun({K, _}) -> - lists:member(K, [start_key, end_key, start_key_docid, - end_key_docid, keys]) - end, Options). - -make_view_changes_args(Options) -> - case is_key_byseq(Options) of - true -> - to_mrargs(Options); - false -> - #mrargs{} - end. - -make_view_changes_opts(StartSeq, Options, Args) -> - case is_key_byseq(Options) of - true -> - couch_mrview_util:changes_key_opts(StartSeq, Args); - false -> - [[{start_key, {StartSeq+1, <<>>}}] ++ Options] - end. diff --git a/src/couch_mrview/src/couch_mrview_changes.erl b/src/couch_mrview/src/couch_mrview_changes.erl deleted file mode 100644 index ae5aa6e94..000000000 --- a/src/couch_mrview/src/couch_mrview_changes.erl +++ /dev/null @@ -1,18 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. -% --module(couch_mrview_changes). - --export([handle_view_changes/5]). - -handle_view_changes(Args, Req, Db, DDocId, ViewName) -> - couch_changes:handle_changes(Args, Req, Db, {view, DDocId, ViewName}). 
diff --git a/src/couch_mrview/src/couch_mrview_compactor.erl b/src/couch_mrview/src/couch_mrview_compactor.erl index 9a069cec0..17d67f192 100644 --- a/src/couch_mrview/src/couch_mrview_compactor.erl +++ b/src/couch_mrview/src/couch_mrview_compactor.erl @@ -41,9 +41,6 @@ compact(State) -> sig=Sig, update_seq=Seq, id_btree=IdBtree, - log_btree=LogBtree, - seq_indexed=SeqIndexed, - keyseq_indexed=KeySeqIndexed, views=Views } = State, erlang:put(io_priority, {view_compact, DbName, IdxName}), @@ -60,27 +57,15 @@ compact(State) -> #mrst{ id_btree = EmptyIdBtree, - log_btree = EmptyLogBtree, views = EmptyViews } = EmptyState, - TotalChanges0 = case SeqIndexed orelse KeySeqIndexed of - true -> NumDocIds * 2; - _ -> NumDocIds - end, - TotalChanges = lists:foldl( fun(View, Acc) -> {ok, Kvs} = couch_mrview_util:get_row_count(View), - case SeqIndexed orelse KeySeqIndexed of - true -> - {ok, SKvs} = couch_mrview_util:get_view_changes_count(View), - Acc + Kvs + SKvs; - false -> - Acc + Kvs - end + Acc + Kvs end, - TotalChanges0, Views), + NumDocIds, Views), couch_task_status:add_task([ {type, view_compaction}, @@ -126,24 +111,13 @@ compact(State) -> FinalAcc2 = update_task(FinalAcc, length(Uncopied)), - {NewLogBtree, FinalAcc3} = case SeqIndexed of - true -> - compact_log(LogBtree, BufferSize, - FinalAcc2#acc{kvs=[], - kvs_size=0, - btree=EmptyLogBtree}); - _ -> - {nil, FinalAcc2} - end, - {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) -> compact_view(View, EmptyView, BufferSize, Acc) - end, FinalAcc3, lists:zip(Views, EmptyViews)), + end, FinalAcc2, lists:zip(Views, EmptyViews)), unlink(EmptyState#mrst.fd), {ok, EmptyState#mrst{ id_btree=NewIdBtree, - log_btree=NewLogBtree, views=NewViews, update_seq=Seq }}. @@ -186,58 +160,16 @@ recompact_retry_count() -> ). -compact_log(LogBtree, BufferSize, Acc0) -> - FoldFun = fun(KV, Acc) -> - #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc, - KvsSize2 = KvsSize + ?term_size(KV), - case KvsSize2 >= BufferSize of - true -> - {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), - Acc2 = update_task(Acc, 1 + length(Kvs)), - {ok, Acc2#acc{ - btree = Bt2, kvs = [], kvs_size = 0}}; - _ -> - {ok, Acc#acc{ - kvs = [KV | Kvs], kvs_size = KvsSize2}} - end - end, - - {ok, _, FinalAcc} = couch_btree:foldl(LogBtree, FoldFun, Acc0), - #acc{btree = Bt3, kvs = Uncopied} = FinalAcc, - {ok, NewLogBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), - FinalAcc2 = update_task(FinalAcc, length(Uncopied)), - {NewLogBtree, FinalAcc2}. - %% @spec compact_view(View, EmptyView, Retry, Acc) -> {CompactView, NewAcc} compact_view(#mrview{id_num=VID}=View, EmptyView, BufferSize, Acc0) -> - {NewBt, Acc1} = compact_view_btree(View#mrview.btree, + {NewBt, FinalAcc} = compact_view_btree(View#mrview.btree, EmptyView#mrview.btree, VID, BufferSize, Acc0), - %% are we indexing changes by sequences? - {NewSeqBt, Acc2} = case View#mrview.seq_indexed of - true -> - compact_view_btree(View#mrview.seq_btree, - EmptyView#mrview.seq_btree, - VID, BufferSize, Acc1); - _ -> - {nil, Acc1} - end, - {NewKeyBySeqBt, FinalAcc} = case View#mrview.keyseq_indexed of - true -> - compact_view_btree(View#mrview.key_byseq_btree, - EmptyView#mrview.key_byseq_btree, - VID, BufferSize, Acc2); - _ -> - {nil, Acc2} - end, - {EmptyView#mrview{btree=NewBt, - seq_btree=NewSeqBt, update_seq=View#mrview.update_seq, - purge_seq=View#mrview.purge_seq, - key_byseq_btree=NewKeyBySeqBt}, FinalAcc}. + purge_seq=View#mrview.purge_seq}, FinalAcc}. 
compact_view_btree(Btree, EmptyBtree, VID, BufferSize, Acc0) -> Fun = fun(KV, #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc) -> @@ -302,7 +234,7 @@ swap_compacted(OldState, NewState) -> unlink(OldState#mrst.fd), erlang:demonitor(OldState#mrst.fd_monitor, [flush]), - + {ok, NewState#mrst{fd_monitor=Ref}}. diff --git a/src/couch_mrview/src/couch_mrview_http.erl b/src/couch_mrview/src/couch_mrview_http.erl index cdf498e5d..74d5ca274 100644 --- a/src/couch_mrview/src/couch_mrview_http.erl +++ b/src/couch_mrview/src/couch_mrview_http.erl @@ -16,7 +16,6 @@ handle_all_docs_req/2, handle_local_docs_req/2, handle_design_docs_req/2, - handle_view_changes_req/3, handle_reindex_req/3, handle_view_req/3, handle_temp_view_req/2, @@ -81,25 +80,6 @@ handle_reindex_req(Req, _Db, _DDoc) -> chttpd:send_method_not_allowed(Req, "POST"). -handle_view_changes_req(#httpd{path_parts=[_,<<"_design">>,DDocName,<<"_view_changes">>,ViewName]}=Req, Db, DDoc) -> - {DDocBody} = DDoc#doc.body, - case lists:keyfind(<<"options">>, 1, DDocBody) of - {<<"options">>, {Options}} when is_list(Options) -> - case lists:keyfind(<<"seq_indexed">>, 1, Options) of - {<<"seq_indexed">>, true} -> - ok; - _ -> - throw({bad_request, "view changes not enabled"}) - end; - _ -> - throw({bad_request, "view changes not enabled"}) - end, - - ChangesArgs = couch_httpd_db:parse_changes_query(Req, Db), - ChangesFun = couch_mrview_changes:handle_view_changes(ChangesArgs, Req, Db, <<"_design/", DDocName/binary>>, ViewName), - couch_httpd_db:handle_changes_req(Req, Db, ChangesArgs, ChangesFun). - - handle_view_req(#httpd{method='GET', path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req, Db, _DDoc) -> diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 91703bd4d..8542cc63f 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -38,13 +38,9 @@ get(purge_seq, #mrst{purge_seq = PurgeSeq}) -> get(update_options, #mrst{design_opts = Opts}) -> IncDesign = couch_util:get_value(<<"include_design">>, Opts, false), LocalSeq = couch_util:get_value(<<"local_seq">>, Opts, false), - SeqIndexed = couch_util:get_value(<<"seq_indexed">>, Opts, false), - KeySeqIndexed = couch_util:get_value(<<"keyseq_indexed">>, Opts, false), Partitioned = couch_util:get_value(<<"partitioned">>, Opts, false), if IncDesign -> [include_design]; true -> [] end ++ if LocalSeq -> [local_seq]; true -> [] end - ++ if KeySeqIndexed -> [keyseq_indexed]; true -> [] end - ++ if SeqIndexed -> [seq_indexed]; true -> [] end ++ if Partitioned -> [partitioned]; true -> [] end; get(fd, #mrst{fd = Fd}) -> Fd; @@ -57,7 +53,6 @@ get(info, State) -> fd = Fd, sig = Sig, id_btree = IdBtree, - log_btree = LogBtree, language = Lang, update_seq = UpdateSeq, purge_seq = PurgeSeq, @@ -66,13 +61,7 @@ get(info, State) -> {ok, FileSize} = couch_file:bytes(Fd), {ok, ExternalSize} = couch_mrview_util:calculate_external_size(Views), {ok, ActiveViewSize} = couch_mrview_util:calculate_active_size(Views), - LogBtSize = case LogBtree of - nil -> - 0; - _ -> - couch_btree:size(LogBtree) - end, - ActiveSize = couch_btree:size(IdBtree) + LogBtSize + ActiveViewSize, + ActiveSize = couch_btree:size(IdBtree) + ActiveViewSize, UpdateOptions0 = get(update_options, State), UpdateOptions = [atom_to_binary(O, latin1) || O <- UpdateOptions0], @@ -105,15 +94,19 @@ open(Db, State0) -> } = State = set_partitioned(Db, State0), IndexFName = couch_mrview_util:index_file(DbName, Sig), - % If we are upgrading from 
<=1.2.x, we upgrade the view + % If we are upgrading from <= 2.x, we upgrade the view % index file on the fly, avoiding an index reset. + % We are making commit with a new state + % right after the upgrade to ensure + % that we have a proper sig in the header + % when open the view next time % % OldSig is `ok` if no upgrade happened. % - % To remove suppport for 1.2.x auto-upgrades in the + % To remove support for 2.x auto-upgrades in the % future, just remove the next line and the code - % between "upgrade code for <= 1.2.x" and - % "end upgrade code for <= 1.2.x" and the corresponding + % between "upgrade code for <= 2.x" and + % "end of upgrade code for <= 2.x" and the corresponding % code in couch_mrview_util OldSig = couch_mrview_util:maybe_update_index_file(State), @@ -121,13 +114,14 @@ open(Db, State0) -> case couch_mrview_util:open_file(IndexFName) of {ok, Fd} -> case couch_file:read_header(Fd) of - % upgrade code for <= 1.2.x + % upgrade code for <= 2.x {ok, {OldSig, Header}} -> % Matching view signatures. NewSt = couch_mrview_util:init_state(Db, Fd, State, Header), + ok = commit(NewSt), ensure_local_purge_doc(Db, NewSt), {ok, NewSt}; - % end of upgrade code for <= 1.2.x + % end of upgrade code for <= 2.x {ok, {Sig, Header}} -> % Matching view signatures. NewSt = couch_mrview_util:init_state(Db, Fd, State, Header), diff --git a/src/couch_mrview/src/couch_mrview_test_util.erl b/src/couch_mrview/src/couch_mrview_test_util.erl index a6242cde7..2dfa64e61 100644 --- a/src/couch_mrview/src/couch_mrview_test_util.erl +++ b/src/couch_mrview/src/couch_mrview_test_util.erl @@ -59,30 +59,6 @@ make_docs(_, Since, Count) -> [doc(I) || I <- lists:seq(Since, Count)]. -ddoc({changes, Opts}) -> - ViewOpts = case Opts of - seq_indexed -> - [{<<"seq_indexed">>, true}]; - keyseq_indexed -> - [{<<"keyseq_indexed">>, true}]; - seq_indexed_keyseq_indexed -> - [ - {<<"seq_indexed">>, true}, - {<<"keyseq_indexed">>, true} - ] - end, - couch_doc:from_json_obj({[ - {<<"_id">>, <<"_design/bar">>}, - {<<"options">>, {ViewOpts}}, - {<<"views">>, {[ - {<<"baz">>, {[ - { - <<"map">>, - <<"function(doc) {emit(doc.val.toString(), doc.val);}">> - } - ]}} - ]}} - ]}); ddoc(map) -> couch_doc:from_json_obj({[ {<<"_id">>, <<"_design/bar">>}, diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 09b0d27ac..522367c1d 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -68,21 +68,12 @@ start_update(Partial, State, NumChanges, NumChangesDone) -> purge(_Db, PurgeSeq, PurgedIdRevs, State) -> #mrst{ id_btree=IdBtree, - log_btree=LogBtree, views=Views, partitioned=Partitioned } = State, Ids = [Id || {Id, _Revs} <- PurgedIdRevs], - {ok, Lookups, LLookups, LogBtree2, IdBtree2} = case LogBtree of - nil -> - {ok, L, Bt} = couch_btree:query_modify(IdBtree, Ids, [], Ids), - {ok, L, [], nil, Bt}; - _ -> - {ok, L, Bt} = couch_btree:query_modify(IdBtree, Ids, [], Ids), - {ok, LL, LBt} = couch_btree:query_modify(LogBtree, Ids, [], Ids), - {ok, L, LL, LBt, Bt} - end, + {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), MakeDictFun = fun ({ok, {DocId, ViewNumRowKeys}}, DictAcc) -> @@ -101,48 +92,20 @@ purge(_Db, PurgeSeq, PurgedIdRevs, State) -> DictAcc end, KeysToRemove = lists:foldl(MakeDictFun, dict:new(), Lookups), - SeqsToRemove = lists:foldl(MakeDictFun, dict:new(), LLookups), RemKeysFun = fun(#mrview{id_num=ViewId}=View) -> - #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View, 
ToRem = couch_util:dict_find(ViewId, KeysToRemove, []), {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, [], ToRem), NewPurgeSeq = case VBtree2 =/= View#mrview.btree of true -> PurgeSeq; _ -> View#mrview.purge_seq end, - {SeqBtree3, KeyBySeqBtree3} = if SIndexed orelse KSIndexed -> - SToRem = couch_util:dict_find(ViewId, SeqsToRemove, []), - {ok, SeqBtree2} = if SIndexed -> - SKs = [{Seq, Key} || {Key, Seq, _} <- SToRem], - couch_btree:add_remove(View#mrview.seq_btree, - [], SKs); - true -> - {ok, nil} - end, - {ok, KeyBySeqBtree2} = if KSIndexed -> - KSs = [{[Seq, Key], DocId} || {Key, Seq, DocId} <- SToRem], - couch_btree:add_remove(View#mrview.key_byseq_btree, - [], KSs); - true -> - {ok, nil} - end, - {SeqBtree2, KeyBySeqBtree2}; - true -> - {nil, nil} - end, - - View#mrview{btree=VBtree2, - seq_btree=SeqBtree3, - key_byseq_btree=KeyBySeqBtree3, - purge_seq=NewPurgeSeq} - + View#mrview{btree=VBtree2, purge_seq=NewPurgeSeq} end, Views2 = lists:map(RemKeysFun, Views), {ok, State#mrst{ id_btree=IdBtree2, - log_btree=LogBtree2, views=Views2, purge_seq=PurgeSeq }}. @@ -152,18 +115,12 @@ process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 -> couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc)), process_doc(Doc, Seq, State#mrst{doc_acc=[]}); process_doc(nil, Seq, #mrst{doc_acc=Acc}=State) -> - {ok, State#mrst{doc_acc=[{nil, Seq, nil, nil} | Acc]}}; -process_doc(#doc{id=Id, deleted=true}=Doc, Seq, #mrst{doc_acc=Acc}=State) -> - Rev= extract_rev(Doc#doc.revs), - {ok, State#mrst{doc_acc=[{Id, Seq, Rev, deleted} | Acc]}}; + {ok, State#mrst{doc_acc=[{nil, Seq, nil} | Acc]}}; +process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc}=State) -> + {ok, State#mrst{doc_acc=[{Id, Seq, deleted} | Acc]}}; process_doc(#doc{id=Id}=Doc, Seq, #mrst{doc_acc=Acc}=State) -> - Rev = extract_rev(Doc#doc.revs), - {ok, State#mrst{doc_acc=[{Id, Seq, Rev, Doc} | Acc]}}. + {ok, State#mrst{doc_acc=[{Id, Seq, Doc} | Acc]}}. -extract_rev({0, []}) -> - {0, []}; -extract_rev({RevPos, [Rev | _]}) -> - {RevPos, Rev}. finish_update(#mrst{doc_acc=Acc}=State) -> if Acc /= [] -> @@ -199,14 +156,14 @@ map_docs(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State0) -> end, QServer = State1#mrst.qserver, DocFun = fun - ({nil, Seq, _, _}, {SeqAcc, Results}) -> + ({nil, Seq, _}, {SeqAcc, Results}) -> {erlang:max(Seq, SeqAcc), Results}; - ({Id, Seq, Rev, deleted}, {SeqAcc, Results}) -> - {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]}; - ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) -> + ({Id, Seq, deleted}, {SeqAcc, Results}) -> + {erlang:max(Seq, SeqAcc), [{Id, []} | Results]}; + ({Id, Seq, Doc}, {SeqAcc, Results}) -> couch_stats:increment_counter([couchdb, mrview, map_doc]), {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc), - {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]} + {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]} end, FoldFun = fun(Docs, Acc) -> update_task(length(Docs)), @@ -222,8 +179,8 @@ write_results(Parent, #mrst{} = State) -> case accumulate_writes(State, State#mrst.write_queue, nil) of stop -> Parent ! {new_state, State}; - {Go, {Seq, ViewKVs, DocIdKeys, Seqs, Log}} -> - NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Seqs, Log), + {Go, {Seq, ViewKVs, DocIdKeys}} -> + NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys), if Go == stop -> Parent ! 
{new_state, NewState}; true -> @@ -245,17 +202,17 @@ start_query_server(State) -> accumulate_writes(State, W, Acc0) -> - {Seq, ViewKVs, DocIdKVs, Seqs, Log} = case Acc0 of - nil -> {0, [{V#mrview.id_num, {[], []}} || V <- State#mrst.views], [], dict:new(), dict:new()}; + {Seq, ViewKVs, DocIdKVs} = case Acc0 of + nil -> {0, [{V#mrview.id_num, []} || V <- State#mrst.views], []}; _ -> Acc0 end, case couch_work_queue:dequeue(W) of closed when Seq == 0 -> stop; closed -> - {stop, {Seq, ViewKVs, DocIdKVs, Seqs, Log}}; + {stop, {Seq, ViewKVs, DocIdKVs}}; {ok, Info} -> - {_, _, NewIds, _, _} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs, Seqs, Log), + {_, _, NewIds} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs), case accumulate_more(length(NewIds), Acc) of true -> accumulate_writes(State, W, Acc); false -> {ok, Acc} @@ -272,77 +229,60 @@ accumulate_more(NumDocIds, Acc) -> andalso CurrMem < list_to_integer(MinSize). -merge_results([], SeqAcc, ViewKVs, DocIdKeys, Seqs, Log) -> - {SeqAcc, ViewKVs, DocIdKeys, Seqs, Log}; -merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys, Seqs, Log) -> - Fun = fun(RawResults, {VKV, DIK, Seqs2, Log2}) -> - merge_results(RawResults, VKV, DIK, Seqs2, Log2) +merge_results([], SeqAcc, ViewKVs, DocIdKeys) -> + {SeqAcc, ViewKVs, DocIdKeys}; +merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) -> + Fun = fun(RawResults, {VKV, DIK}) -> + merge_results(RawResults, VKV, DIK) end, - {ViewKVs1, DocIdKeys1, Seqs1, Log1} = lists:foldl(Fun, {ViewKVs, DocIdKeys, Seqs, Log}, Results), - merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1, Seqs1, Log1). + {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results), + merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1). -merge_results({DocId, Seq, Rev, []}, ViewKVs, DocIdKeys, Seqs, Log) -> - {ViewKVs, [{DocId, []} | DocIdKeys], dict:store(DocId, Seq, Seqs), dict:store({DocId, Rev}, [], Log)}; -merge_results({DocId, Seq, Rev, RawResults}, ViewKVs, DocIdKeys, Seqs, Log) -> +merge_results({DocId, []}, ViewKVs, DocIdKeys) -> + {ViewKVs, [{DocId, []} | DocIdKeys]}; +merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) -> JsonResults = couch_query_servers:raw_to_ejson(RawResults), Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults], case lists:flatten(Results) of [] -> - {ViewKVs, [{DocId, []} | DocIdKeys], dict:store(DocId, Seq, Seqs), dict:store({DocId, Rev}, [], Log)}; + {ViewKVs, [{DocId, []} | DocIdKeys]}; _ -> - {ViewKVs1, ViewIdKeys, Log1} = insert_results(DocId, Seq, Rev, Results, ViewKVs, [], [], Log), - {ViewKVs1, [ViewIdKeys | DocIdKeys], dict:store(DocId, Seq, Seqs), Log1} + {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []), + {ViewKVs1, [ViewIdKeys | DocIdKeys]} end. 
-insert_results(DocId, _Seq, _Rev, [], [], ViewKVs, ViewIdKeys, Log) -> - {lists:reverse(ViewKVs), {DocId, ViewIdKeys}, Log}; -insert_results(DocId, Seq, Rev, [KVs | RKVs], [{Id, {VKVs, SKVs}} | RVKVs], VKVAcc, - VIdKeys, Log) -> +insert_results(DocId, [], [], ViewKVs, ViewIdKeys) -> + {lists:reverse(ViewKVs), {DocId, ViewIdKeys}}; +insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) -> CombineDupesFun = fun - ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys, Log2}) -> - {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys, Log2}; - ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys, Log2}) -> - {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys, Log2}; - ({Key, Value}, {Rest, IdKeys, Log2}) -> - {[{Key, Value} | Rest], [{Id, Key} | IdKeys], - dict:append({DocId, Rev}, {Id, {Key, Seq, add}}, Log2)} + ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) -> + {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys}; + ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys}) -> + {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys}; + ({Key, Value}, {Rest, IdKeys}) -> + {[{Key, Value} | Rest], [{Id, Key} | IdKeys]} end, - InitAcc = {[], VIdKeys, Log}, + InitAcc = {[], VIdKeys}, couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)), - {Duped, VIdKeys0, Log1} = lists:foldl(CombineDupesFun, InitAcc, + {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc, lists:sort(KVs)), FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs, - FinalSKVs = [{{Seq, Key}, {DocId, Val, Rev}} || {Key, Val} <- Duped] ++ SKVs, - insert_results(DocId, Seq, Rev, RKVs, RVKVs, - [{Id, {FinalKVs, FinalSKVs}} | VKVAcc], VIdKeys0, Log1). + insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0). -write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> +write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) -> #mrst{ id_btree=IdBtree, - log_btree=LogBtree, first_build=FirstBuild, partitioned=Partitioned } = State, - Revs = dict:from_list(dict:fetch_keys(Log0)), - - Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) -> - dict:store(Id, DIKeys, Acc) - end, dict:new(), Log0), - {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild), ToRemByView = collapse_rem_keys(ToRemove, dict:new()), - {ok, SeqsToAdd, SeqsToRemove, LogBtree2} = case LogBtree of - nil -> {ok, undefined, undefined, nil}; - _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild) - end, - - UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs0, SKVs}}) -> - #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View, + UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs0}) -> ToRem0 = couch_util:dict_find(ViewId, ToRemByView, []), {KVs, ToRem} = case Partitioned of true -> @@ -358,36 +298,7 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> _ -> View#mrview.update_seq end, - %% store the view changes. 
- {SeqBtree3, KeyBySeqBtree3} = if SIndexed orelse KSIndexed -> - SToRem = couch_util:dict_find(ViewId, SeqsToRemove, []), - SToAdd = couch_util:dict_find(ViewId, SeqsToAdd, []), - SKVs1 = SKVs ++ SToAdd, - - {ok, SeqBtree2} = if SIndexed -> - RemSKs = lists:sort([{Seq, Key} || {Key, Seq, _} <- SToRem]), - couch_btree:add_remove(View#mrview.seq_btree, - SKVs1, RemSKs); - true -> - {ok, nil} - end, - - {ok, KeyBySeqBtree2} = if KSIndexed -> - RemKSs = [{[Key, Seq], DocId} || {Key, Seq, DocId} <- SToRem], - couch_btree:add_remove(View#mrview.key_byseq_btree, - couch_mrview_util:to_key_seq(SKVs1), - RemKSs); - true -> - {ok, nil} - end, - {SeqBtree2, KeyBySeqBtree2}; - true -> - {nil, nil} - end, - View2 = View#mrview{btree=VBtree2, - seq_btree=SeqBtree3, - key_byseq_btree=KeyBySeqBtree3, - update_seq=NewUpdateSeq}, + View2 = View#mrview{btree=VBtree2, update_seq=NewUpdateSeq}, maybe_notify(State, View2, KVs, ToRem), View2 end, @@ -395,8 +306,7 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) -> State#mrst{ views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs), update_seq=UpdateSeq, - id_btree=IdBtree2, - log_btree=LogBtree2 + id_btree=IdBtree2 }. @@ -423,64 +333,6 @@ update_id_btree(Btree, DocIdKeys, _) -> couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem). -update_log(Btree, Log, _Revs, _Seqs, true) -> - ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log), - DIKeys /= []], - {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []), - {ok, dict:new(), dict:new(), LogBtree2}; -update_log(Btree, Log, Revs, Seqs, _) -> - %% build list of updated keys and Id - {ToLook, Updated, Removed} = dict:fold( - fun(Id, [], {IdsAcc, KeysAcc, RemAcc}) -> - {[Id | IdsAcc], KeysAcc, RemAcc}; - (Id, DIKeys, {IdsAcc, KeysAcc, RemAcc}) -> - {KeysAcc1, RemAcc1} = lists:foldl(fun({ViewId, {Key, _Seq, Op}}, {KeysAcc2, RemAcc2}) -> - case Op of - add -> {[{Id, ViewId, Key}|KeysAcc2], RemAcc2}; - del -> {KeysAcc2, [{Id, ViewId, Key}|RemAcc2]} - end - end, {KeysAcc, RemAcc}, DIKeys), - {[Id | IdsAcc], KeysAcc1, RemAcc1} - end, {[], [], []}, Log), - - MapFun = fun({ok, KV}) -> [KV]; (not_found) -> [] end, - KVsToLook = lists:flatmap(MapFun, couch_btree:lookup(Btree, ToLook)), - - {Log1, AddAcc, DelAcc} = lists:foldl(fun({DocId, VIdKeys}, Acc) -> - lists:foldl(fun({ViewId, {Key, OldSeq, _Op}}, {Log4, AddAcc4, DelAcc4}) -> - - IsUpdated = lists:member({DocId, ViewId, Key}, Updated), - IsRemoved = lists:member({DocId, ViewId, Key}, Removed), - - case IsUpdated of - true -> - % the log is updated, deleted old record from the view - DelAcc5 = dict:append(ViewId, {Key, OldSeq, DocId}, DelAcc4), - {Log4, AddAcc4, DelAcc5}; - false -> - % an update operation has been logged for this key. We must - % now record it as deleted in the log, remove the old record - % in the view and update the view with a removed record. - NewSeq = dict:fetch(DocId, Seqs), - Log5 = case IsRemoved of - false -> - dict:append(DocId, {ViewId, {Key, NewSeq, del}}, Log4); - true -> - Log4 - end, - Rev = dict:fetch(DocId, Revs), - DelAcc5 = dict:append(ViewId, {Key, OldSeq, DocId}, DelAcc4), - AddAcc5 = dict:append(ViewId, {{NewSeq, Key}, {DocId, ?REM_VAL, Rev}}, AddAcc4), - {Log5, AddAcc5, DelAcc5} - end - end, Acc, VIdKeys) - end, {Log, dict:new(), dict:new()}, KVsToLook), - - ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log1), DIKeys /= []], - % store the new logs - {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []), - {ok, AddAcc, DelAcc, LogBtree2}. 
- collapse_rem_keys([], Acc) -> Acc; collapse_rem_keys([{ok, {DocId, ViewIdKeys}} | Rest], Acc) -> diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index eb68124a0..d0d2b3949 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -20,7 +20,6 @@ -export([index_file/2, compaction_file/2, open_file/1]). -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]). -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]). --export([get_view_changes_count/1]). -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]). -export([fold/4, fold_reduce/4]). -export([temp_view_to_ddoc/1]). @@ -33,9 +32,6 @@ -export([get_view_keys/1, get_view_queries/1]). -export([set_view_type/3]). -export([set_extra/3, get_extra/2, get_extra/3]). --export([changes_key_opts/2]). --export([fold_changes/4]). --export([to_key_seq/1]). -define(MOD, couch_mrview_index). -define(GET_VIEW_RETRY_COUNT, 1). @@ -172,15 +168,13 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> DictBySrcAcc end, {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}), - SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false), - KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false), Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false), {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}), BySrc = lists:foldl(MakeDict, dict:new(), RawViews), NumViews = fun({_, View}, N) -> - {View#mrview{id_num=N, seq_indexed=SeqIndexed, keyseq_indexed=KeySeqIndexed}, N+1} + {View#mrview{id_num=N}, N+1} end, {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))), @@ -194,8 +188,6 @@ ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) -> views=Views, language=Language, design_opts=DesignOpts, - seq_indexed=SeqIndexed, - keyseq_indexed=KeySeqIndexed, partitioned=Partitioned }, SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, @@ -253,11 +245,7 @@ view_sig(Db, State, View, #mrargs{include_docs=true}=Args) -> BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}), UpdateSeq = couch_db:get_update_seq(Db), PurgeSeq = couch_db:get_purge_seq(Db), - #mrst{ - seq_indexed=SeqIndexed, - keyseq_indexed=KeySeqIndexed - } = State, - Term = view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed), + Term = view_sig_term(BaseSig, UpdateSeq, PurgeSeq), couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Term))); view_sig(Db, State, {_Nth, _Lang, View}, Args) -> view_sig(Db, State, View, Args); @@ -265,24 +253,18 @@ view_sig(_Db, State, View, Args0) -> Sig = State#mrst.sig, UpdateSeq = View#mrview.update_seq, PurgeSeq = View#mrview.purge_seq, - SeqIndexed = View#mrview.seq_indexed, - KeySeqIndexed = View#mrview.keyseq_indexed, Args = Args0#mrargs{ preflight_fun=undefined, extra=[] }, - Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args), + Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, Args), couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Term))). -view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false) -> - {BaseSig, UpdateSeq, PurgeSeq}; -view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed) -> - {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed}. +view_sig_term(BaseSig, UpdateSeq, PurgeSeq) -> + {BaseSig, UpdateSeq, PurgeSeq}. 
-view_sig_term(BaseSig, UpdateSeq, PurgeSeq, false, false, Args) -> - {BaseSig, UpdateSeq, PurgeSeq, Args}; -view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) -> - {BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args}. +view_sig_term(BaseSig, UpdateSeq, PurgeSeq, Args) -> + {BaseSig, UpdateSeq, PurgeSeq, Args}. init_state(Db, Fd, #mrst{views=Views}=State, nil) -> @@ -291,47 +273,25 @@ init_state(Db, Fd, #mrst{views=Views}=State, nil) -> seq=0, purge_seq=PurgeSeq, id_btree_state=nil, - log_btree_state=nil, view_states=[make_view_state(#mrview{}) || _ <- Views] }, init_state(Db, Fd, State, Header); -% read <= 1.2.x header record and transpile it to >=1.3.x -% header record -init_state(Db, Fd, State, #index_header{ - seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=IdBtreeState, - view_states=ViewStates}) -> - init_state(Db, Fd, State, #mrheader{ - seq=Seq, - purge_seq=PurgeSeq, - id_btree_state=IdBtreeState, - log_btree_state=nil, - view_states=[make_view_state(V) || V <- ViewStates] - }); init_state(Db, Fd, State, Header) -> #mrst{ language=Lang, - views=Views, - seq_indexed=SeqIndexed, - keyseq_indexed=KeySeqIndexed + views=Views } = State, #mrheader{ seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, - log_btree_state=LogBtreeState, view_states=ViewStates - } = Header, + } = maybe_update_header(Header), IdBtOpts = [ {compression, couch_compress:get_compression_method()} ], {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts), - {ok, LogBtree} = case SeqIndexed orelse KeySeqIndexed of - true -> couch_btree:open(LogBtreeState, Fd, IdBtOpts); - false -> {ok, nil} - end, OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end, Views2 = lists:zipwith(OpenViewFun, ViewStates, Views), @@ -342,7 +302,6 @@ init_state(Db, Fd, State, Header) -> update_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, - log_btree=LogBtree, views=Views2 }. @@ -358,29 +317,7 @@ open_view(_Db, Fd, Lang, ViewState, View) -> ], {ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts), - BySeqReduceFun = make_seq_reduce_fun(), - {ok, SeqBtree} = if View#mrview.seq_indexed -> - SeqBTState = get_seq_btree_state(ViewState), - ViewSeqBtOpts = [{reduce, BySeqReduceFun}, - {compression, Compression}], - - couch_btree:open(SeqBTState, Fd, ViewSeqBtOpts); - true -> - {ok, nil} - end, - {ok, KeyBySeqBtree} = if View#mrview.keyseq_indexed -> - KSeqBTState = get_kseq_btree_state(ViewState), - KeyBySeqBtOpts = [{less, LessFun}, - {reduce, BySeqReduceFun}, - {compression, Compression}], - couch_btree:open(KSeqBTState, Fd, KeyBySeqBtOpts); - true -> - {ok, nil} - end, - View#mrview{btree=Btree, - seq_btree=SeqBtree, - key_byseq_btree=KeyBySeqBtree, update_seq=get_update_seq(ViewState), purge_seq=get_purge_seq(ViewState)}. @@ -424,26 +361,6 @@ reduce_to_count(Reductions) -> FinalReduction = couch_btree:final_reduce(CountReduceFun, Reductions), get_count(FinalReduction). -%% @doc get all changes for a view -get_view_changes_count(View) -> - #mrview{seq_btree=SBtree, key_byseq_btree=KSBtree} = View, - CountFun = fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(SBtree, PartialReds)} - end, - {ok, Count} = case {SBtree, KSBtree} of - {nil, nil} -> - {ok, 0}; - {#btree{}, nil} -> - couch_btree:fold_reduce(SBtree, CountFun, 0, []); - {_, #btree{}} -> - couch_btree:fold_reduce(KSBtree, CountFun, 0, []) - end, - case {SBtree, KSBtree} of - {#btree{}, #btree{}} -> - {ok, Count*2}; - _ -> - {ok, Count} - end. 
fold(#mrview{btree=Bt}, Fun, Acc, Opts) -> WrapperFun = fun(KV, Reds, Acc2) -> @@ -462,23 +379,6 @@ fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) -> end. -fold_changes(Bt, Fun, Acc, Opts) -> - WrapperFun = fun(KV, _Reds, Acc2) -> - fold_changes_fun(Fun, changes_expand_dups([KV], []), Acc2) - end, - {ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts). - -fold_changes_fun(_Fun, [], Acc) -> - {ok, Acc}; -fold_changes_fun(Fun, [KV|Rest], Acc) -> - case Fun(KV, Acc) of - {ok, Acc2} -> - fold_changes_fun(Fun, Rest, Acc2); - {stop, Acc2} -> - {stop, Acc2} - end. - - fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) -> #mrview{ btree=Bt, @@ -812,7 +712,6 @@ make_header(State) -> update_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, - log_btree=LogBtree, views=Views } = State, @@ -820,7 +719,6 @@ make_header(State) -> seq=Seq, purge_seq=PurgeSeq, id_btree_state=get_btree_state(IdBtree), - log_btree_state=get_btree_state(LogBtree), view_states=[make_view_state(V) || V <- Views] }. @@ -876,16 +774,9 @@ reset_state(State) -> State#mrst{ fd=nil, qserver=nil, - seq_indexed=State#mrst.seq_indexed, - keyseq_indexed=State#mrst.keyseq_indexed, update_seq=0, id_btree=nil, - log_btree=nil, - views=[View#mrview{btree=nil, seq_btree=nil, - key_byseq_btree=nil, - seq_indexed=View#mrview.seq_indexed, - keyseq_indexed=View#mrview.keyseq_indexed} - || View <- State#mrst.views] + views=[View#mrview{btree=nil} || View <- State#mrst.views] }. @@ -953,45 +844,6 @@ reverse_key_default(<<255>>) -> <<>>; reverse_key_default(Key) -> Key. -changes_key_opts(StartSeq, Args) -> - changes_key_opts(StartSeq, Args, []). - - -changes_key_opts(StartSeq, #mrargs{keys=undefined, direction=Dir}=Args, Extra) -> - [[{dir, Dir}] ++ changes_skey_opts(StartSeq, Args) ++ - changes_ekey_opts(StartSeq, Args) ++ Extra]; -changes_key_opts(StartSeq, #mrargs{keys=Keys, direction=Dir}=Args, Extra) -> - lists:map(fun(K) -> - [{dir, Dir}] - ++ changes_skey_opts(StartSeq, Args#mrargs{start_key=K}) - ++ changes_ekey_opts(StartSeq, Args#mrargs{end_key=K}) - ++ Extra - end, Keys). - - -changes_skey_opts(StartSeq, #mrargs{start_key=undefined}) -> - [{start_key, [<<>>, StartSeq+1]}]; -changes_skey_opts(StartSeq, #mrargs{start_key=SKey, - start_key_docid=SKeyDocId}) -> - [{start_key, {[SKey, StartSeq+1], SKeyDocId}}]. - - -changes_ekey_opts(_StartSeq, #mrargs{end_key=undefined}) -> - []; -changes_ekey_opts(_StartSeq, #mrargs{end_key=EKey, - end_key_docid=EKeyDocId, - direction=Dir}=Args) -> - EndSeq = case Dir of - fwd -> 16#10000000; - rev -> 0 - end, - - case Args#mrargs.inclusive_end of - true -> [{end_key, {[EKey, EndSeq], EKeyDocId}}]; - false -> [{end_key_gt, {[EKey, EndSeq], EKeyDocId}}] - end. - - reduced_external_size(Tree) -> case couch_btree:full_reduce(Tree) of {ok, {_, _, Size}} -> Size; @@ -1000,53 +852,26 @@ reduced_external_size(Tree) -> end. -reduced_seq_external_size(Tree) -> - case couch_btree:full_reduce(Tree) of - {ok, {_, Size}} -> Size; - % return 0 for older versions that only returned number of docs - {ok, NumDocs} when is_integer(NumDocs) -> 0 - end. 
- - calculate_external_size(Views) -> - SumFun = fun(#mrview{btree=Bt, seq_btree=SBt, key_byseq_btree=KSBt}, Acc) -> - Size0 = sum_btree_sizes(Acc, reduced_external_size(Bt)), - Size1 = case SBt of - nil -> Size0; - _ -> sum_btree_sizes(Size0, reduced_seq_external_size(SBt)) - end, - case KSBt of - nil -> Size1; - _ -> sum_btree_sizes(Size1, reduced_seq_external_size(KSBt)) - end + SumFun = fun + (#mrview{btree=nil}, Acc) -> + Acc; + (#mrview{btree=Bt}, Acc) -> + Acc + reduced_external_size(Bt) end, {ok, lists:foldl(SumFun, 0, Views)}. calculate_active_size(Views) -> - BtSize = fun - (nil) -> 0; - (Bt) -> couch_btree:size(Bt) - end, - FoldFun = fun(View, Acc) -> - Sizes = [ - BtSize(View#mrview.btree), - BtSize(View#mrview.seq_btree), - BtSize(View#mrview.key_byseq_btree) - ], - Acc + lists:sum([S || S <- Sizes, is_integer(S)]) + FoldFun = fun + (#mrview{btree=nil}, Acc) -> + Acc; + (#mrview{btree=Bt}, Acc) -> + Acc + couch_btree:size(Bt) end, {ok, lists:foldl(FoldFun, 0, Views)}. -sum_btree_sizes(nil, _) -> - 0; -sum_btree_sizes(_, nil) -> - 0; -sum_btree_sizes(Size1, Size2) -> - Size1 + Size2. - - detuple_kvs([], Acc) -> lists:reverse(Acc); detuple_kvs([KV | Rest], Acc) -> @@ -1064,19 +889,6 @@ expand_dups([KV | Rest], Acc) -> expand_dups(Rest, [KV | Acc]). -changes_expand_dups([], Acc) -> - lists:reverse(Acc); -changes_expand_dups([{{[Key, Seq], DocId}, {dups, Vals}} | Rest], Acc) -> - Expanded = [{{Seq, Key, DocId}, Val} || Val <- Vals], - changes_expand_dups(Rest, Expanded ++ Acc); -changes_expand_dups([{{Seq, Key}, {DocId, {dups, Vals}}} | Rest], Acc) -> - Expanded = [{{Seq, Key, DocId}, Val} || Val <- Vals], - changes_expand_dups(Rest, Expanded ++ Acc); -changes_expand_dups([{{[Key, Seq], DocId}, {Val, _}} | Rest], Acc) -> - changes_expand_dups(Rest, [{{Seq, Key, DocId}, Val} | Acc]); -changes_expand_dups([{{Seq, Key}, {DocId, Val, _}} | Rest], Acc) -> - changes_expand_dups(Rest, [{{Seq, Key, DocId}, Val} | Acc]). - maybe_load_doc(_Db, _DI, #mrargs{include_docs=false}) -> []; maybe_load_doc(Db, #doc_info{}=DI, #mrargs{conflicts=true, doc_options=Opts}) -> @@ -1126,33 +938,18 @@ mrverror(Mesg) -> throw({query_parse_error, Mesg}). -to_key_seq(L) -> - [{{[Key, Seq], DocId}, {Val, Rev}} || {{Seq, Key}, {DocId, Val, Rev}} <- L]. - -%% Updates 1.2.x or earlier view files to 1.3.x or later view files -%% transparently, the first time the 1.2.x view file is opened by -%% 1.3.x or later. +%% Updates 2.x view files to 3.x or later view files +%% transparently, the first time the 2.x view file is opened by +%% 3.x or later. %% %% Here's how it works: %% %% Before opening a view index, %% If no matching index file is found in the new location: -%% calculate the <= 1.2.x view signature +%% calculate the <= 2.x view signature %% if a file with that signature lives in the old location %% rename it to the new location with the new signature in the name. %% Then proceed to open the view index as usual. -%% After opening, read its header. -%% -%% If the header matches the <= 1.2.x style #index_header record: -%% upgrade the header to the new #mrheader record -%% The next time the view is used, the new header is used. -%% -%% If we crash after the rename, but before the header upgrade, -%% the header upgrade is done on the next view opening. -%% -%% If we crash between upgrading to the new header and writing -%% that header to disk, we start with the old header again, -%% do the upgrade and write to disk. 
maybe_update_index_file(State) -> DbName = State#mrst.db_name, @@ -1168,10 +965,10 @@ maybe_update_index_file(State) -> end. update_index_file(State) -> - Sig = sig_vsn_12x(State), + Sig = sig_vsn_2x(State), DbName = State#mrst.db_name, FileName = couch_index_util:hexsig(Sig) ++ ".view", - IndexFile = couch_index_util:index_file("", DbName, FileName), + IndexFile = couch_index_util:index_file("mrview", DbName, FileName), % If we have an old index, rename it to the new position. case file:read_file_info(IndexFile) of @@ -1180,82 +977,85 @@ update_index_file(State) -> % If the target exists, e.g. the next request will find the % new file and we are good. We might need to catch this % further up to avoid a full server crash. - couch_log:info("Attempting to update legacy view index file.", []), NewIndexFile = index_file(DbName, State#mrst.sig), + couch_log:notice("Attempting to update legacy view index file" + " from ~p to ~s", [IndexFile, NewIndexFile]), ok = filelib:ensure_dir(NewIndexFile), ok = file:rename(IndexFile, NewIndexFile), - couch_log:info("Successfully updated legacy view index file.", []), + couch_log:notice("Successfully updated legacy view index file" + " ~s", [IndexFile]), Sig; - _ -> + {error, enoent} -> % Ignore missing index file + ok; + {error, Reason} -> + couch_log:error("Failed to update legacy view index file" + " ~s : ~s", [IndexFile, file:format_error(Reason)]), ok end. -sig_vsn_12x(State) -> - ViewInfo = [old_view_format(V) || V <- State#mrst.views], - SigData = case State#mrst.lib of - {[]} -> - {ViewInfo, State#mrst.language, State#mrst.design_opts}; - _ -> - {ViewInfo, State#mrst.language, State#mrst.design_opts, - couch_index_util:sort_lib(State#mrst.lib)} - end, - couch_hash:md5_hash(term_to_binary(SigData)). +sig_vsn_2x(State) -> + #mrst{ + lib = Lib, + language = Language, + design_opts = DesignOpts + } = State, + SI = proplists:get_value(<<"seq_indexed">>, DesignOpts, false), + KSI = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false), + Views = [old_view_format(V, SI, KSI) || V <- State#mrst.views], + SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)}, + couch_hash:md5_hash(term_to_binary(SigInfo)). -old_view_format(View) -> +old_view_format(View, SI, KSI) -> { - view, + mrview, View#mrview.id_num, + View#mrview.update_seq, + View#mrview.purge_seq, View#mrview.map_names, + View#mrview.reduce_funs, View#mrview.def, View#mrview.btree, - View#mrview.reduce_funs, + nil, + nil, + SI, + KSI, View#mrview.options }. -%% End of <= 1.2.x upgrade code. +maybe_update_header(#mrheader{} = Header) -> + Header; +maybe_update_header(Header) when tuple_size(Header) == 6 -> + #mrheader{ + seq = element(2, Header), + purge_seq = element(3, Header), + id_btree_state = element(4, Header), + view_states = [make_view_state(S) || S <- element(6, Header)] + }. + +%% End of <= 2.x upgrade code. 
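%% Illustrative sketch, not part of the patch above: how the retained upgrade
%% path treats a legacy 2.x on-disk header. The old #mrheader was a 6-element
%% tuple with a log_btree_state slot, and each legacy view state carried the
%% by-seq and by-keyseq btree roots; maybe_update_header/1 together with
%% make_view_state/1 (below) drops both. With hypothetical placeholder terms:
%%
%%   OldViewState = {BTState, SeqBTState, KSeqBTState, UpdateSeq, ViewPurgeSeq},
%%   OldHeader    = {mrheader, Seq, PurgeSeq, IdBtreeState, LogBtreeState,
%%                   [OldViewState]},
%%   #mrheader{view_states = [{BTState, UpdateSeq, ViewPurgeSeq}]} =
%%       maybe_update_header(OldHeader).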
make_view_state(#mrview{} = View) -> BTState = get_btree_state(View#mrview.btree), - SeqBTState = case View#mrview.seq_indexed of - true -> - get_btree_state(View#mrview.seq_btree); - _ -> - nil - end, - KSeqBTState = case View#mrview.keyseq_indexed of - true -> - get_btree_state(View#mrview.key_byseq_btree); - _ -> - nil - end, { BTState, - SeqBTState, - KSeqBTState, View#mrview.update_seq, View#mrview.purge_seq }; -make_view_state({BTState, UpdateSeq, PurgeSeq}) -> - {BTState, nil, nil, UpdateSeq, PurgeSeq}; +make_view_state({BTState, _SeqBTState, _KSeqBTState, UpdateSeq, PurgeSeq}) -> + {BTState, UpdateSeq, PurgeSeq}; make_view_state(nil) -> - {nil, nil, nil, 0, 0}. + {nil, 0, 0}. get_key_btree_state(ViewState) -> element(1, ViewState). -get_seq_btree_state(ViewState) -> - element(2, ViewState). - -get_kseq_btree_state(ViewState) -> - element(3, ViewState). - get_update_seq(ViewState) -> - element(4, ViewState). + element(2, ViewState). get_purge_seq(ViewState) -> - element(5, ViewState). + element(3, ViewState). get_count(Reduction) -> element(1, Reduction). @@ -1296,22 +1096,6 @@ make_reduce_fun(Lang, ReduceFuns) -> {Counts, Result, ExternalSize} end. -make_seq_reduce_fun() -> - fun - (reduce, KVs0) -> - KVs = detuple_kvs(expand_dups(KVs0, []), []), - NumDocs = length(KVs), - ExternalSize = kv_external_size(KVs, NumDocs), - {NumDocs, ExternalSize}; - (rereduce, Reds) -> - ExtractFun = fun(Red, {NumDocsAcc0, ExtAcc0}) -> - NumDocsAcc = NumDocsAcc0 + get_count(Red), - ExtAcc = ExtAcc0 + get_external_size_reds(Red), - {NumDocsAcc, ExtAcc} - end, - lists:foldl(ExtractFun, {0, 0}, Reds) - end. - maybe_define_less_fun(#mrview{options = Options}) -> case couch_util:get_value(<<"collation">>, Options) of diff --git a/src/couch_mrview/test/eunit/couch_mrview_changes_since_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_changes_since_tests.erl deleted file mode 100644 index 67106419f..000000000 --- a/src/couch_mrview/test/eunit/couch_mrview_changes_since_tests.erl +++ /dev/null @@ -1,209 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_mrview_changes_since_tests). - --include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch/include/couch_db.hrl"). - --define(TIMEOUT, 1000). - -teardown(Db) -> - couch_db:close(Db), - couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), - ok. - -changes_since_basic_test_() -> - { - "changes_since tests", - { - setup, - fun test_util:start_couch/0, fun test_util:stop_couch/1, - { - foreach, - fun() -> - Type = {changes, seq_indexed}, - {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), Type), - Db - end, - fun teardown/1, - [ - fun test_basic/1, - fun test_basic_since/1, - fun test_basic_count/1, - fun test_basic_count_since/1, - fun test_compact/1 - ] - } - } - }. 
- -changes_since_range_test_() -> - { - "changes_since_range tests", - { - setup, - fun test_util:start_couch/0, fun test_util:stop_couch/1, - { - foreach, - fun() -> - Type = {changes, keyseq_indexed}, - {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), Type), - Db - end, - fun teardown/1, - [ - fun test_range/1, - fun test_range_since/1 - ] - } - } - }. - -changes_since_range_count_test_() -> - { - "changes_since_range_count tests", - { - setup, - fun test_util:start_couch/0, fun test_util:stop_couch/1, - { - foreach, - fun() -> - Type = {changes, seq_indexed_keyseq_indexed}, - {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), Type), - Db - end, - fun teardown/1, - [ - fun test_range_count/1, - fun test_range_count_since/1, - fun test_remove_key/1 - ] - } - } - }. - -test_basic(Db) -> - Result = run_query(Db, 0, []), - Expect = {ok, [ - {{2, <<"1">>, <<"1">>}, 1}, - {{3, <<"10">>, <<"10">>}, 10}, - {{4, <<"2">>, <<"2">>}, 2}, - {{5, <<"3">>, <<"3">>}, 3}, - {{6, <<"4">>, <<"4">>}, 4}, - {{7, <<"5">>, <<"5">>}, 5}, - {{8, <<"6">>, <<"6">>}, 6}, - {{9, <<"7">>, <<"7">>}, 7}, - {{10, <<"8">>, <<"8">>}, 8}, - {{11, <<"9">>, <<"9">>}, 9} - ]}, - ?_assertEqual(Result, Expect). - - -test_range(Db) -> - Range = [{start_key, <<"3">>}, {end_key, <<"5">>}], - Result = run_query(Db, 0, Range), - Expect = {ok, [ - {{5, <<"3">>, <<"3">>}, 3}, - {{6, <<"4">>, <<"4">>}, 4}, - {{7, <<"5">>, <<"5">>}, 5} - ]}, - ?_assertEqual(Result, Expect). - -test_basic_since(Db) -> - Result = run_query(Db, 5, []), - Expect = {ok, [ - {{6, <<"4">>, <<"4">>}, 4}, - {{7, <<"5">>, <<"5">>}, 5}, - {{8, <<"6">>, <<"6">>}, 6}, - {{9, <<"7">>, <<"7">>}, 7}, - {{10, <<"8">>, <<"8">>}, 8}, - {{11, <<"9">>, <<"9">>}, 9} - ]}, - ?_assertEqual(Result, Expect). - -test_range_since(Db) -> - Range = [{start_key, <<"3">>}, {end_key, <<"5">>}], - Result = run_query(Db, 5, Range), - Expect = {ok, [ - {{6, <<"4">>, <<"4">>}, 4}, - {{7, <<"5">>, <<"5">>}, 5} - ]}, - ?_assertEqual(Result, Expect). - -test_basic_count(Db) -> - Result = run_count_query(Db, 0, []), - ?_assertEqual(Result, 10). - -test_range_count(Db) -> - Range = [{start_key, <<"3">>}, {end_key, <<"5">>}], - Result = run_count_query(Db, 0, Range), - ?_assertEqual(Result, 3). - -test_basic_count_since(Db) -> - Result = run_count_query(Db, 5, []), - ?_assertEqual(Result, 6). - -test_range_count_since(Db) -> - Range = [{start_key, <<"3">>}, {end_key, <<"5">>}], - Result = run_count_query(Db, 5, Range), - ?_assertEqual(Result, 2). - -test_compact(Db) -> - Result = couch_mrview:compact(Db, <<"_design/bar">>), - Count = run_count_query(Db, 0, []), - [?_assertEqual(Result, ok), ?_assertEqual(Count, 10)]. 
- -test_remove_key(Db) -> - %% add new doc - Doc = couch_mrview_test_util:doc(11), - {ok, Rev} = couch_db:update_doc(Db, Doc, []), - RevStr = couch_doc:rev_to_str(Rev), - {ok, Db1} = couch_db:reopen(Db), - Result = run_count_query(Db1, 0, []), - %% check new view key - Range = [{start_key, <<"11">>}, {end_key, <<"11">>}], - Result1 = run_query(Db1, 0, Range), - Expect = {ok, [ - {{12, <<"11">>, <<"11">>}, 11} - ]}, - - %% delete doc - Doc2 = couch_doc:from_json_obj({[ - {<<"_id">>, <<"11">>}, - {<<"_rev">>, RevStr}, - {<<"_deleted">>, true} - ]}), - {ok, _} = couch_db:update_doc(Db1, Doc2, []), - {ok, Db2} = couch_db:reopen(Db1), - Result2 = run_count_query(Db2, 0, []), - %% check new view key - Result3 = run_query(Db2, 0, Range), - Expect2 = {ok, [ - {{13, <<"11">>, <<"11">>}, removed} - ]}, - [ - ?_assertEqual(Result, 11), - ?_assertEqual(Result1, Expect), - ?_assertEqual(Result2, 11), - ?_assertEqual(Result3, Expect2) - ]. - -run_query(Db, Since, Opts) -> - Fun = fun(KV, Acc) -> {ok, [KV | Acc]} end, - {ok, R} = couch_mrview:view_changes_since(Db, <<"_design/bar">>, <<"baz">>, - Since, Fun, Opts, []), - {ok, lists:reverse(R)}. - -run_count_query(Db, Since, Opts) -> - couch_mrview:count_view_changes_since(Db, <<"_design/bar">>, <<"baz">>, - Since, Opts). diff --git a/src/couch_mrview/test/eunit/couch_mrview_index_changes_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_index_changes_tests.erl deleted file mode 100644 index d8dd28773..000000000 --- a/src/couch_mrview/test/eunit/couch_mrview_index_changes_tests.erl +++ /dev/null @@ -1,223 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_mrview_index_changes_tests). - --include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch/include/couch_db.hrl"). - - -setup() -> - {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), map), - Db. - -teardown(Db) -> - couch_db:close(Db), - couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), - ok. - -changes_index_test() -> - { - "changes index tests", - { - setup, - fun test_util:start_couch/0, fun test_util:stop_couch/1, - { - foreach, - fun setup/0, fun teardown/1, - [ - fun test_normal_changes/1, - fun test_stream_once/1, - fun test_stream_once_since/1, - fun test_stream_once_timeout/1, - fun test_stream_once_heartbeat/1, - fun test_stream/1, - fun test_indexer/1 - ] - } - } - }. - - -test_normal_changes(Db) -> - Result = run_query(Db, []), - Expect = {ok, 11, [ - {{2, 1, <<"1">>}, 1}, - {{3, 10, <<"10">>}, 10}, - {{4, 2, <<"2">>}, 2}, - {{5, 3, <<"3">>}, 3}, - {{6, 4, <<"4">>}, 4}, - {{7, 5, <<"5">>}, 5}, - {{8, 6, <<"6">>}, 6}, - {{9, 7, <<"7">>}, 7}, - {{10, 8, <<"8">>}, 8}, - {{11, 9, <<"9">>}, 9} - ]}, - ?_assertEqual(Result, Expect). 
- -test_stream_once(Db) -> - Result = run_query(Db, [{stream, once}]), - Expect = {ok, 11, [ - {{2, 1, <<"1">>}, 1}, - {{3, 10, <<"10">>}, 10}, - {{4, 2, <<"2">>}, 2}, - {{5, 3, <<"3">>}, 3}, - {{6, 4, <<"4">>}, 4}, - {{7, 5, <<"5">>}, 5}, - {{8, 6, <<"6">>}, 6}, - {{9, 7, <<"7">>}, 7}, - {{10, 8, <<"8">>}, 8}, - {{11, 9, <<"9">>}, 9} - ]}, - ?_assertEqual(Result, Expect). - - -test_stream_once_since(Db) -> - Self = self(), - spawn(fun() -> - Result = run_query(Db, [{since, 11}, - {stream, once}]), - Self ! {result, Result} - end), - - spawn(fun() -> - timer:sleep(1000), - {ok, Db1} = save_doc(Db, 11), - couch_mrview:refresh(Db1, <<"_design/bar">>) - end), - - Expect = {ok,12,[{{12,11,<<"11">>},11}]}, - - receive - {result, Result} -> - ?_assertEqual(Result, Expect) - after 5000 -> - io:format("never got the change", []) - end. - - -test_stream_once_timeout(Db) -> - Self = self(), - spawn(fun() -> - Result = run_query(Db, [{since, 12}, - {stream, once}, - {timeout, 3000}]), - Self ! {result, Result} - end), - - - - Expect = {ok, 12, []}, - - receive - {result, Result} -> - ?_assertEqual(Result, Expect) - after 5000 -> - io:format("never got the change", []) - end. - -test_stream_once_heartbeat(Db) -> - Self = self(), - spawn(fun() -> - Result = run_query(Db, [{since, 12}, - {stream, once}, - {heartbeat, 1000}]), - Self ! {result, Result} - end), - - spawn(fun() -> - timer:sleep(3000), - {ok, Db1} = save_doc(Db, 12), - couch_mrview:refresh(Db1, <<"_design/bar">>) - end), - - Expect = {ok,13,[heartbeat, - heartbeat, - heartbeat, - {{13,12,<<"12">>},12}]}, - - - - receive - {result, Result} -> - ?_assertEqual(Result, Expect) - after 5000 -> - io:format("never got the change", []) - end. - - -test_stream(Db) -> - Self = self(), - spawn(fun() -> - Result = run_query(Db, [{since, 13}, - stream, - {timeout, 3000}]), - Self ! {result, Result} - end), - - spawn(fun() -> - timer:sleep(1000), - {ok, Db1} = save_doc(Db, 13), - couch_mrview:refresh(Db1, <<"_design/bar">>), - {ok, Db2} = save_doc(Db1, 14), - couch_mrview:refresh(Db2, <<"_design/bar">>) - end), - - Expect = {ok, 15,[{{14,13,<<"13">>},13}, - {{15,14,<<"14">>},14}]}, - - receive - {result, Result} -> - ?_assertEqual(Result, Expect) - after 5000 -> - io:format("never got the change", []) - end. - - -test_indexer(Db) -> - Result = run_query(Db, [{since, 14}, refresh]), - Expect = {ok, 15, [{{15,14,<<"14">>},14}]}, - - {ok, Db1} = save_doc(Db, 15), - timer:sleep(1500), - Result1 = run_query(Db1, [{since, 14}], false), - Expect1 = {ok, 16, [{{15,14,<<"14">>},14}, - {{16,15,<<"15">>},15}]}, - ?_assert(Result == Expect andalso Result1 == Expect1). - - -save_doc(Db, Id) -> - Doc = couch_mrview_test_util:doc(Id), - {ok, _Rev} = couch_db:update_doc(Db, Doc, []), - couch_db:reopen(Db). - -run_query(Db, Opts) -> - run_query(Db, Opts, true). - -run_query(Db, Opts, Refresh) -> - Fun = fun - (stop, {LastSeq, Acc}) -> - {ok, LastSeq, Acc}; - (heartbeat, Acc) -> - {ok, [heartbeat | Acc]}; - (Event, Acc) -> - {ok, [Event | Acc]} - end, - case Refresh of - true -> - couch_mrview:refresh(Db, <<"_design/bar">>); - false -> - ok - end, - {ok, LastSeq, R} = couch_mrview_changes:handle_changes(Db, <<"_design/bar">>, - <<"baz">>, Fun, [], Opts), - {ok, LastSeq, lists:reverse(R)}. 
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 97374be1f..7b688b2b9 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -52,10 +52,9 @@ changes(DbName, Options, StartVector, DbOptions) -> Args0#changes_args{ filter_fun={custom, Style, Req, DDoc, FName} }; - {fetch, FilterType, Style, {DDocId, Rev}, VName} - when FilterType == view orelse FilterType == fast_view -> + {fetch, view, Style, {DDocId, Rev}, VName} -> {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), - Args0#changes_args{filter_fun={FilterType, Style, DDoc, VName}}; + Args0#changes_args{filter_fun={view, Style, DDoc, VName}}; _ -> Args0 end, |
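A note on what survives this removal: the `_view` filter on the regular database `_changes` feed remains available; only the separate view-sequence-indexed changes machinery is gone. A minimal usage sketch, with hypothetical database, design document and view names:

    GET /mydb/_changes?filter=_view&view=bar/baz

The retained `{fetch, view, Style, {DDocId, Rev}, VName}` clause in fabric_rpc.erl above is the shard-side step that fetches the design document so this filter can still be applied.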