summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-05-20 13:28:21 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-05-20 13:28:26 -0500
commita11ac081bc97cd1e44138a32feb38088d31f18e4 (patch)
tree1b2fb46af337b507f7f82e4ccc6d1b295f1fc846
parentbb3ed4cca8b8280745f6315dfc43aa4fe34b04ff (diff)
downloadcouchdb-a11ac081bc97cd1e44138a32feb38088d31f18e4.tar.gz
Copy couch_changes to chttpd_changes
-rw-r--r--src/chttpd/src/chttpd_changes.erl919
1 files changed, 919 insertions, 0 deletions
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
new file mode 100644
index 000000000..2fe824cec
--- /dev/null
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -0,0 +1,919 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(chttpd_changes).
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-export([
+ handle_db_changes/3,
+ handle_changes/4,
+ get_changes_timeout/2,
+ wait_updated/3,
+ get_rest_updated/1,
+ configure_filter/4,
+ filter/3,
+ handle_db_event/3,
+ handle_view_event/3,
+ view_filter/3,
+ send_changes_doc_ids/6,
+ send_changes_design_docs/6
+]).
+
+-export([changes_enumerator/2]).
+
+%% export so we can use fully qualified call to facilitate hot-code upgrade
+-export([
+ keep_sending_changes/3
+]).
+
+-record(changes_acc, {
+ db,
+ view_name,
+ ddoc_name,
+ view,
+ seq,
+ prepend,
+ filter,
+ callback,
+ user_acc,
+ resp_type,
+ limit,
+ include_docs,
+ doc_options,
+ conflicts,
+ timeout,
+ timeout_fun,
+ aggregation_kvs,
+ aggregation_results
+}).
+
+handle_db_changes(Args, Req, Db) ->
+ handle_changes(Args, Req, Db, db).
+
+handle_changes(Args1, Req, Db0, Type) ->
+ #changes_args{
+ style = Style,
+ filter = FilterName,
+ feed = Feed,
+ dir = Dir,
+ since = Since
+ } = Args1,
+ Filter = configure_filter(FilterName, Style, Req, Db0),
+ Args = Args1#changes_args{filter_fun = Filter},
+ % The type of changes feed depends on the supplied filter. If the query is
+ % for an optimized view-filtered db changes, we need to use the view
+ % sequence tree.
+ {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of
+ {{view, DDocName0, ViewName0}, _} ->
+ {true, DDocName0, ViewName0};
+ {_, {fast_view, _, DDoc, ViewName0}} ->
+ {true, DDoc#doc.id, ViewName0};
+ _ ->
+ {false, undefined, undefined}
+ end,
+ DbName = couch_db:name(Db0),
+ {StartListenerFun, View} = if UseViewChanges ->
+ {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
+ DbName, DDocName, ViewName, #mrargs{}),
+ case View0#mrview.seq_btree of
+ #btree{} ->
+ ok;
+ _ ->
+ throw({bad_request, "view changes not enabled"})
+ end,
+ SNFun = fun() ->
+ couch_event:link_listener(
+ ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
+ )
+ end,
+ {SNFun, View0};
+ true ->
+ SNFun = fun() ->
+ couch_event:link_listener(
+ ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+ )
+ end,
+ {SNFun, undefined}
+ end,
+ Start = fun() ->
+ {ok, Db} = couch_db:reopen(Db0),
+ StartSeq = case Dir of
+ rev ->
+ couch_db:get_update_seq(Db);
+ fwd ->
+ Since
+ end,
+ View2 = if UseViewChanges ->
+ {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
+ DbName, DDocName, ViewName, #mrargs{}),
+ View1;
+ true ->
+ undefined
+ end,
+ {Db, View2, StartSeq}
+ end,
+ % begin timer to deal with heartbeat when filter function fails
+ case Args#changes_args.heartbeat of
+ undefined ->
+ erlang:erase(last_changes_heartbeat);
+ Val when is_integer(Val); Val =:= true ->
+ put(last_changes_heartbeat, os:timestamp())
+ end,
+
+ case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
+ true ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ {ok, Listener} = StartListenerFun(),
+
+ {Db, View, StartSeq} = Start(),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+ Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
+ <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
+ View),
+ try
+ keep_sending_changes(
+ Args#changes_args{dir=fwd},
+ Acc0,
+ true)
+ after
+ couch_event:stop_listener(Listener),
+ get_rest_updated(ok) % clean out any remaining update messages
+ end
+ end;
+ false ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+ {Db, View, StartSeq} = Start(),
+ Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
+ UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun,
+ DDocName, ViewName, View),
+ {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
+ send_changes(
+ Acc0,
+ Dir,
+ true),
+ end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+ end
+ end.
+
+
+handle_db_event(_DbName, updated, Parent) ->
+ Parent ! updated,
+ {ok, Parent};
+handle_db_event(_DbName, deleted, Parent) ->
+ Parent ! deleted,
+ {ok, Parent};
+handle_db_event(_DbName, _Event, Parent) ->
+ {ok, Parent}.
+
+
+handle_view_event(_DbName, Msg, {Parent, DDocId}) ->
+ case Msg of
+ {index_commit, DDocId} ->
+ Parent ! updated;
+ {index_delete, DDocId} ->
+ Parent ! deleted;
+ _ ->
+ ok
+ end,
+ {ok, {Parent, DDocId}}.
+
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+ Pair;
+get_callback_acc(Callback) when is_function(Callback, 2) ->
+ {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+
+
+configure_filter("_doc_ids", Style, Req, _Db) ->
+ {doc_ids, Style, get_doc_ids(Req)};
+configure_filter("_selector", Style, Req, _Db) ->
+ {selector, Style, get_selector_and_fields(Req)};
+configure_filter("_design", Style, _Req, _Db) ->
+ {design_docs, Style};
+configure_filter("_view", Style, Req, Db) ->
+ ViewName = get_view_qs(Req),
+ if ViewName /= "" -> ok; true ->
+ throw({bad_request, "`view` filter parameter is not provided."})
+ end,
+ ViewNameParts = string:tokens(ViewName, "/"),
+ case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
+ [DName, VName] ->
+ {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+ check_member_exists(DDoc, [<<"views">>, VName]),
+ FilterType = try
+ true = couch_util:get_nested_json_value(
+ DDoc#doc.body,
+ [<<"options">>, <<"seq_indexed">>]
+ ),
+ fast_view
+ catch _:_ ->
+ view
+ end,
+ case couch_db:is_clustered(Db) of
+ true ->
+ DIR = fabric_util:doc_id_and_rev(DDoc),
+ {fetch, FilterType, Style, DIR, VName};
+ false ->
+ {FilterType, Style, DDoc, VName}
+ end;
+ [] ->
+ Msg = "`view` must be of the form `designname/viewname`",
+ throw({bad_request, Msg})
+ end;
+configure_filter([$_ | _], _Style, _Req, _Db) ->
+ throw({bad_request, "unknown builtin filter name"});
+configure_filter("", main_only, _Req, _Db) ->
+ {default, main_only};
+configure_filter("", all_docs, _Req, _Db) ->
+ {default, all_docs};
+configure_filter(FilterName, Style, Req, Db) ->
+ FilterNameParts = string:tokens(FilterName, "/"),
+ case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of
+ [DName, FName] ->
+ {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+ check_member_exists(DDoc, [<<"filters">>, FName]),
+ DIR = fabric_util:doc_id_and_rev(DDoc),
+ {fetch, custom, Style, Req, DIR, FName};
+ [] ->
+ {default, Style};
+ _Else ->
+ Msg = "`filter` must be of the form `designname/filtername`",
+ throw({bad_request, Msg})
+ end.
+
+
+filter(Db, #full_doc_info{}=FDI, Filter) ->
+ filter(Db, couch_doc:to_doc_info(FDI), Filter);
+filter(_Db, DocInfo, {default, Style}) ->
+ apply_style(DocInfo, Style);
+filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
+ case lists:member(DocInfo#doc_info.id, DocIds) of
+ true ->
+ apply_style(DocInfo, Style);
+ false ->
+ []
+ end;
+filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
+ Docs = open_revs(Db, DocInfo, Style),
+ Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
+ || Doc <- Docs],
+ filter_revs(Passes, Docs);
+filter(_Db, DocInfo, {design_docs, Style}) ->
+ case DocInfo#doc_info.id of
+ <<"_design", _/binary>> ->
+ apply_style(DocInfo, Style);
+ _ ->
+ []
+ end;
+filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
+ when FilterType == view; FilterType == fast_view ->
+ Docs = open_revs(Db, DocInfo, Style),
+ {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
+ filter_revs(Passes, Docs);
+filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+ Req = case Req0 of
+ {json_req, _} -> Req0;
+ #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+ end,
+ Docs = open_revs(Db, DocInfo, Style),
+ {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
+ filter_revs(Passes, Docs).
+
+fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
+ case couch_db:get_doc_info(Db, ID) of
+ {ok, #doc_info{high_seq=Seq}=DocInfo} ->
+ Docs = open_revs(Db, DocInfo, Style),
+ Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
+ RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+ {[{<<"rev">>, RevStr}]}
+ end, Docs),
+ {DocInfo, Changes};
+ {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq ->
+ % If the view seq tree is out of date (or if the view seq tree
+ % was opened before the db) seqs may come by from the seq tree
+ % which correspond to the not-most-current revision of a document.
+ % The proper thing to do is to not send this old revision, but wait
+ % until we reopen the up-to-date view seq tree and continue the
+ % fold.
+ % I left the Seq > HighSeq guard in so if (for some godforsaken
+ % reason) the seq in the view is more current than the database,
+ % we'll throw an error.
+ {undefined, []};
+ {error, not_found} ->
+ {undefined, []}
+ end.
+
+
+
+view_filter(Db, KV, {default, Style}) ->
+ apply_view_style(Db, KV, Style).
+
+
+get_view_qs({json_req, {Props}}) ->
+ {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
+ binary_to_list(couch_util:get_value(<<"view">>, Query, ""));
+get_view_qs(Req) ->
+ couch_httpd:qs_value(Req, "view", "").
+
+get_doc_ids({json_req, {Props}}) ->
+ check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='POST'}=Req) ->
+ couch_httpd:validate_ctype(Req, "application/json"),
+ {Props} = couch_httpd:json_body_obj(Req),
+ check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='GET'}=Req) ->
+ DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
+ check_docids(DocIds);
+get_doc_ids(_) ->
+ throw({bad_request, no_doc_ids_provided}).
+
+
+get_selector_and_fields({json_req, {Props}}) ->
+ Selector = check_selector(couch_util:get_value(<<"selector">>, Props)),
+ Fields = check_fields(couch_util:get_value(<<"fields">>, Props, nil)),
+ {Selector, Fields};
+get_selector_and_fields(#httpd{method='POST'}=Req) ->
+ couch_httpd:validate_ctype(Req, "application/json"),
+ get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)});
+get_selector_and_fields(_) ->
+ throw({bad_request, "Selector must be specified in POST payload"}).
+
+
+check_docids(DocIds) when is_list(DocIds) ->
+ lists:foreach(fun
+ (DocId) when not is_binary(DocId) ->
+ Msg = "`doc_ids` filter parameter is not a list of doc ids.",
+ throw({bad_request, Msg});
+ (_) -> ok
+ end, DocIds),
+ DocIds;
+check_docids(_) ->
+ Msg = "`doc_ids` filter parameter is not a list of doc ids.",
+ throw({bad_request, Msg}).
+
+
+check_selector(Selector={_}) ->
+ try
+ mango_selector:normalize(Selector)
+ catch
+ {mango_error, Mod, Reason0} ->
+ {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
+ throw({bad_request, Reason})
+ end;
+check_selector(_Selector) ->
+ throw({bad_request, "Selector error: expected a JSON object"}).
+
+
+check_fields(nil) ->
+ nil;
+check_fields(Fields) when is_list(Fields) ->
+ try
+ {ok, Fields1} = mango_fields:new(Fields),
+ Fields1
+ catch
+ {mango_error, Mod, Reason0} ->
+ {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
+ throw({bad_request, Reason})
+ end;
+check_fields(_Fields) ->
+ throw({bad_request, "Selector error: fields must be JSON array"}).
+
+
+open_ddoc(Db, DDocId) ->
+ case ddoc_cache:open_doc(Db, DDocId) of
+ {ok, _} = Resp -> Resp;
+ Else -> throw(Else)
+ end.
+
+
+check_member_exists(#doc{body={Props}}, Path) ->
+ couch_util:get_nested_json_value({Props}, Path).
+
+
+apply_style(#doc_info{revs=Revs}, main_only) ->
+ [#rev_info{rev=Rev} | _] = Revs,
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+apply_style(#doc_info{revs=Revs}, all_docs) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+
+apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
+ case couch_db:get_doc_info(Db, ID) of
+ {ok, DocInfo} ->
+ apply_style(DocInfo, all_docs);
+ {error, not_found} ->
+ []
+ end.
+
+
+open_revs(Db, DocInfo, Style) ->
+ DocInfos = case Style of
+ main_only -> [DocInfo];
+ all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
+ end,
+ OpenOpts = [deleted, conflicts],
+ % Relying on list comprehensions to silence errors
+ OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
+ [Doc || {ok, Doc} <- OpenResults].
+
+
+filter_revs(Passes, Docs) ->
+ lists:flatmap(fun
+ ({true, #doc{revs={RevPos, [RevId | _]}}}) ->
+ RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+ Change = {[{<<"rev">>, RevStr}]},
+ [Change];
+ (_) ->
+ []
+ end, lists:zip(Passes, Docs)).
+
+
+get_changes_timeout(Args, Callback) ->
+ #changes_args{
+ heartbeat = Heartbeat,
+ timeout = Timeout,
+ feed = ResponseType
+ } = Args,
+ DefaultTimeout = list_to_integer(
+ config:get("httpd", "changes_timeout", "60000")
+ ),
+ case Heartbeat of
+ undefined ->
+ case Timeout of
+ undefined ->
+ {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
+ infinity ->
+ {infinity, fun(UserAcc) -> {stop, UserAcc} end};
+ _ ->
+ {lists:min([DefaultTimeout, Timeout]),
+ fun(UserAcc) -> {stop, UserAcc} end}
+ end;
+ true ->
+ {DefaultTimeout,
+ fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
+ _ ->
+ {lists:min([DefaultTimeout, Heartbeat]),
+ fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
+ end.
+
+start_sending_changes(_Callback, UserAcc, ResponseType)
+ when ResponseType =:= "continuous"
+ orelse ResponseType =:= "eventsource" ->
+ UserAcc;
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+ Callback(start, ResponseType, UserAcc).
+
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
+ #changes_args{
+ include_docs = IncludeDocs,
+ doc_options = DocOpts,
+ conflicts = Conflicts,
+ limit = Limit,
+ feed = ResponseType,
+ filter_fun = Filter
+ } = Args,
+ #changes_acc{
+ db = Db,
+ seq = StartSeq,
+ prepend = Prepend,
+ filter = Filter,
+ callback = Callback,
+ user_acc = UserAcc,
+ resp_type = ResponseType,
+ limit = Limit,
+ include_docs = IncludeDocs,
+ doc_options = DocOpts,
+ conflicts = Conflicts,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun,
+ ddoc_name = DDocName,
+ view_name = ViewName,
+ view = View,
+ aggregation_results=[],
+ aggregation_kvs=[]
+ }.
+
+send_changes(Acc, Dir, FirstRound) ->
+ #changes_acc{
+ db = Db,
+ seq = StartSeq,
+ filter = Filter,
+ view = View
+ } = Acc,
+ DbEnumFun = fun changes_enumerator/2,
+ case can_optimize(FirstRound, Filter) of
+ {true, Fun} ->
+ Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
+ _ ->
+ case {View, Filter} of
+ {#mrview{}, {fast_view, _, _, _}} ->
+ couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+ {undefined, _} ->
+ Opts = [{dir, Dir}],
+ couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
+ {#mrview{}, _} ->
+ ViewEnumFun = fun view_changes_enumerator/2,
+ {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
+ case Acc0 of
+ #changes_acc{aggregation_results=[]} ->
+ {Go, Acc0};
+ _ ->
+ #changes_acc{
+ aggregation_results = AggResults,
+ aggregation_kvs = AggKVs,
+ user_acc = UserAcc,
+ callback = Callback,
+ resp_type = ResponseType,
+ prepend = Prepend
+ } = Acc0,
+ ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
+ UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc0#changes_acc{user_acc=UserAcc0}}
+ end
+ end
+ end.
+
+
+can_optimize(true, {doc_ids, _Style, DocIds}) ->
+ MaxDocIds = config:get_integer("couchdb",
+ "changes_doc_ids_optimization_threshold", 100),
+ if length(DocIds) =< MaxDocIds ->
+ {true, fun send_changes_doc_ids/6};
+ true ->
+ false
+ end;
+can_optimize(true, {design_docs, _Style}) ->
+ {true, fun send_changes_design_docs/6};
+can_optimize(_, _) ->
+ false.
+
+
+send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
+ Results = couch_db:get_full_doc_infos(Db, DocIds),
+ FullInfos = lists:foldl(fun
+ (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
+ (not_found, Acc) -> Acc
+ end, [], Results),
+ send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ Opts = [
+ include_deleted,
+ {start_key, <<"_design/">>},
+ {end_key_gt, <<"_design0">>}
+ ],
+ {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
+ send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
+
+
+send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
+ FoldFun = case Dir of
+ fwd -> fun lists:foldl/3;
+ rev -> fun lists:foldr/3
+ end,
+ GreaterFun = case Dir of
+ fwd -> fun(A, B) -> A > B end;
+ rev -> fun(A, B) -> A =< B end
+ end,
+ DocInfos = lists:foldl(fun(FDI, Acc) ->
+ DI = couch_doc:to_doc_info(FDI),
+ case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+ true -> [DI | Acc];
+ false -> Acc
+ end
+ end, [], FullDocInfos),
+ SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
+ FinalAcc = try
+ FoldFun(fun(DocInfo, Acc) ->
+ case Fun(DocInfo, Acc) of
+ {ok, NewAcc} ->
+ NewAcc;
+ {stop, NewAcc} ->
+ throw({stop, NewAcc})
+ end
+ end, Acc0, SortedDocInfos)
+ catch
+ {stop, Acc} -> Acc
+ end,
+ case Dir of
+ fwd ->
+ FinalAcc0 = case element(1, FinalAcc) of
+ changes_acc -> % we came here via couch_http or internal call
+ FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
+ fabric_changes_acc -> % we came here via chttpd / fabric / rexi
+ FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
+ end,
+ {ok, FinalAcc0};
+ rev -> {ok, FinalAcc}
+ end.
+
+
+keep_sending_changes(Args, Acc0, FirstRound) ->
+ #changes_args{
+ feed = ResponseType,
+ limit = Limit,
+ db_open_options = DbOptions
+ } = Args,
+
+ {ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound),
+
+ #changes_acc{
+ db = Db, callback = Callback,
+ timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
+ prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
+ ddoc_name = DDocName, view_name = ViewName
+ } = ChangesAcc,
+
+ couch_db:close(Db),
+ if Limit > NewLimit, ResponseType == "longpoll" ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+ true ->
+ case wait_updated(Timeout, TimeoutFun, UserAcc2) of
+ {updated, UserAcc4} ->
+ DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
+ case couch_db:open(couch_db:name(Db), DbOptions1) of
+ {ok, Db2} ->
+ ?MODULE:keep_sending_changes(
+ Args#changes_args{limit=NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ view = maybe_refresh_view(Db2, DDocName, ViewName),
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun},
+ false);
+ _Else ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
+ end;
+ {stop, UserAcc4} ->
+ end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+ end
+ end.
+
+maybe_refresh_view(_, undefined, undefined) ->
+ undefined;
+maybe_refresh_view(Db, DDocName, ViewName) ->
+ DbName = couch_db:name(Db),
+ {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
+ View.
+
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+ Callback({stop, EndSeq}, ResponseType, UserAcc).
+
+view_changes_enumerator(Value, Acc) ->
+ #changes_acc{
+ filter = Filter, callback = Callback, prepend = Prepend,
+ user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
+ aggregation_kvs=AggKVs, aggregation_results=AggResults
+ } = Acc,
+
+ Results0 = view_filter(Db, Value, Filter),
+ Results = [Result || Result <- Results0, Result /= null],
+ {{Seq, _}, _} = Value,
+
+ Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+
+ if CurrentSeq =:= Seq ->
+ NewAggKVs = case Results of
+ [] -> AggKVs;
+ _ -> [Value|AggKVs]
+ end,
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ Acc0 = Acc#changes_acc{
+ seq = Seq,
+ user_acc = UserAcc2,
+ aggregation_kvs=NewAggKVs
+ },
+ case Done of
+ stop -> {stop, Acc0};
+ ok -> {Go, Acc0}
+ end;
+ AggResults =/= [] ->
+ {NewAggKVs, NewAggResults} = case Results of
+ [] -> {[], []};
+ _ -> {[Value], Results}
+ end,
+ if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+ ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
+ aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
+ true ->
+ ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
+ limit = Limit - 1, aggregation_kvs=[Value],
+ aggregation_results=Results}}
+ end;
+ true ->
+ {NewAggKVs, NewAggResults} = case Results of
+ [] -> {[], []};
+ _ -> {[Value], Results}
+ end,
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ Acc0 = Acc#changes_acc{
+ seq = Seq,
+ user_acc = UserAcc2,
+ aggregation_kvs=NewAggKVs,
+ aggregation_results=NewAggResults
+ },
+ case Done of
+ stop -> {stop, Acc0};
+ ok -> {Go, Acc0}
+ end
+ end.
+
+changes_enumerator(Value0, Acc) ->
+ #changes_acc{
+ filter = Filter, callback = Callback, prepend = Prepend,
+ user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun
+ } = Acc,
+ {Value, Results0} = case Filter of
+ {fast_view, _, _, _} ->
+ fast_view_filter(Db, Value0, Filter);
+ _ ->
+ {Value0, filter(Db, Value0, Filter)}
+ end,
+ Results = [Result || Result <- Results0, Result /= null],
+ Seq = case Value of
+ #full_doc_info{} ->
+ Value#full_doc_info.update_seq;
+ #doc_info{} ->
+ Value#doc_info.high_seq;
+ {{Seq0, _}, _} ->
+ Seq0
+ end,
+ Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+ case Results of
+ [] ->
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ case Done of
+ stop ->
+ {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+ ok ->
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+ end;
+ _ ->
+ if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+ ChangesRow = changes_row(Results, Value, Acc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
+ true ->
+ ChangesRow = changes_row(Results, Value, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>,
+ user_acc = UserAcc2, limit = Limit - 1}}
+ end
+ end.
+
+
+
+view_changes_row(Results, KVs, Acc) ->
+ {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
+ {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
+ case Value of
+ removed ->
+ {AddAcc, [Key|RemAcc]};
+ {dups, DupValues} ->
+ AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
+ [[Key, DupValue]|AddAcc0]
+ end, AddAcc, DupValues),
+ {AddAcc1, RemAcc};
+ _ ->
+ {[[Key, Value]|AddAcc], RemAcc}
+ end
+ end, {[], []}, KVs),
+
+ % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
+ % by seq.
+ [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
+
+ {[
+ {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
+ {<<"remove">>, Remove}, {<<"changes">>, Results}
+ ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+
+
+changes_row(Results, #full_doc_info{} = FDI, Acc) ->
+ changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
+changes_row(Results, DocInfo, Acc) ->
+ #doc_info{
+ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
+ } = DocInfo,
+ {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
+ deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}.
+
+maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
+ #changes_acc{
+ db = Db,
+ doc_options = DocOpts,
+ conflicts = Conflicts,
+ filter = Filter
+ } = Acc,
+ Opts = case Conflicts of
+ true -> [deleted, conflicts];
+ false -> [deleted]
+ end,
+ load_doc(Db, Value, Opts, DocOpts, Filter);
+
+maybe_get_changes_doc(_Value, _Acc) ->
+ [].
+
+
+load_doc(Db, Value, Opts, DocOpts, Filter) ->
+ case couch_index_util:load_doc(Db, Value, Opts) of
+ null ->
+ [{doc, null}];
+ Doc ->
+ [{doc, doc_to_json(Doc, DocOpts, Filter)}]
+ end.
+
+
+doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
+ when Fields =/= nil ->
+ mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
+doc_to_json(Doc, DocOpts, _Filter) ->
+ couch_doc:to_json_obj(Doc, DocOpts).
+
+
+deleted_item(true) -> [{<<"deleted">>, true}];
+deleted_item(_) -> [].
+
+% waits for a updated msg, if there are multiple msgs, collects them.
+wait_updated(Timeout, TimeoutFun, UserAcc) ->
+ receive
+ updated ->
+ get_rest_updated(UserAcc);
+ deleted ->
+ {stop, UserAcc}
+ after Timeout ->
+ {Go, UserAcc2} = TimeoutFun(UserAcc),
+ case Go of
+ ok ->
+ ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
+ stop ->
+ {stop, UserAcc2}
+ end
+ end.
+
+get_rest_updated(UserAcc) ->
+ receive
+ updated ->
+ get_rest_updated(UserAcc)
+ after 0 ->
+ {updated, UserAcc}
+ end.
+
+reset_heartbeat() ->
+ case get(last_changes_heartbeat) of
+ undefined ->
+ ok;
+ _ ->
+ put(last_changes_heartbeat, os:timestamp())
+ end.
+
+maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
+ Before = get(last_changes_heartbeat),
+ case Before of
+ undefined ->
+ {ok, Acc};
+ _ ->
+ Now = os:timestamp(),
+ case timer:now_diff(Now, Before) div 1000 >= Timeout of
+ true ->
+ Acc2 = TimeoutFun(Acc),
+ put(last_changes_heartbeat, Now),
+ Acc2;
+ false ->
+ {ok, Acc}
+ end
+ end.