diff options
author | Eric Avdey <eiri@eiri.ca> | 2019-11-12 13:45:26 -0400 |
---|---|---|
committer | Eric Avdey <eiri@eiri.ca> | 2019-12-03 13:54:40 -0400 |
commit | 6a58d8c508621aadfcb72a43c9c37e048f2a10e5 (patch) | |
tree | d297bbb49000abbb7a57818f8275596e84c00e40 | |
parent | 2c8966f122943f6aba65bb571dfb50ba6b699fb1 (diff) | |
download | couchdb-6a58d8c508621aadfcb72a43c9c37e048f2a10e5.tar.gz |
Remove view_changes functionality from couch_changes side
-rw-r--r-- | src/couch/src/couch_changes.erl | 278 | ||||
-rw-r--r-- | src/couch/src/couch_multidb_changes.erl | 4 | ||||
-rw-r--r-- | src/couch/test/eunit/couch_changes_tests.erl | 38 |
3 files changed, 26 insertions, 294 deletions
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index c5b5edf9f..2d6b58174 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -16,7 +16,6 @@ -export([ handle_db_changes/3, - handle_changes/4, get_changes_timeout/2, wait_updated/3, get_rest_updated/1, @@ -24,7 +23,6 @@ filter/3, handle_db_event/3, handle_view_event/3, - view_filter/3, send_changes_doc_ids/6, send_changes_design_docs/6 ]). @@ -57,53 +55,21 @@ aggregation_results }). -handle_db_changes(Args, Req, Db) -> - handle_changes(Args, Req, Db, db). - -handle_changes(Args1, Req, Db0, Type) -> +handle_db_changes(Args0, Req, Db0) -> #changes_args{ style = Style, filter = FilterName, feed = Feed, dir = Dir, since = Since - } = Args1, + } = Args0, Filter = configure_filter(FilterName, Style, Req, Db0), - Args = Args1#changes_args{filter_fun = Filter}, - % The type of changes feed depends on the supplied filter. If the query is - % for an optimized view-filtered db changes, we need to use the view - % sequence tree. - {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of - {{view, DDocName0, ViewName0}, _} -> - {true, DDocName0, ViewName0}; - {_, {fast_view, _, DDoc, ViewName0}} -> - {true, DDoc#doc.id, ViewName0}; - _ -> - {false, undefined, undefined} - end, + Args = Args0#changes_args{filter_fun = Filter}, DbName = couch_db:name(Db0), - {StartListenerFun, View} = if UseViewChanges -> - {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view( - DbName, DDocName, ViewName, #mrargs{}), - case View0#mrview.seq_btree of - #btree{} -> - ok; - _ -> - throw({bad_request, "view changes not enabled"}) - end, - SNFun = fun() -> - couch_event:link_listener( - ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}] - ) - end, - {SNFun, View0}; - true -> - SNFun = fun() -> - couch_event:link_listener( - ?MODULE, handle_db_event, self(), [{dbname, DbName}] - ) - end, - {SNFun, undefined} + StartListenerFun = fun() -> + couch_event:link_listener( + ?MODULE, handle_db_event, self(), [{dbname, DbName}] + ) end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), @@ -113,14 +79,7 @@ handle_changes(Args1, Req, Db0, Type) -> fwd -> Since end, - View2 = if UseViewChanges -> - {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view( - DbName, DDocName, ViewName, #mrargs{}), - View1; - true -> - undefined - end, - {Db, View2, StartSeq} + {Db, StartSeq} end, % begin timer to deal with heartbeat when filter function fails case Args#changes_args.heartbeat of @@ -136,12 +95,11 @@ handle_changes(Args1, Req, Db0, Type) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), {ok, Listener} = StartListenerFun(), - {Db, View, StartSeq} = Start(), + {Db, StartSeq} = Start(), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, - <<"">>, Timeout, TimeoutFun, DDocName, ViewName, - View), + <<"">>, Timeout, TimeoutFun), try keep_sending_changes( Args#changes_args{dir=fwd}, @@ -157,10 +115,10 @@ handle_changes(Args1, Req, Db0, Type) -> {Callback, UserAcc} = get_callback_acc(CallbackAcc), UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), - {Db, View, StartSeq} = Start(), + {Db, StartSeq} = Start(), Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, - UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun, - DDocName, ViewName, View), + UserAcc2, Db, StartSeq, <<>>, + Timeout, TimeoutFun), {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = send_changes( Acc0, @@ -214,21 +172,12 @@ configure_filter("_view", Style, Req, Db) -> [DName, VName] -> {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>), check_member_exists(DDoc, [<<"views">>, VName]), - FilterType = try - true = couch_util:get_nested_json_value( - DDoc#doc.body, - [<<"options">>, <<"seq_indexed">>] - ), - fast_view - catch _:_ -> - view - end, case couch_db:is_clustered(Db) of true -> DIR = fabric_util:doc_id_and_rev(DDoc), - {fetch, FilterType, Style, DIR, VName}; + {fetch, view, Style, DIR, VName}; false -> - {FilterType, Style, DDoc, VName} + {view, Style, DDoc, VName} end; [] -> Msg = "`view` must be of the form `designname/viewname`", @@ -285,8 +234,7 @@ filter(_Db, DocInfo, {design_docs, Style}) -> _ -> [] end; -filter(Db, DocInfo, {FilterType, Style, DDoc, VName}) - when FilterType == view; FilterType == fast_view -> +filter(Db, DocInfo, {view, Style, DDoc, VName}) -> Docs = open_revs(Db, DocInfo, Style), {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs), filter_revs(Passes, Docs); @@ -299,35 +247,6 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). -fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) -> - case couch_db:get_doc_info(Db, ID) of - {ok, #doc_info{high_seq=Seq}=DocInfo} -> - Docs = open_revs(Db, DocInfo, Style), - Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) -> - RevStr = couch_doc:rev_to_str({RevPos, RevId}), - {[{<<"rev">>, RevStr}]} - end, Docs), - {DocInfo, Changes}; - {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq -> - % If the view seq tree is out of date (or if the view seq tree - % was opened before the db) seqs may come by from the seq tree - % which correspond to the not-most-current revision of a document. - % The proper thing to do is to not send this old revision, but wait - % until we reopen the up-to-date view seq tree and continue the - % fold. - % I left the Seq > HighSeq guard in so if (for some godforsaken - % reason) the seq in the view is more current than the database, - % we'll throw an error. - {undefined, []}; - {error, not_found} -> - {undefined, []} - end. - - - -view_filter(Db, KV, {default, Style}) -> - apply_view_style(Db, KV, Style). - get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), @@ -425,16 +344,6 @@ apply_style(#doc_info{revs=Revs}, main_only) -> apply_style(#doc_info{revs=Revs}, all_docs) -> [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs]. -apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; -apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) -> - case couch_db:get_doc_info(Db, ID) of - {ok, DocInfo} -> - apply_style(DocInfo, all_docs); - {error, not_found} -> - [] - end. - open_revs(Db, DocInfo, Style) -> DocInfos = case Style of @@ -493,7 +402,7 @@ start_sending_changes(_Callback, UserAcc, ResponseType) start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). -build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) -> +build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ include_docs = IncludeDocs, doc_options = DocOpts, @@ -516,9 +425,6 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D conflicts = Conflicts, timeout = Timeout, timeout_fun = TimeoutFun, - ddoc_name = DDocName, - view_name = ViewName, - view = View, aggregation_results=[], aggregation_kvs=[] }. @@ -527,41 +433,15 @@ send_changes(Acc, Dir, FirstRound) -> #changes_acc{ db = Db, seq = StartSeq, - filter = Filter, - view = View + filter = Filter } = Acc, DbEnumFun = fun changes_enumerator/2, case can_optimize(FirstRound, Filter) of {true, Fun} -> Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter); _ -> - case {View, Filter} of - {#mrview{}, {fast_view, _, _, _}} -> - couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc); - {undefined, _} -> - Opts = [{dir, Dir}], - couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts); - {#mrview{}, _} -> - ViewEnumFun = fun view_changes_enumerator/2, - {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc), - case Acc0 of - #changes_acc{aggregation_results=[]} -> - {Go, Acc0}; - _ -> - #changes_acc{ - aggregation_results = AggResults, - aggregation_kvs = AggKVs, - user_acc = UserAcc, - callback = Callback, - resp_type = ResponseType, - prepend = Prepend - } = Acc0, - ChangesRow = view_changes_row(AggResults, AggKVs, Acc0), - UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc0#changes_acc{user_acc=UserAcc0}} - end - end + Opts = [{dir, Dir}], + couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts) end. @@ -653,8 +533,7 @@ keep_sending_changes(Args, Acc0, FirstRound) -> #changes_acc{ db = Db, callback = Callback, timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, - prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit, - ddoc_name = DDocName, view_name = ViewName + prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit } = ChangesAcc, couch_db:close(Db), @@ -670,7 +549,6 @@ keep_sending_changes(Args, Acc0, FirstRound) -> Args#changes_args{limit=NewLimit}, ChangesAcc#changes_acc{ db = Db2, - view = maybe_refresh_view(Db2, DDocName, ViewName), user_acc = UserAcc4, seq = EndSeq, prepend = Prepend2, @@ -685,104 +563,22 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end end. -maybe_refresh_view(_, undefined, undefined) -> - undefined; -maybe_refresh_view(Db, DDocName, ViewName) -> - DbName = couch_db:name(Db), - {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}), - View. - end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> Callback({stop, EndSeq}, ResponseType, UserAcc). -view_changes_enumerator(Value, Acc) -> - #changes_acc{ - filter = Filter, callback = Callback, prepend = Prepend, - user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq, - aggregation_kvs=AggKVs, aggregation_results=AggResults - } = Acc, - - Results0 = view_filter(Db, Value, Filter), - Results = [Result || Result <- Results0, Result /= null], - {{Seq, _}, _} = Value, - - Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, - - if CurrentSeq =:= Seq -> - NewAggKVs = case Results of - [] -> AggKVs; - _ -> [Value|AggKVs] - end, - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), - Acc0 = Acc#changes_acc{ - seq = Seq, - user_acc = UserAcc2, - aggregation_kvs=NewAggKVs - }, - case Done of - stop -> {stop, Acc0}; - ok -> {Go, Acc0} - end; - AggResults =/= [] -> - {NewAggKVs, NewAggResults} = case Results of - [] -> {[], []}; - _ -> {[Value], Results} - end, - if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> - ChangesRow = view_changes_row(AggResults, AggKVs, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{ - seq = Seq, user_acc = UserAcc2, limit = Limit - 1, - aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}}; - true -> - ChangesRow = view_changes_row(AggResults, AggKVs, Acc), - UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{ - seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2, - limit = Limit - 1, aggregation_kvs=[Value], - aggregation_results=Results}} - end; - true -> - {NewAggKVs, NewAggResults} = case Results of - [] -> {[], []}; - _ -> {[Value], Results} - end, - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), - Acc0 = Acc#changes_acc{ - seq = Seq, - user_acc = UserAcc2, - aggregation_kvs=NewAggKVs, - aggregation_results=NewAggResults - }, - case Done of - stop -> {stop, Acc0}; - ok -> {Go, Acc0} - end - end. - -changes_enumerator(Value0, Acc) -> +changes_enumerator(Value, Acc) -> #changes_acc{ filter = Filter, callback = Callback, prepend = Prepend, user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, timeout = Timeout, timeout_fun = TimeoutFun } = Acc, - {Value, Results0} = case Filter of - {fast_view, _, _, _} -> - fast_view_filter(Db, Value0, Filter); - _ -> - {Value0, filter(Db, Value0, Filter)} - end, + Results0 = filter(Db, Value, Filter), Results = [Result || Result <- Results0, Result /= null], Seq = case Value of #full_doc_info{} -> Value#full_doc_info.update_seq; #doc_info{} -> - Value#doc_info.high_seq; - {{Seq0, _}, _} -> - Seq0 + Value#doc_info.high_seq end, Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of @@ -812,32 +608,6 @@ changes_enumerator(Value0, Acc) -> -view_changes_row(Results, KVs, Acc) -> - {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) -> - {{_Seq, Key}, {_Id, Value, _Rev}} = Row, - case Value of - removed -> - {AddAcc, [Key|RemAcc]}; - {dups, DupValues} -> - AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) -> - [[Key, DupValue]|AddAcc0] - end, AddAcc, DupValues), - {AddAcc1, RemAcc}; - _ -> - {[[Key, Value]|AddAcc], RemAcc} - end - end, {[], []}, KVs), - - % Seq, Id, and Rev should be the same for all KVs, since we're aggregating - % by seq. - [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs, - - {[ - {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add}, - {<<"remove">>, Remove}, {<<"changes">>, Results} - ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}. - - changes_row(Results, #full_doc_info{} = FDI, Acc) -> changes_row(Results, couch_doc:to_doc_info(FDI), Acc); changes_row(Results, DocInfo, Acc) -> diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index 7c0f9679b..dad363695 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -257,7 +257,7 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) -> ok = scan_local_db(Server, DbSuffix), {ok, Db} = mem3_util:ensure_exists( config:get("mem3", "shards_db", "_dbs")), - ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil), + ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db), ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}), couch_db:close(Db). @@ -383,7 +383,7 @@ setup() -> meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"), meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}), ChangesFun = meck:val(fun(_) -> ok end), - meck:expect(couch_changes, handle_changes, 4, ChangesFun), + meck:expect(couch_changes, handle_db_changes, 3, ChangesFun), meck:expect(couch_db, open_int, fun(?DBNAME, [?CTX, sys_db]) -> {ok, db}; (_, _) -> {not_found, no_db_file} diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl index 10cd56cee..848b471f9 100644 --- a/src/couch/test/eunit/couch_changes_tests.erl +++ b/src/couch/test/eunit/couch_changes_tests.erl @@ -154,7 +154,6 @@ filter_by_view() -> fun setup/0, fun teardown/1, [ fun should_filter_by_view/1, - fun should_filter_by_fast_view/1, fun should_filter_by_erlang_view/1 ] } @@ -698,43 +697,6 @@ should_filter_by_view({DbName, _}) -> ?assertEqual(UpSeq, LastSeq) end). -should_filter_by_fast_view({DbName, _}) -> - ?_test( - begin - DDocId = <<"_design/app">>, - DDoc = couch_doc:from_json_obj({[ - {<<"_id">>, DDocId}, - {<<"language">>, <<"javascript">>}, - {<<"options">>, {[{<<"seq_indexed">>, true}]}}, - {<<"views">>, {[ - {<<"valid">>, {[ - {<<"map">>, <<"function(doc) {" - " if (doc._id == 'doc3') {" - " emit(doc); " - "} }">>} - ]}} - ]}} - ]}), - ChArgs = #changes_args{filter = "_view"}, - Req = {json_req, {[{ - <<"query">>, {[ - {<<"view">>, <<"app/valid">>} - ]} - }]}}, - ok = update_ddoc(DbName, DDoc), - {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req), - {ok, Db} = couch_db:open_int(DbName, []), - {ok, ViewInfo} = couch_mrview:get_view_info(Db, DDoc, <<"valid">>), - {update_seq, ViewUpSeq} = lists:keyfind(update_seq, 1, ViewInfo), - couch_db:close(Db), - ?assertEqual(1, length(Rows)), - [#row{seq = Seq, id = Id}] = Rows, - ?assertEqual(<<"doc3">>, Id), - ?assertEqual(6, Seq), - ?assertEqual(LastSeq, Seq), - ?assertEqual(UpSeq, ViewUpSeq) - end). - should_filter_by_erlang_view({DbName, _}) -> ?_test( begin |