summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Avdey <eiri@eiri.ca>2019-11-12 13:45:26 -0400
committerEric Avdey <eiri@eiri.ca>2019-12-03 13:54:40 -0400
commit6a58d8c508621aadfcb72a43c9c37e048f2a10e5 (patch)
treed297bbb49000abbb7a57818f8275596e84c00e40
parent2c8966f122943f6aba65bb571dfb50ba6b699fb1 (diff)
downloadcouchdb-6a58d8c508621aadfcb72a43c9c37e048f2a10e5.tar.gz
Remove view_changes functionality from couch_changes side
-rw-r--r--src/couch/src/couch_changes.erl278
-rw-r--r--src/couch/src/couch_multidb_changes.erl4
-rw-r--r--src/couch/test/eunit/couch_changes_tests.erl38
3 files changed, 26 insertions, 294 deletions
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index c5b5edf9f..2d6b58174 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -16,7 +16,6 @@
-export([
handle_db_changes/3,
- handle_changes/4,
get_changes_timeout/2,
wait_updated/3,
get_rest_updated/1,
@@ -24,7 +23,6 @@
filter/3,
handle_db_event/3,
handle_view_event/3,
- view_filter/3,
send_changes_doc_ids/6,
send_changes_design_docs/6
]).
@@ -57,53 +55,21 @@
aggregation_results
}).
-handle_db_changes(Args, Req, Db) ->
- handle_changes(Args, Req, Db, db).
-
-handle_changes(Args1, Req, Db0, Type) ->
+handle_db_changes(Args0, Req, Db0) ->
#changes_args{
style = Style,
filter = FilterName,
feed = Feed,
dir = Dir,
since = Since
- } = Args1,
+ } = Args0,
Filter = configure_filter(FilterName, Style, Req, Db0),
- Args = Args1#changes_args{filter_fun = Filter},
- % The type of changes feed depends on the supplied filter. If the query is
- % for an optimized view-filtered db changes, we need to use the view
- % sequence tree.
- {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of
- {{view, DDocName0, ViewName0}, _} ->
- {true, DDocName0, ViewName0};
- {_, {fast_view, _, DDoc, ViewName0}} ->
- {true, DDoc#doc.id, ViewName0};
- _ ->
- {false, undefined, undefined}
- end,
+ Args = Args0#changes_args{filter_fun = Filter},
DbName = couch_db:name(Db0),
- {StartListenerFun, View} = if UseViewChanges ->
- {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
- DbName, DDocName, ViewName, #mrargs{}),
- case View0#mrview.seq_btree of
- #btree{} ->
- ok;
- _ ->
- throw({bad_request, "view changes not enabled"})
- end,
- SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
- )
- end,
- {SNFun, View0};
- true ->
- SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_db_event, self(), [{dbname, DbName}]
- )
- end,
- {SNFun, undefined}
+ StartListenerFun = fun() ->
+ couch_event:link_listener(
+ ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+ )
end,
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
@@ -113,14 +79,7 @@ handle_changes(Args1, Req, Db0, Type) ->
fwd ->
Since
end,
- View2 = if UseViewChanges ->
- {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
- DbName, DDocName, ViewName, #mrargs{}),
- View1;
- true ->
- undefined
- end,
- {Db, View2, StartSeq}
+ {Db, StartSeq}
end,
% begin timer to deal with heartbeat when filter function fails
case Args#changes_args.heartbeat of
@@ -136,12 +95,11 @@ handle_changes(Args1, Req, Db0, Type) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
{ok, Listener} = StartListenerFun(),
- {Db, View, StartSeq} = Start(),
+ {Db, StartSeq} = Start(),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
- <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
- View),
+ <<"">>, Timeout, TimeoutFun),
try
keep_sending_changes(
Args#changes_args{dir=fwd},
@@ -157,10 +115,10 @@ handle_changes(Args1, Req, Db0, Type) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
- {Db, View, StartSeq} = Start(),
+ {Db, StartSeq} = Start(),
Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
- UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun,
- DDocName, ViewName, View),
+ UserAcc2, Db, StartSeq, <<>>,
+ Timeout, TimeoutFun),
{ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
send_changes(
Acc0,
@@ -214,21 +172,12 @@ configure_filter("_view", Style, Req, Db) ->
[DName, VName] ->
{ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
check_member_exists(DDoc, [<<"views">>, VName]),
- FilterType = try
- true = couch_util:get_nested_json_value(
- DDoc#doc.body,
- [<<"options">>, <<"seq_indexed">>]
- ),
- fast_view
- catch _:_ ->
- view
- end,
case couch_db:is_clustered(Db) of
true ->
DIR = fabric_util:doc_id_and_rev(DDoc),
- {fetch, FilterType, Style, DIR, VName};
+ {fetch, view, Style, DIR, VName};
false ->
- {FilterType, Style, DDoc, VName}
+ {view, Style, DDoc, VName}
end;
[] ->
Msg = "`view` must be of the form `designname/viewname`",
@@ -285,8 +234,7 @@ filter(_Db, DocInfo, {design_docs, Style}) ->
_ ->
[]
end;
-filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
- when FilterType == view; FilterType == fast_view ->
+filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
Docs = open_revs(Db, DocInfo, Style),
{ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
filter_revs(Passes, Docs);
@@ -299,35 +247,6 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
filter_revs(Passes, Docs).
-fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
- case couch_db:get_doc_info(Db, ID) of
- {ok, #doc_info{high_seq=Seq}=DocInfo} ->
- Docs = open_revs(Db, DocInfo, Style),
- Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
- RevStr = couch_doc:rev_to_str({RevPos, RevId}),
- {[{<<"rev">>, RevStr}]}
- end, Docs),
- {DocInfo, Changes};
- {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq ->
- % If the view seq tree is out of date (or if the view seq tree
- % was opened before the db) seqs may come by from the seq tree
- % which correspond to the not-most-current revision of a document.
- % The proper thing to do is to not send this old revision, but wait
- % until we reopen the up-to-date view seq tree and continue the
- % fold.
- % I left the Seq > HighSeq guard in so if (for some godforsaken
- % reason) the seq in the view is more current than the database,
- % we'll throw an error.
- {undefined, []};
- {error, not_found} ->
- {undefined, []}
- end.
-
-
-
-view_filter(Db, KV, {default, Style}) ->
- apply_view_style(Db, KV, Style).
-
get_view_qs({json_req, {Props}}) ->
{Query} = couch_util:get_value(<<"query">>, Props, {[]}),
@@ -425,16 +344,6 @@ apply_style(#doc_info{revs=Revs}, main_only) ->
apply_style(#doc_info{revs=Revs}, all_docs) ->
[{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
-apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
- case couch_db:get_doc_info(Db, ID) of
- {ok, DocInfo} ->
- apply_style(DocInfo, all_docs);
- {error, not_found} ->
- []
- end.
-
open_revs(Db, DocInfo, Style) ->
DocInfos = case Style of
@@ -493,7 +402,7 @@ start_sending_changes(_Callback, UserAcc, ResponseType)
start_sending_changes(Callback, UserAcc, ResponseType) ->
Callback(start, ResponseType, UserAcc).
-build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
#changes_args{
include_docs = IncludeDocs,
doc_options = DocOpts,
@@ -516,9 +425,6 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D
conflicts = Conflicts,
timeout = Timeout,
timeout_fun = TimeoutFun,
- ddoc_name = DDocName,
- view_name = ViewName,
- view = View,
aggregation_results=[],
aggregation_kvs=[]
}.
@@ -527,41 +433,15 @@ send_changes(Acc, Dir, FirstRound) ->
#changes_acc{
db = Db,
seq = StartSeq,
- filter = Filter,
- view = View
+ filter = Filter
} = Acc,
DbEnumFun = fun changes_enumerator/2,
case can_optimize(FirstRound, Filter) of
{true, Fun} ->
Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
_ ->
- case {View, Filter} of
- {#mrview{}, {fast_view, _, _, _}} ->
- couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
- {undefined, _} ->
- Opts = [{dir, Dir}],
- couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
- {#mrview{}, _} ->
- ViewEnumFun = fun view_changes_enumerator/2,
- {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
- case Acc0 of
- #changes_acc{aggregation_results=[]} ->
- {Go, Acc0};
- _ ->
- #changes_acc{
- aggregation_results = AggResults,
- aggregation_kvs = AggKVs,
- user_acc = UserAcc,
- callback = Callback,
- resp_type = ResponseType,
- prepend = Prepend
- } = Acc0,
- ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
- UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc0#changes_acc{user_acc=UserAcc0}}
- end
- end
+ Opts = [{dir, Dir}],
+ couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts)
end.
@@ -653,8 +533,7 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
#changes_acc{
db = Db, callback = Callback,
timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
- prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
- ddoc_name = DDocName, view_name = ViewName
+ prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
} = ChangesAcc,
couch_db:close(Db),
@@ -670,7 +549,6 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
Args#changes_args{limit=NewLimit},
ChangesAcc#changes_acc{
db = Db2,
- view = maybe_refresh_view(Db2, DDocName, ViewName),
user_acc = UserAcc4,
seq = EndSeq,
prepend = Prepend2,
@@ -685,104 +563,22 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
end
end.
-maybe_refresh_view(_, undefined, undefined) ->
- undefined;
-maybe_refresh_view(Db, DDocName, ViewName) ->
- DbName = couch_db:name(Db),
- {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
- View.
-
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).
-view_changes_enumerator(Value, Acc) ->
- #changes_acc{
- filter = Filter, callback = Callback, prepend = Prepend,
- user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
- timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
- aggregation_kvs=AggKVs, aggregation_results=AggResults
- } = Acc,
-
- Results0 = view_filter(Db, Value, Filter),
- Results = [Result || Result <- Results0, Result /= null],
- {{Seq, _}, _} = Value,
-
- Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
-
- if CurrentSeq =:= Seq ->
- NewAggKVs = case Results of
- [] -> AggKVs;
- _ -> [Value|AggKVs]
- end,
- {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
- Acc0 = Acc#changes_acc{
- seq = Seq,
- user_acc = UserAcc2,
- aggregation_kvs=NewAggKVs
- },
- case Done of
- stop -> {stop, Acc0};
- ok -> {Go, Acc0}
- end;
- AggResults =/= [] ->
- {NewAggKVs, NewAggResults} = case Results of
- [] -> {[], []};
- _ -> {[Value], Results}
- end,
- if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
- ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
- UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
- aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
- true ->
- ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
- UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
- limit = Limit - 1, aggregation_kvs=[Value],
- aggregation_results=Results}}
- end;
- true ->
- {NewAggKVs, NewAggResults} = case Results of
- [] -> {[], []};
- _ -> {[Value], Results}
- end,
- {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
- Acc0 = Acc#changes_acc{
- seq = Seq,
- user_acc = UserAcc2,
- aggregation_kvs=NewAggKVs,
- aggregation_results=NewAggResults
- },
- case Done of
- stop -> {stop, Acc0};
- ok -> {Go, Acc0}
- end
- end.
-
-changes_enumerator(Value0, Acc) ->
+changes_enumerator(Value, Acc) ->
#changes_acc{
filter = Filter, callback = Callback, prepend = Prepend,
user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
timeout = Timeout, timeout_fun = TimeoutFun
} = Acc,
- {Value, Results0} = case Filter of
- {fast_view, _, _, _} ->
- fast_view_filter(Db, Value0, Filter);
- _ ->
- {Value0, filter(Db, Value0, Filter)}
- end,
+ Results0 = filter(Db, Value, Filter),
Results = [Result || Result <- Results0, Result /= null],
Seq = case Value of
#full_doc_info{} ->
Value#full_doc_info.update_seq;
#doc_info{} ->
- Value#doc_info.high_seq;
- {{Seq0, _}, _} ->
- Seq0
+ Value#doc_info.high_seq
end,
Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
case Results of
@@ -812,32 +608,6 @@ changes_enumerator(Value0, Acc) ->
-view_changes_row(Results, KVs, Acc) ->
- {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
- {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
- case Value of
- removed ->
- {AddAcc, [Key|RemAcc]};
- {dups, DupValues} ->
- AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
- [[Key, DupValue]|AddAcc0]
- end, AddAcc, DupValues),
- {AddAcc1, RemAcc};
- _ ->
- {[[Key, Value]|AddAcc], RemAcc}
- end
- end, {[], []}, KVs),
-
- % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
- % by seq.
- [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
-
- {[
- {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
- {<<"remove">>, Remove}, {<<"changes">>, Results}
- ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
-
-
changes_row(Results, #full_doc_info{} = FDI, Acc) ->
changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
changes_row(Results, DocInfo, Acc) ->
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
index 7c0f9679b..dad363695 100644
--- a/src/couch/src/couch_multidb_changes.erl
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -257,7 +257,7 @@ scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
ok = scan_local_db(Server, DbSuffix),
{ok, Db} = mem3_util:ensure_exists(
config:get("mem3", "shards_db", "_dbs")),
- ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
+ ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db),
ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}),
couch_db:close(Db).
@@ -383,7 +383,7 @@ setup() ->
meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
ChangesFun = meck:val(fun(_) -> ok end),
- meck:expect(couch_changes, handle_changes, 4, ChangesFun),
+ meck:expect(couch_changes, handle_db_changes, 3, ChangesFun),
meck:expect(couch_db, open_int,
fun(?DBNAME, [?CTX, sys_db]) -> {ok, db};
(_, _) -> {not_found, no_db_file}
diff --git a/src/couch/test/eunit/couch_changes_tests.erl b/src/couch/test/eunit/couch_changes_tests.erl
index 10cd56cee..848b471f9 100644
--- a/src/couch/test/eunit/couch_changes_tests.erl
+++ b/src/couch/test/eunit/couch_changes_tests.erl
@@ -154,7 +154,6 @@ filter_by_view() ->
fun setup/0, fun teardown/1,
[
fun should_filter_by_view/1,
- fun should_filter_by_fast_view/1,
fun should_filter_by_erlang_view/1
]
}
@@ -698,43 +697,6 @@ should_filter_by_view({DbName, _}) ->
?assertEqual(UpSeq, LastSeq)
end).
-should_filter_by_fast_view({DbName, _}) ->
- ?_test(
- begin
- DDocId = <<"_design/app">>,
- DDoc = couch_doc:from_json_obj({[
- {<<"_id">>, DDocId},
- {<<"language">>, <<"javascript">>},
- {<<"options">>, {[{<<"seq_indexed">>, true}]}},
- {<<"views">>, {[
- {<<"valid">>, {[
- {<<"map">>, <<"function(doc) {"
- " if (doc._id == 'doc3') {"
- " emit(doc); "
- "} }">>}
- ]}}
- ]}}
- ]}),
- ChArgs = #changes_args{filter = "_view"},
- Req = {json_req, {[{
- <<"query">>, {[
- {<<"view">>, <<"app/valid">>}
- ]}
- }]}},
- ok = update_ddoc(DbName, DDoc),
- {Rows, LastSeq, UpSeq} = run_changes_query(DbName, ChArgs, Req),
- {ok, Db} = couch_db:open_int(DbName, []),
- {ok, ViewInfo} = couch_mrview:get_view_info(Db, DDoc, <<"valid">>),
- {update_seq, ViewUpSeq} = lists:keyfind(update_seq, 1, ViewInfo),
- couch_db:close(Db),
- ?assertEqual(1, length(Rows)),
- [#row{seq = Seq, id = Id}] = Rows,
- ?assertEqual(<<"doc3">>, Id),
- ?assertEqual(6, Seq),
- ?assertEqual(LastSeq, Seq),
- ?assertEqual(UpSeq, ViewUpSeq)
- end).
-
should_filter_by_erlang_view({DbName, _}) ->
?_test(
begin