summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_changes.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_changes.erl')
-rw-r--r--src/couch/src/couch_changes.erl594
1 files changed, 324 insertions, 270 deletions
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index 2078fed3a..089cda975 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -70,62 +70,80 @@ handle_db_changes(Args0, Req, Db0) ->
end,
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
- StartSeq = case Dir of
- rev ->
- couch_db:get_update_seq(Db);
- fwd ->
- Since
- end,
+ StartSeq =
+ case Dir of
+ rev ->
+ couch_db:get_update_seq(Db);
+ fwd ->
+ Since
+ end,
{Db, StartSeq}
end,
% begin timer to deal with heartbeat when filter function fails
case Args#changes_args.heartbeat of
- undefined ->
- erlang:erase(last_changes_heartbeat);
- Val when is_integer(Val); Val =:= true ->
- put(last_changes_heartbeat, os:timestamp())
+ undefined ->
+ erlang:erase(last_changes_heartbeat);
+ Val when is_integer(Val); Val =:= true ->
+ put(last_changes_heartbeat, os:timestamp())
end,
case lists:member(Feed, ["continuous", "longpoll", "eventsource"]) of
- true ->
- fun(CallbackAcc) ->
- {Callback, UserAcc} = get_callback_acc(CallbackAcc),
- {ok, Listener} = StartListenerFun(),
-
- {Db, StartSeq} = Start(),
- UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
- {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
- Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
- <<"">>, Timeout, TimeoutFun),
- try
- keep_sending_changes(
- Args#changes_args{dir=fwd},
- Acc0,
- true)
- after
- couch_event:stop_listener(Listener),
- get_rest_updated(ok) % clean out any remaining update messages
+ true ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ {ok, Listener} = StartListenerFun(),
+
+ {Db, StartSeq} = Start(),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+ Acc0 = build_acc(
+ Args,
+ Callback,
+ UserAcc2,
+ Db,
+ StartSeq,
+ <<"">>,
+ Timeout,
+ TimeoutFun
+ ),
+ try
+ keep_sending_changes(
+ Args#changes_args{dir = fwd},
+ Acc0,
+ true
+ )
+ after
+ couch_event:stop_listener(Listener),
+ % clean out any remaining update messages
+ get_rest_updated(ok)
+ end
+ end;
+ false ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+ {Db, StartSeq} = Start(),
+ Acc0 = build_acc(
+ Args#changes_args{feed = "normal"},
+ Callback,
+ UserAcc2,
+ Db,
+ StartSeq,
+ <<>>,
+ Timeout,
+ TimeoutFun
+ ),
+ {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
+ send_changes(
+ Acc0,
+ Dir,
+ true
+ ),
+ end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
end
- end;
- false ->
- fun(CallbackAcc) ->
- {Callback, UserAcc} = get_callback_acc(CallbackAcc),
- UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
- {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
- {Db, StartSeq} = Start(),
- Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
- UserAcc2, Db, StartSeq, <<>>,
- Timeout, TimeoutFun),
- {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
- send_changes(
- Acc0,
- Dir,
- true),
- end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
- end
end.
-
handle_db_event(_DbName, updated, Parent) ->
Parent ! updated,
{ok, Parent};
@@ -135,7 +153,6 @@ handle_db_event(_DbName, deleted, Parent) ->
handle_db_event(_DbName, _Event, Parent) ->
{ok, Parent}.
-
handle_view_event(_DbName, Msg, {Parent, DDocId}) ->
case Msg of
{index_commit, DDocId} ->
@@ -152,17 +169,17 @@ get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
get_callback_acc(Callback) when is_function(Callback, 2) ->
{fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
-
configure_filter("_doc_ids", Style, Req, _Db) ->
{doc_ids, Style, get_doc_ids(Req)};
configure_filter("_selector", Style, Req, _Db) ->
- {selector, Style, get_selector_and_fields(Req)};
+ {selector, Style, get_selector_and_fields(Req)};
configure_filter("_design", Style, _Req, _Db) ->
{design_docs, Style};
configure_filter("_view", Style, Req, Db) ->
ViewName = get_view_qs(Req),
- if ViewName /= "" -> ok; true ->
- throw({bad_request, "`view` filter parameter is not provided."})
+ if
+ ViewName /= "" -> ok;
+ true -> throw({bad_request, "`view` filter parameter is not provided."})
end,
ViewNameParts = string:tokens(ViewName, "/"),
case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
@@ -196,10 +213,9 @@ configure_filter(FilterName, Style, Req, Db) ->
true ->
DIR = fabric_util:doc_id_and_rev(DDoc),
{fetch, custom, Style, Req, DIR, FName};
- false->
+ false ->
{custom, Style, Req, DDoc, FName}
end;
-
[] ->
{default, Style};
_Else ->
@@ -207,8 +223,7 @@ configure_filter(FilterName, Style, Req, Db) ->
throw({bad_request, Msg})
end.
-
-filter(Db, #full_doc_info{}=FDI, Filter) ->
+filter(Db, #full_doc_info{} = FDI, Filter) ->
filter(Db, couch_doc:to_doc_info(FDI), Filter);
filter(_Db, DocInfo, {default, Style}) ->
apply_style(DocInfo, Style);
@@ -221,8 +236,10 @@ filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
end;
filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
Docs = open_revs(Db, DocInfo, Style),
- Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
- || Doc <- Docs],
+ Passes = [
+ mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
+ || Doc <- Docs
+ ],
filter_revs(Passes, Docs);
filter(_Db, DocInfo, {design_docs, Style}) ->
case DocInfo#doc_info.id of
@@ -236,15 +253,15 @@ filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
{ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
filter_revs(Passes, Docs);
filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
- Req = case Req0 of
- {json_req, _} -> Req0;
- #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
- end,
+ Req =
+ case Req0 of
+ {json_req, _} -> Req0;
+ #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
+ end,
Docs = open_revs(Db, DocInfo, Style),
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
filter_revs(Passes, Docs).
-
get_view_qs({json_req, {Props}}) ->
{Query} = couch_util:get_value(<<"query">>, Props, {[]}),
binary_to_list(couch_util:get_value(<<"view">>, Query, ""));
@@ -253,42 +270,43 @@ get_view_qs(Req) ->
get_doc_ids({json_req, {Props}}) ->
check_docids(couch_util:get_value(<<"doc_ids">>, Props));
-get_doc_ids(#httpd{method='POST'}=Req) ->
+get_doc_ids(#httpd{method = 'POST'} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
{Props} = couch_httpd:json_body_obj(Req),
check_docids(couch_util:get_value(<<"doc_ids">>, Props));
-get_doc_ids(#httpd{method='GET'}=Req) ->
+get_doc_ids(#httpd{method = 'GET'} = Req) ->
DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
check_docids(DocIds);
get_doc_ids(_) ->
throw({bad_request, no_doc_ids_provided}).
-
get_selector_and_fields({json_req, {Props}}) ->
Selector = check_selector(couch_util:get_value(<<"selector">>, Props)),
Fields = check_fields(couch_util:get_value(<<"fields">>, Props, nil)),
{Selector, Fields};
-get_selector_and_fields(#httpd{method='POST'}=Req) ->
+get_selector_and_fields(#httpd{method = 'POST'} = Req) ->
couch_httpd:validate_ctype(Req, "application/json"),
- get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)});
+ get_selector_and_fields({json_req, couch_httpd:json_body_obj(Req)});
get_selector_and_fields(_) ->
throw({bad_request, "Selector must be specified in POST payload"}).
-
check_docids(DocIds) when is_list(DocIds) ->
- lists:foreach(fun
- (DocId) when not is_binary(DocId) ->
- Msg = "`doc_ids` filter parameter is not a list of doc ids.",
- throw({bad_request, Msg});
- (_) -> ok
- end, DocIds),
+ lists:foreach(
+ fun
+ (DocId) when not is_binary(DocId) ->
+ Msg = "`doc_ids` filter parameter is not a list of doc ids.",
+ throw({bad_request, Msg});
+ (_) ->
+ ok
+ end,
+ DocIds
+ ),
DocIds;
check_docids(_) ->
Msg = "`doc_ids` filter parameter is not a list of doc ids.",
throw({bad_request, Msg}).
-
-check_selector(Selector={_}) ->
+check_selector(Selector = {_}) ->
try
mango_selector:normalize(Selector)
catch
@@ -299,7 +317,6 @@ check_selector(Selector={_}) ->
check_selector(_Selector) ->
throw({bad_request, "Selector error: expected a JSON object"}).
-
check_fields(nil) ->
nil;
check_fields(Fields) when is_list(Fields) ->
@@ -314,7 +331,6 @@ check_fields(Fields) when is_list(Fields) ->
check_fields(_Fields) ->
throw({bad_request, "Selector error: fields must be JSON array"}).
-
open_ddoc(Db, DDocId) ->
DbName = couch_db:name(Db),
case couch_db:is_clustered(Db) of
@@ -330,39 +346,38 @@ open_ddoc(Db, DDocId) ->
end
end.
-
-check_member_exists(#doc{body={Props}}, Path) ->
+check_member_exists(#doc{body = {Props}}, Path) ->
couch_util:get_nested_json_value({Props}, Path).
-
-apply_style(#doc_info{revs=Revs}, main_only) ->
- [#rev_info{rev=Rev} | _] = Revs,
+apply_style(#doc_info{revs = Revs}, main_only) ->
+ [#rev_info{rev = Rev} | _] = Revs,
[{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-apply_style(#doc_info{revs=Revs}, all_docs) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
-
+apply_style(#doc_info{revs = Revs}, all_docs) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev = R} <- Revs].
open_revs(Db, DocInfo, Style) ->
- DocInfos = case Style of
- main_only -> [DocInfo];
- all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
- end,
+ DocInfos =
+ case Style of
+ main_only -> [DocInfo];
+ all_docs -> [DocInfo#doc_info{revs = [R]} || R <- DocInfo#doc_info.revs]
+ end,
OpenOpts = [deleted, conflicts],
% Relying on list comprehensions to silence errors
OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
[Doc || {ok, Doc} <- OpenResults].
-
filter_revs(Passes, Docs) ->
- lists:flatmap(fun
- ({true, #doc{revs={RevPos, [RevId | _]}}}) ->
- RevStr = couch_doc:rev_to_str({RevPos, RevId}),
- Change = {[{<<"rev">>, RevStr}]},
- [Change];
- (_) ->
- []
- end, lists:zip(Passes, Docs)).
-
+ lists:flatmap(
+ fun
+ ({true, #doc{revs = {RevPos, [RevId | _]}}}) ->
+ RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+ Change = {[{<<"rev">>, RevStr}]},
+ [Change];
+ (_) ->
+ []
+ end,
+ lists:zip(Passes, Docs)
+ ).
get_changes_timeout(Args, Callback) ->
#changes_args{
@@ -371,29 +386,30 @@ get_changes_timeout(Args, Callback) ->
feed = ResponseType
} = Args,
DefaultTimeout = chttpd_util:get_chttpd_config_integer(
- "changes_timeout", 60000),
+ "changes_timeout", 60000
+ ),
case Heartbeat of
- undefined ->
- case Timeout of
undefined ->
- {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
- infinity ->
- {infinity, fun(UserAcc) -> {stop, UserAcc} end};
+ case Timeout of
+ undefined ->
+ {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end};
+ infinity ->
+ {infinity, fun(UserAcc) -> {stop, UserAcc} end};
+ _ ->
+ {lists:min([DefaultTimeout, Timeout]), fun(UserAcc) -> {stop, UserAcc} end}
+ end;
+ true ->
+ {DefaultTimeout, fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
_ ->
- {lists:min([DefaultTimeout, Timeout]),
- fun(UserAcc) -> {stop, UserAcc} end}
- end;
- true ->
- {DefaultTimeout,
- fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
- _ ->
- {lists:min([DefaultTimeout, Heartbeat]),
- fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
+ {lists:min([DefaultTimeout, Heartbeat]), fun(UserAcc) ->
+ {ok, Callback(timeout, ResponseType, UserAcc)}
+ end}
end.
-start_sending_changes(_Callback, UserAcc, ResponseType)
- when ResponseType =:= "continuous"
- orelse ResponseType =:= "eventsource" ->
+start_sending_changes(_Callback, UserAcc, ResponseType) when
+ ResponseType =:= "continuous" orelse
+ ResponseType =:= "eventsource"
+->
UserAcc;
start_sending_changes(Callback, UserAcc, ResponseType) ->
Callback(start, ResponseType, UserAcc).
@@ -421,8 +437,8 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -
conflicts = Conflicts,
timeout = Timeout,
timeout_fun = TimeoutFun,
- aggregation_results=[],
- aggregation_kvs=[]
+ aggregation_results = [],
+ aggregation_kvs = []
}.
send_changes(Acc, Dir, FirstRound) ->
@@ -440,30 +456,35 @@ send_changes(Acc, Dir, FirstRound) ->
couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts)
end.
-
can_optimize(true, {doc_ids, _Style, DocIds}) ->
- MaxDocIds = config:get_integer("couchdb",
- "changes_doc_ids_optimization_threshold", 100),
- if length(DocIds) =< MaxDocIds ->
- {true, fun send_changes_doc_ids/6};
- true ->
- false
+ MaxDocIds = config:get_integer(
+ "couchdb",
+ "changes_doc_ids_optimization_threshold",
+ 100
+ ),
+ if
+ length(DocIds) =< MaxDocIds ->
+ {true, fun send_changes_doc_ids/6};
+ true ->
+ false
end;
can_optimize(true, {design_docs, _Style}) ->
{true, fun send_changes_design_docs/6};
can_optimize(_, _) ->
false.
-
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
Results = couch_db:get_full_doc_infos(Db, DocIds),
- FullInfos = lists:foldl(fun
- (#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
- (not_found, Acc) -> Acc
- end, [], Results),
+ FullInfos = lists:foldl(
+ fun
+ (#full_doc_info{} = FDI, Acc) -> [FDI | Acc];
+ (not_found, Acc) -> Acc
+ end,
+ [],
+ Results
+ ),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
-
send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
Opts = [
@@ -474,49 +495,62 @@ send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
{ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts),
send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
-
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
- FoldFun = case Dir of
- fwd -> fun lists:foldl/3;
- rev -> fun lists:foldr/3
- end,
- GreaterFun = case Dir of
- fwd -> fun(A, B) -> A > B end;
- rev -> fun(A, B) -> A =< B end
- end,
- DocInfos = lists:foldl(fun(FDI, Acc) ->
- DI = couch_doc:to_doc_info(FDI),
- case GreaterFun(DI#doc_info.high_seq, StartSeq) of
- true -> [DI | Acc];
- false -> Acc
- end
- end, [], FullDocInfos),
- SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
- FinalAcc = try
- FoldFun(fun(DocInfo, Acc) ->
- case Fun(DocInfo, Acc) of
- {ok, NewAcc} ->
- NewAcc;
- {stop, NewAcc} ->
- throw({stop, NewAcc})
+ FoldFun =
+ case Dir of
+ fwd -> fun lists:foldl/3;
+ rev -> fun lists:foldr/3
+ end,
+ GreaterFun =
+ case Dir of
+ fwd -> fun(A, B) -> A > B end;
+ rev -> fun(A, B) -> A =< B end
+ end,
+ DocInfos = lists:foldl(
+ fun(FDI, Acc) ->
+ DI = couch_doc:to_doc_info(FDI),
+ case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+ true -> [DI | Acc];
+ false -> Acc
end
- end, Acc0, SortedDocInfos)
- catch
- {stop, Acc} -> Acc
- end,
+ end,
+ [],
+ FullDocInfos
+ ),
+ SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
+ FinalAcc =
+ try
+ FoldFun(
+ fun(DocInfo, Acc) ->
+ case Fun(DocInfo, Acc) of
+ {ok, NewAcc} ->
+ NewAcc;
+ {stop, NewAcc} ->
+ throw({stop, NewAcc})
+ end
+ end,
+ Acc0,
+ SortedDocInfos
+ )
+ catch
+ {stop, Acc} -> Acc
+ end,
case Dir of
fwd ->
- FinalAcc0 = case element(1, FinalAcc) of
- changes_acc -> % we came here via couch_http or internal call
- FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
- fabric_changes_acc -> % we came here via chttpd / fabric / rexi
- FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
- end,
+ FinalAcc0 =
+ case element(1, FinalAcc) of
+ % we came here via couch_http or internal call
+ changes_acc ->
+ FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
+ % we came here via chttpd / fabric / rexi
+ fabric_changes_acc ->
+ FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
+ end,
{ok, FinalAcc0};
- rev -> {ok, FinalAcc}
+ rev ->
+ {ok, FinalAcc}
end.
-
keep_sending_changes(Args, Acc0, FirstRound) ->
#changes_args{
feed = ResponseType,
@@ -527,36 +561,44 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
{ok, ChangesAcc} = send_changes(Acc0, fwd, FirstRound),
#changes_acc{
- db = Db, callback = Callback,
- timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
- prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
+ db = Db,
+ callback = Callback,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun,
+ seq = EndSeq,
+ prepend = Prepend2,
+ user_acc = UserAcc2,
+ limit = NewLimit
} = maybe_upgrade_changes_acc(ChangesAcc),
couch_db:close(Db),
- if Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
- true ->
- case wait_updated(Timeout, TimeoutFun, UserAcc2) of
- {updated, UserAcc4} ->
- DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
- case couch_db:open(couch_db:name(Db), DbOptions1) of
- {ok, Db2} ->
- ?MODULE:keep_sending_changes(
- Args#changes_args{limit=NewLimit},
- ChangesAcc#changes_acc{
- db = Db2,
- user_acc = UserAcc4,
- seq = EndSeq,
- prepend = Prepend2,
- timeout = Timeout,
- timeout_fun = TimeoutFun},
- false);
- _Else ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
- end;
- {stop, UserAcc4} ->
- end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
- end
+ if
+ Limit > NewLimit, ResponseType == "longpoll" ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+ true ->
+ case wait_updated(Timeout, TimeoutFun, UserAcc2) of
+ {updated, UserAcc4} ->
+ DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
+ case couch_db:open(couch_db:name(Db), DbOptions1) of
+ {ok, Db2} ->
+ ?MODULE:keep_sending_changes(
+ Args#changes_args{limit = NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun
+ },
+ false
+ );
+ _Else ->
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
+ end;
+ {stop, UserAcc4} ->
+ end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+ end
end.
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
@@ -564,46 +606,59 @@ end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
changes_enumerator(Value, Acc) ->
#changes_acc{
- filter = Filter, callback = Callback, prepend = Prepend,
- user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
- timeout = Timeout, timeout_fun = TimeoutFun
+ filter = Filter,
+ callback = Callback,
+ prepend = Prepend,
+ user_acc = UserAcc,
+ limit = Limit,
+ resp_type = ResponseType,
+ db = Db,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun
} = maybe_upgrade_changes_acc(Acc),
Results0 = filter(Db, Value, Filter),
Results = [Result || Result <- Results0, Result /= null],
- Seq = case Value of
- #full_doc_info{} ->
- Value#full_doc_info.update_seq;
- #doc_info{} ->
- Value#doc_info.high_seq
- end,
- Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+ Seq =
+ case Value of
+ #full_doc_info{} ->
+ Value#full_doc_info.update_seq;
+ #doc_info{} ->
+ Value#doc_info.high_seq
+ end,
+ Go =
+ if
+ (Limit =< 1) andalso Results =/= [] -> stop;
+ true -> ok
+ end,
case Results of
- [] ->
- {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
- case Done of
- stop ->
- {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
- ok ->
- {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
- end;
- _ ->
- if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
- ChangesRow = changes_row(Results, Value, Acc),
- UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
- true ->
- ChangesRow = changes_row(Results, Value, Acc),
- UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, prepend = <<",\n">>,
- user_acc = UserAcc2, limit = Limit - 1}}
- end
+ [] ->
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ case Done of
+ stop ->
+ {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
+ ok ->
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
+ end;
+ _ ->
+ if
+ ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+ ChangesRow = changes_row(Results, Value, Acc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
+ true ->
+ ChangesRow = changes_row(Results, Value, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq,
+ prepend = <<",\n">>,
+ user_acc = UserAcc2,
+ limit = Limit - 1
+ }}
+ end
end.
-
-
changes_row(Results, #full_doc_info{} = FDI, Acc) ->
changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
changes_row(Results, DocInfo, Acc0) ->
@@ -611,26 +666,27 @@ changes_row(Results, DocInfo, Acc0) ->
#doc_info{
id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
} = DocInfo,
- {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
- deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}.
+ {
+ [{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
+ deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)
+ }.
-maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
+maybe_get_changes_doc(Value, #changes_acc{include_docs = true} = Acc) ->
#changes_acc{
db = Db,
doc_options = DocOpts,
conflicts = Conflicts,
filter = Filter
} = Acc,
- Opts = case Conflicts of
- true -> [deleted, conflicts];
- false -> [deleted]
- end,
+ Opts =
+ case Conflicts of
+ true -> [deleted, conflicts];
+ false -> [deleted]
+ end,
load_doc(Db, Value, Opts, DocOpts, Filter);
-
maybe_get_changes_doc(_Value, _Acc) ->
[].
-
load_doc(Db, Value, Opts, DocOpts, Filter) ->
case couch_index_util:load_doc(Db, Value, Opts) of
null ->
@@ -639,68 +695,66 @@ load_doc(Db, Value, Opts, DocOpts, Filter) ->
[{doc, doc_to_json(Doc, DocOpts, Filter)}]
end.
-
-doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
- when Fields =/= nil ->
+doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}}) when
+ Fields =/= nil
+->
mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
doc_to_json(Doc, DocOpts, _Filter) ->
couch_doc:to_json_obj(Doc, DocOpts).
-
deleted_item(true) -> [{<<"deleted">>, true}];
deleted_item(_) -> [].
% waits for a updated msg, if there are multiple msgs, collects them.
wait_updated(Timeout, TimeoutFun, UserAcc) ->
receive
- updated ->
- get_rest_updated(UserAcc);
- deleted ->
- {stop, UserAcc}
+ updated ->
+ get_rest_updated(UserAcc);
+ deleted ->
+ {stop, UserAcc}
after Timeout ->
{Go, UserAcc2} = TimeoutFun(UserAcc),
case Go of
- ok ->
- ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
- stop ->
- {stop, UserAcc2}
+ ok ->
+ ?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
+ stop ->
+ {stop, UserAcc2}
end
end.
get_rest_updated(UserAcc) ->
receive
- updated ->
- get_rest_updated(UserAcc)
+ updated ->
+ get_rest_updated(UserAcc)
after 0 ->
{updated, UserAcc}
end.
reset_heartbeat() ->
case get(last_changes_heartbeat) of
- undefined ->
- ok;
- _ ->
- put(last_changes_heartbeat, os:timestamp())
+ undefined ->
+ ok;
+ _ ->
+ put(last_changes_heartbeat, os:timestamp())
end.
maybe_heartbeat(Timeout, TimeoutFun, Acc) ->
Before = get(last_changes_heartbeat),
case Before of
- undefined ->
- {ok, Acc};
- _ ->
- Now = os:timestamp(),
- case timer:now_diff(Now, Before) div 1000 >= Timeout of
- true ->
- Acc2 = TimeoutFun(Acc),
- put(last_changes_heartbeat, Now),
- Acc2;
- false ->
- {ok, Acc}
- end
+ undefined ->
+ {ok, Acc};
+ _ ->
+ Now = os:timestamp(),
+ case timer:now_diff(Now, Before) div 1000 >= Timeout of
+ true ->
+ Acc2 = TimeoutFun(Acc),
+ put(last_changes_heartbeat, Now),
+ Acc2;
+ false ->
+ {ok, Acc}
+ end
end.
-
maybe_upgrade_changes_acc(#changes_acc{} = Acc) ->
Acc;
maybe_upgrade_changes_acc(Acc) when tuple_size(Acc) == 19 ->