diff options
Diffstat (limited to 'src/couch/src/couch_changes.erl')
-rw-r--r-- | src/couch/src/couch_changes.erl | 594 |
1 files changed, 324 insertions, 270 deletions
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index 2078fed3a..089cda975 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -70,62 +70,80 @@ handle_db_changes(Args0, Req, Db0) -> end, Start = fun() -> {ok, Db} = couch_db:reopen(Db0), - StartSeq = case Dir of - rev -> - couch_db:get_update_seq(Db); - fwd -> - Since - end, + StartSeq = + case Dir of + rev -> + couch_db:get_update_seq(Db); + fwd -> + Since + end, {Db, StartSeq} end, % begin timer to deal with heartbeat when filter function fails case Args#changes_args.heartbeat of - undefined -> - erlang:erase(last_changes_heartbeat); - Val when is_integer(Val); Val =:= true -> - put(last_changes_heartbeat, os:timestamp()) + undefined -> + erlang:erase(last_changes_heartbeat); + Val when is_integer(Val); Val =:= true -> + put(last_changes_heartbeat, os:timestamp()) end, case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of - true -> - fun(CallbackAcc) -> - {Callback, UserAcc} = get_callback_acc(CallbackAcc), - {ok, Listener} = StartListenerFun(), - - {Db, StartSeq} = Start(), - UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), - {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), - Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq, - <<"">>, Timeout, TimeoutFun), - try - keep_sending_changes( - Args#changes_args{dir=fwd}, - Acc0, - true) - after - couch_event:stop_listener(Listener), - get_rest_updated(ok) % clean out any remaining update messages + true -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + {ok, Listener} = StartListenerFun(), + + {Db, StartSeq} = Start(), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), + {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + Acc0 = build_acc( + Args, + Callback, + UserAcc2, + Db, + StartSeq, + <<"">>, + Timeout, + TimeoutFun + ), + try + keep_sending_changes( + Args#changes_args{dir = fwd}, + Acc0, + true + ) + after + couch_event:stop_listener(Listener), + % clean out any remaining update messages + get_rest_updated(ok) + end + end; + false -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), + {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), + {Db, StartSeq} = Start(), + Acc0 = build_acc( + Args#changes_args{feed = "normal"}, + Callback, + UserAcc2, + Db, + StartSeq, + <<>>, + Timeout, + TimeoutFun + ), + {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = + send_changes( + Acc0, + Dir, + true + ), + end_sending_changes(Callback, UserAcc3, LastSeq, Feed) end - end; - false -> - fun(CallbackAcc) -> - {Callback, UserAcc} = get_callback_acc(CallbackAcc), - UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), - {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), - {Db, StartSeq} = Start(), - Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback, - UserAcc2, Db, StartSeq, <<>>, - Timeout, TimeoutFun), - {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} = - send_changes( - Acc0, - Dir, - true), - end_sending_changes(Callback, UserAcc3, LastSeq, Feed) - end end. - handle_db_event(_DbName, updated, Parent) -> Parent ! updated, {ok, Parent}; @@ -135,7 +153,6 @@ handle_db_event(_DbName, deleted, Parent) -> handle_db_event(_DbName, _Event, Parent) -> {ok, Parent}. - handle_view_event(_DbName, Msg, {Parent, DDocId}) -> case Msg of {index_commit, DDocId} -> @@ -152,17 +169,17 @@ get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> get_callback_acc(Callback) when is_function(Callback, 2) -> {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}. - configure_filter("_doc_ids", Style, Req, _Db) -> {doc_ids, Style, get_doc_ids(Req)}; configure_filter("_selector", Style, Req, _Db) -> - {selector, Style, get_selector_and_fields(Req)}; + {selector, Style, get_selector_and_fields(Req)}; configure_filter("_design", Style, _Req, _Db) -> {design_docs, Style}; configure_filter("_view", Style, Req, Db) -> ViewName = get_view_qs(Req), - if ViewName /= "" -> ok; true -> - throw({bad_request, "`view` filter parameter is not provided."}) + if + ViewName /= "" -> ok; + true -> throw({bad_request, "`view` filter parameter is not provided."}) end, ViewNameParts = string:tokens(ViewName, "/"), case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of @@ -196,10 +213,9 @@ configure_filter(FilterName, Style, Req, Db) -> true -> DIR = fabric_util:doc_id_and_rev(DDoc), {fetch, custom, Style, Req, DIR, FName}; - false-> + false -> {custom, Style, Req, DDoc, FName} end; - [] -> {default, Style}; _Else -> @@ -207,8 +223,7 @@ configure_filter(FilterName, Style, Req, Db) -> throw({bad_request, Msg}) end. - -filter(Db, #full_doc_info{}=FDI, Filter) -> +filter(Db, #full_doc_info{} = FDI, Filter) -> filter(Db, couch_doc:to_doc_info(FDI), Filter); filter(_Db, DocInfo, {default, Style}) -> apply_style(DocInfo, Style); @@ -221,8 +236,10 @@ filter(_Db, DocInfo, {doc_ids, Style, DocIds}) -> end; filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) -> Docs = open_revs(Db, DocInfo, Style), - Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) - || Doc <- Docs], + Passes = [ + mango_selector:match(Selector, couch_doc:to_json_obj(Doc, [])) + || Doc <- Docs + ], filter_revs(Passes, Docs); filter(_Db, DocInfo, {design_docs, Style}) -> case DocInfo#doc_info.id of @@ -236,15 +253,15 @@ filter(Db, DocInfo, {view, Style, DDoc, VName}) -> {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs), filter_revs(Passes, Docs); filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) -> - Req = case Req0 of - {json_req, _} -> Req0; - #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)} - end, + Req = + case Req0 of + {json_req, _} -> Req0; + #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)} + end, Docs = open_revs(Db, DocInfo, Style), {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), filter_revs(Passes, Docs). - get_view_qs({json_req, {Props}}) -> {Query} = couch_util:get_value(<<"query">>, Props, {[]}), binary_to_list(couch_util:get_value(<<"view">>, Query, "")); @@ -253,42 +270,43 @@ get_view_qs(Req) -> get_doc_ids({json_req, {Props}}) -> check_docids(couch_util:get_value(<<"doc_ids">>, Props)); -get_doc_ids(#httpd{method='POST'}=Req) -> +get_doc_ids(#httpd{method = 'POST'} = Req) -> couch_httpd:validate_ctype(Req, "application/json"), {Props} = couch_httpd:json_body_obj(Req), check_docids(couch_util:get_value(<<"doc_ids">>, Props)); -get_doc_ids(#httpd{method='GET'}=Req) -> +get_doc_ids(#httpd{method = 'GET'} = Req) -> DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), check_docids(DocIds); get_doc_ids(_) -> throw({bad_request, no_doc_ids_provided}). - get_selector_and_fields({json_req, {Props}}) -> Selector = check_selector(couch_util:get_value(<<"selector">>, Props)), Fields = check_fields(couch_util:get_value(<<"fields">>, Props, nil)), {Selector, Fields}; -get_selector_and_fields(#httpd{method='POST'}=Req) -> +get_selector_and_fields(#httpd{method = 'POST'} = Req) -> couch_httpd:validate_ctype(Req, "application/json"), - get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)}); + get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)}); get_selector_and_fields(_) -> throw({bad_request, "Selector must be specified in POST payload"}). - check_docids(DocIds) when is_list(DocIds) -> - lists:foreach(fun - (DocId) when not is_binary(DocId) -> - Msg = "`doc_ids` filter parameter is not a list of doc ids.", - throw({bad_request, Msg}); - (_) -> ok - end, DocIds), + lists:foreach( + fun + (DocId) when not is_binary(DocId) -> + Msg = "`doc_ids` filter parameter is not a list of doc ids.", + throw({bad_request, Msg}); + (_) -> + ok + end, + DocIds + ), DocIds; check_docids(_) -> Msg = "`doc_ids` filter parameter is not a list of doc ids.", throw({bad_request, Msg}). - -check_selector(Selector={_}) -> +check_selector(Selector = {_}) -> try mango_selector:normalize(Selector) catch @@ -299,7 +317,6 @@ check_selector(Selector={_}) -> check_selector(_Selector) -> throw({bad_request, "Selector error: expected a JSON object"}). - check_fields(nil) -> nil; check_fields(Fields) when is_list(Fields) -> @@ -314,7 +331,6 @@ check_fields(Fields) when is_list(Fields) -> check_fields(_Fields) -> throw({bad_request, "Selector error: fields must be JSON array"}). - open_ddoc(Db, DDocId) -> DbName = couch_db:name(Db), case couch_db:is_clustered(Db) of @@ -330,39 +346,38 @@ open_ddoc(Db, DDocId) -> end end. - -check_member_exists(#doc{body={Props}}, Path) -> +check_member_exists(#doc{body = {Props}}, Path) -> couch_util:get_nested_json_value({Props}, Path). - -apply_style(#doc_info{revs=Revs}, main_only) -> - [#rev_info{rev=Rev} | _] = Revs, +apply_style(#doc_info{revs = Revs}, main_only) -> + [#rev_info{rev = Rev} | _] = Revs, [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; -apply_style(#doc_info{revs=Revs}, all_docs) -> - [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs]. - +apply_style(#doc_info{revs = Revs}, all_docs) -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev = R} <- Revs]. open_revs(Db, DocInfo, Style) -> - DocInfos = case Style of - main_only -> [DocInfo]; - all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs] - end, + DocInfos = + case Style of + main_only -> [DocInfo]; + all_docs -> [DocInfo#doc_info{revs = [R]} || R <- DocInfo#doc_info.revs] + end, OpenOpts = [deleted, conflicts], % Relying on list comprehensions to silence errors OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos], [Doc || {ok, Doc} <- OpenResults]. - filter_revs(Passes, Docs) -> - lists:flatmap(fun - ({true, #doc{revs={RevPos, [RevId | _]}}}) -> - RevStr = couch_doc:rev_to_str({RevPos, RevId}), - Change = {[{<<"rev">>, RevStr}]}, - [Change]; - (_) -> - [] - end, lists:zip(Passes, Docs)). - + lists:flatmap( + fun + ({true, #doc{revs = {RevPos, [RevId | _]}}}) -> + RevStr = couch_doc:rev_to_str({RevPos, RevId}), + Change = {[{<<"rev">>, RevStr}]}, + [Change]; + (_) -> + [] + end, + lists:zip(Passes, Docs) + ). get_changes_timeout(Args, Callback) -> #changes_args{ @@ -371,29 +386,30 @@ get_changes_timeout(Args, Callback) -> feed = ResponseType } = Args, DefaultTimeout = chttpd_util:get_chttpd_config_integer( - "changes_timeout", 60000), + "changes_timeout", 60000 + ), case Heartbeat of - undefined -> - case Timeout of undefined -> - {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end}; - infinity -> - {infinity, fun(UserAcc) -> {stop, UserAcc} end}; + case Timeout of + undefined -> + {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end}; + infinity -> + {infinity, fun(UserAcc) -> {stop, UserAcc} end}; + _ -> + {lists:min([DefaultTimeout, Timeout]), fun(UserAcc) -> {stop, UserAcc} end} + end; + true -> + {DefaultTimeout, fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; _ -> - {lists:min([DefaultTimeout, Timeout]), - fun(UserAcc) -> {stop, UserAcc} end} - end; - true -> - {DefaultTimeout, - fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; - _ -> - {lists:min([DefaultTimeout, Heartbeat]), - fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} + {lists:min([DefaultTimeout, Heartbeat]), fun(UserAcc) -> + {ok, Callback(timeout, ResponseType, UserAcc)} + end} end. -start_sending_changes(_Callback, UserAcc, ResponseType) - when ResponseType =:= "continuous" - orelse ResponseType =:= "eventsource" -> +start_sending_changes(_Callback, UserAcc, ResponseType) when + ResponseType =:= "continuous" orelse + ResponseType =:= "eventsource" +-> UserAcc; start_sending_changes(Callback, UserAcc, ResponseType) -> Callback(start, ResponseType, UserAcc). @@ -421,8 +437,8 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) - conflicts = Conflicts, timeout = Timeout, timeout_fun = TimeoutFun, - aggregation_results=[], - aggregation_kvs=[] + aggregation_results = [], + aggregation_kvs = [] }. send_changes(Acc, Dir, FirstRound) -> @@ -440,30 +456,35 @@ send_changes(Acc, Dir, FirstRound) -> couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts) end. - can_optimize(true, {doc_ids, _Style, DocIds}) -> - MaxDocIds = config:get_integer("couchdb", - "changes_doc_ids_optimization_threshold", 100), - if length(DocIds) =< MaxDocIds -> - {true, fun send_changes_doc_ids/6}; - true -> - false + MaxDocIds = config:get_integer( + "couchdb", + "changes_doc_ids_optimization_threshold", + 100 + ), + if + length(DocIds) =< MaxDocIds -> + {true, fun send_changes_doc_ids/6}; + true -> + false end; can_optimize(true, {design_docs, _Style}) -> {true, fun send_changes_design_docs/6}; can_optimize(_, _) -> false. - send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> Results = couch_db:get_full_doc_infos(Db, DocIds), - FullInfos = lists:foldl(fun - (#full_doc_info{}=FDI, Acc) -> [FDI | Acc]; - (not_found, Acc) -> Acc - end, [], Results), + FullInfos = lists:foldl( + fun + (#full_doc_info{} = FDI, Acc) -> [FDI | Acc]; + (not_found, Acc) -> Acc + end, + [], + Results + ), send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). - send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, Opts = [ @@ -474,49 +495,62 @@ send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts), send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). - send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> - FoldFun = case Dir of - fwd -> fun lists:foldl/3; - rev -> fun lists:foldr/3 - end, - GreaterFun = case Dir of - fwd -> fun(A, B) -> A > B end; - rev -> fun(A, B) -> A =< B end - end, - DocInfos = lists:foldl(fun(FDI, Acc) -> - DI = couch_doc:to_doc_info(FDI), - case GreaterFun(DI#doc_info.high_seq, StartSeq) of - true -> [DI | Acc]; - false -> Acc - end - end, [], FullDocInfos), - SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos), - FinalAcc = try - FoldFun(fun(DocInfo, Acc) -> - case Fun(DocInfo, Acc) of - {ok, NewAcc} -> - NewAcc; - {stop, NewAcc} -> - throw({stop, NewAcc}) + FoldFun = + case Dir of + fwd -> fun lists:foldl/3; + rev -> fun lists:foldr/3 + end, + GreaterFun = + case Dir of + fwd -> fun(A, B) -> A > B end; + rev -> fun(A, B) -> A =< B end + end, + DocInfos = lists:foldl( + fun(FDI, Acc) -> + DI = couch_doc:to_doc_info(FDI), + case GreaterFun(DI#doc_info.high_seq, StartSeq) of + true -> [DI | Acc]; + false -> Acc end - end, Acc0, SortedDocInfos) - catch - {stop, Acc} -> Acc - end, + end, + [], + FullDocInfos + ), + SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos), + FinalAcc = + try + FoldFun( + fun(DocInfo, Acc) -> + case Fun(DocInfo, Acc) of + {ok, NewAcc} -> + NewAcc; + {stop, NewAcc} -> + throw({stop, NewAcc}) + end + end, + Acc0, + SortedDocInfos + ) + catch + {stop, Acc} -> Acc + end, case Dir of fwd -> - FinalAcc0 = case element(1, FinalAcc) of - changes_acc -> % we came here via couch_http or internal call - FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}; - fabric_changes_acc -> % we came here via chttpd / fabric / rexi - FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)} - end, + FinalAcc0 = + case element(1, FinalAcc) of + % we came here via couch_http or internal call + changes_acc -> + FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}; + % we came here via chttpd / fabric / rexi + fabric_changes_acc -> + FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)} + end, {ok, FinalAcc0}; - rev -> {ok, FinalAcc} + rev -> + {ok, FinalAcc} end. - keep_sending_changes(Args, Acc0, FirstRound) -> #changes_args{ feed = ResponseType, @@ -527,36 +561,44 @@ keep_sending_changes(Args, Acc0, FirstRound) -> {ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound), #changes_acc{ - db = Db, callback = Callback, - timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq, - prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit + db = Db, + callback = Callback, + timeout = Timeout, + timeout_fun = TimeoutFun, + seq = EndSeq, + prepend = Prepend2, + user_acc = UserAcc2, + limit = NewLimit } = maybe_upgrade_changes_acc(ChangesAcc), couch_db:close(Db), - if Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); - true -> - case wait_updated(Timeout, TimeoutFun, UserAcc2) of - {updated, UserAcc4} -> - DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions], - case couch_db:open(couch_db:name(Db), DbOptions1) of - {ok, Db2} -> - ?MODULE:keep_sending_changes( - Args#changes_args{limit=NewLimit}, - ChangesAcc#changes_acc{ - db = Db2, - user_acc = UserAcc4, - seq = EndSeq, - prepend = Prepend2, - timeout = Timeout, - timeout_fun = TimeoutFun}, - false); - _Else -> - end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) - end; - {stop, UserAcc4} -> - end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType) - end + if + Limit > NewLimit, ResponseType == "longpoll" -> + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); + true -> + case wait_updated(Timeout, TimeoutFun, UserAcc2) of + {updated, UserAcc4} -> + DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions], + case couch_db:open(couch_db:name(Db), DbOptions1) of + {ok, Db2} -> + ?MODULE:keep_sending_changes( + Args#changes_args{limit = NewLimit}, + ChangesAcc#changes_acc{ + db = Db2, + user_acc = UserAcc4, + seq = EndSeq, + prepend = Prepend2, + timeout = Timeout, + timeout_fun = TimeoutFun + }, + false + ); + _Else -> + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) + end; + {stop, UserAcc4} -> + end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType) + end end. end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> @@ -564,46 +606,59 @@ end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> changes_enumerator(Value, Acc) -> #changes_acc{ - filter = Filter, callback = Callback, prepend = Prepend, - user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db, - timeout = Timeout, timeout_fun = TimeoutFun + filter = Filter, + callback = Callback, + prepend = Prepend, + user_acc = UserAcc, + limit = Limit, + resp_type = ResponseType, + db = Db, + timeout = Timeout, + timeout_fun = TimeoutFun } = maybe_upgrade_changes_acc(Acc), Results0 = filter(Db, Value, Filter), Results = [Result || Result <- Results0, Result /= null], - Seq = case Value of - #full_doc_info{} -> - Value#full_doc_info.update_seq; - #doc_info{} -> - Value#doc_info.high_seq - end, - Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, + Seq = + case Value of + #full_doc_info{} -> + Value#full_doc_info.update_seq; + #doc_info{} -> + Value#doc_info.high_seq + end, + Go = + if + (Limit =< 1) andalso Results =/= [] -> stop; + true -> ok + end, case Results of - [] -> - {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), - case Done of - stop -> - {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; - ok -> - {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} - end; - _ -> - if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> - ChangesRow = changes_row(Results, Value, Acc), - UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}; - true -> - ChangesRow = changes_row(Results, Value, Acc), - UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), - reset_heartbeat(), - {Go, Acc#changes_acc{ - seq = Seq, prepend = <<",\n">>, - user_acc = UserAcc2, limit = Limit - 1}} - end + [] -> + {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc), + case Done of + stop -> + {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}; + ok -> + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}} + end; + _ -> + if + ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" -> + ChangesRow = changes_row(Results, Value, Acc), + UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}; + true -> + ChangesRow = changes_row(Results, Value, Acc), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + reset_heartbeat(), + {Go, Acc#changes_acc{ + seq = Seq, + prepend = <<",\n">>, + user_acc = UserAcc2, + limit = Limit - 1 + }} + end end. - - changes_row(Results, #full_doc_info{} = FDI, Acc) -> changes_row(Results, couch_doc:to_doc_info(FDI), Acc); changes_row(Results, DocInfo, Acc0) -> @@ -611,26 +666,27 @@ changes_row(Results, DocInfo, Acc0) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] } = DocInfo, - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ - deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}. + { + [{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ + deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc) + }. -maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) -> +maybe_get_changes_doc(Value, #changes_acc{include_docs = true} = Acc) -> #changes_acc{ db = Db, doc_options = DocOpts, conflicts = Conflicts, filter = Filter } = Acc, - Opts = case Conflicts of - true -> [deleted, conflicts]; - false -> [deleted] - end, + Opts = + case Conflicts of + true -> [deleted, conflicts]; + false -> [deleted] + end, load_doc(Db, Value, Opts, DocOpts, Filter); - maybe_get_changes_doc(_Value, _Acc) -> []. - load_doc(Db, Value, Opts, DocOpts, Filter) -> case couch_index_util:load_doc(Db, Value, Opts) of null -> @@ -639,68 +695,66 @@ load_doc(Db, Value, Opts, DocOpts, Filter) -> [{doc, doc_to_json(Doc, DocOpts, Filter)}] end. - -doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}}) - when Fields =/= nil -> +doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}}) when + Fields =/= nil +-> mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields); doc_to_json(Doc, DocOpts, _Filter) -> couch_doc:to_json_obj(Doc, DocOpts). - deleted_item(true) -> [{<<"deleted">>, true}]; deleted_item(_) -> []. % waits for a updated msg, if there are multiple msgs, collects them. wait_updated(Timeout, TimeoutFun, UserAcc) -> receive - updated -> - get_rest_updated(UserAcc); - deleted -> - {stop, UserAcc} + updated -> + get_rest_updated(UserAcc); + deleted -> + {stop, UserAcc} after Timeout -> {Go, UserAcc2} = TimeoutFun(UserAcc), case Go of - ok -> - ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2); - stop -> - {stop, UserAcc2} + ok -> + ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2); + stop -> + {stop, UserAcc2} end end. get_rest_updated(UserAcc) -> receive - updated -> - get_rest_updated(UserAcc) + updated -> + get_rest_updated(UserAcc) after 0 -> {updated, UserAcc} end. reset_heartbeat() -> case get(last_changes_heartbeat) of - undefined -> - ok; - _ -> - put(last_changes_heartbeat, os:timestamp()) + undefined -> + ok; + _ -> + put(last_changes_heartbeat, os:timestamp()) end. maybe_heartbeat(Timeout, TimeoutFun, Acc) -> Before = get(last_changes_heartbeat), case Before of - undefined -> - {ok, Acc}; - _ -> - Now = os:timestamp(), - case timer:now_diff(Now, Before) div 1000 >= Timeout of - true -> - Acc2 = TimeoutFun(Acc), - put(last_changes_heartbeat, Now), - Acc2; - false -> - {ok, Acc} - end + undefined -> + {ok, Acc}; + _ -> + Now = os:timestamp(), + case timer:now_diff(Now, Before) div 1000 >= Timeout of + true -> + Acc2 = TimeoutFun(Acc), + put(last_changes_heartbeat, Now), + Acc2; + false -> + {ok, Acc} + end end. - maybe_upgrade_changes_acc(#changes_acc{} = Acc) -> Acc; maybe_upgrade_changes_acc(Acc) when tuple_size(Acc) == 19 -> |