diff options
author | Jan Lehnardt <jan@apache.org> | 2020-06-12 18:49:05 +0200 |
---|---|---|
committer | Jan Lehnardt <jan@apache.org> | 2020-07-10 19:08:52 +0200 |
commit | 3661c0fca3b35d86a7fde748ef7c6d119f36cd02 (patch) | |
tree | fc4098eea2d55333803056113e74319fc698678e | |
parent | 7ffcd412556e942c006eeb70cd58ea7c5d23436d (diff) | |
download | couchdb-3661c0fca3b35d86a7fde748ef7c6d119f36cd02.tar.gz |
fix: all_docs on partitioned dbs, override partition requirement on faked access all docs query
-rw-r--r-- | src/couch/src/couch_db.erl | 1 | ||||
-rw-r--r-- | src/couch_mrview/include/couch_mrview.hrl.orig | 110 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview.erl | 8 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview.erl.orig | 701 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_http.erl.orig | 640 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_updater.erl.orig | 380 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_updater.erl.rej | 52 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl | 4 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl.orig | 1177 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_util.erl.rej | 16 |
10 files changed, 3082 insertions, 7 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index ecd456c37..b315f07c1 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -1792,7 +1792,6 @@ open_doc_revs_int(Db, IdRevs, Options) -> open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) -> case couch_db_engine:open_local_docs(Db, [Id]) of [#doc{} = Doc] -> - couch_log:info("~nopen_doc_int: Doc: ~p~n", [Doc]), case Doc#doc.body of { Body } -> Access = couch_util:get_value(<<"_access">>, Body), diff --git a/src/couch_mrview/include/couch_mrview.hrl.orig b/src/couch_mrview/include/couch_mrview.hrl.orig new file mode 100644 index 000000000..bb0ab0b46 --- /dev/null +++ b/src/couch_mrview/include/couch_mrview.hrl.orig @@ -0,0 +1,110 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-record(mrst, { + sig=nil, + fd=nil, + fd_monitor, + db_name, + idx_name, + language, + design_opts=[], + partitioned=false, + lib, + views, + id_btree=nil, + update_seq=0, + purge_seq=0, + first_build, + partial_resp_pid, + doc_acc, + doc_queue, + write_queue, + qserver=nil +}). + + +-record(mrview, { + id_num, + update_seq=0, + purge_seq=0, + map_names=[], + reduce_funs=[], + def, + btree=nil, + options=[] +}). + + +-record(mrheader, { + seq=0, + purge_seq=0, + id_btree_state=nil, + view_states=nil +}). + +-define(MAX_VIEW_LIMIT, 16#10000000). 
+ +-record(mrargs, { + view_type, + reduce, + + preflight_fun, + + start_key, + start_key_docid, + end_key, + end_key_docid, + keys, + + direction = fwd, + limit = ?MAX_VIEW_LIMIT, + skip = 0, + group_level = 0, + group = undefined, + stable = false, + update = true, + multi_get = false, + inclusive_end = true, + include_docs = false, + doc_options = [], + update_seq=false, + conflicts, + callback, + sorted = true, + extra = [] +}). + +-record(vacc, { + db, + req, + resp, + prepend, + etag, + should_close = false, + buffer = [], + bufsize = 0, + threshold = 1490, + row_sent = false, + meta_sent = false +}). + +-record(lacc, { + db, + req, + resp, + qserver, + lname, + etag, + code, + headers +}). diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 98bceaeb2..0d41f2ef6 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -233,9 +233,9 @@ query_all_docs(Db, Args) -> query_all_docs(Db, Args, Callback, Acc) when is_list(Args) -> query_all_docs(Db, to_mrargs(Args), Callback, Acc); query_all_docs(Db, Args0, Callback, Acc) -> - case couch_db:is_admin(Db) of - true -> query_all_docs_admin(Db, Args0, Callback, Acc); - false -> query_all_docs_access(Db, Args0, Callback, Acc) + case couch_db:has_access_enabled(Db) and not couch_db:is_admin(Db) of + true -> query_all_docs_access(Db, Args0, Callback, Acc); + false -> query_all_docs_admin(Db, Args0, Callback, Acc) end. 
access_ddoc() -> @@ -305,7 +305,7 @@ query_all_docs_access(Db, Args0, Callback0, Acc) -> UserCtx = couch_db:get_user_ctx(Db), UserName = UserCtx#user_ctx.name, Args1 = prefix_startkey_endkey(UserName, Args0, Args0#mrargs.direction), - Args = Args1#mrargs{reduce=false}, + Args = Args1#mrargs{reduce=false, extra=Args1#mrargs.extra ++ [{all_docs_access, true}]}, Callback = fun ({row, Props}, Acc0) -> diff --git a/src/couch_mrview/src/couch_mrview.erl.orig b/src/couch_mrview/src/couch_mrview.erl.orig new file mode 100644 index 000000000..1cdc91809 --- /dev/null +++ b/src/couch_mrview/src/couch_mrview.erl.orig @@ -0,0 +1,701 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview). + +-export([validate/2]). +-export([query_all_docs/2, query_all_docs/4]). +-export([query_view/3, query_view/4, query_view/6, get_view_index_pid/4]). +-export([get_info/2]). +-export([trigger_update/2, trigger_update/3]). +-export([get_view_info/3]). +-export([refresh/2]). +-export([compact/2, compact/3, cancel_compaction/2]). +-export([cleanup/1]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + +-record(mracc, { + db, + meta_sent=false, + total_rows, + offset, + limit, + skip, + group_level, + doc_info, + callback, + user_acc, + last_go=ok, + reduce_fun, + finalizer, + update_seq, + args +}). 
+ + + +validate_ddoc_fields(DDoc) -> + MapFuncType = map_function_type(DDoc), + lists:foreach(fun(Path) -> + validate_ddoc_fields(DDoc, Path) + end, [ + [{<<"filters">>, object}, {any, [object, string]}], + [{<<"language">>, string}], + [{<<"lists">>, object}, {any, [object, string]}], + [{<<"options">>, object}], + [{<<"options">>, object}, {<<"include_design">>, boolean}], + [{<<"options">>, object}, {<<"local_seq">>, boolean}], + [{<<"options">>, object}, {<<"partitioned">>, boolean}], + [{<<"rewrites">>, [string, array]}], + [{<<"shows">>, object}, {any, [object, string]}], + [{<<"updates">>, object}, {any, [object, string]}], + [{<<"validate_doc_update">>, string}], + [{<<"views">>, object}, {<<"lib">>, object}], + [{<<"views">>, object}, {any, object}, {<<"map">>, MapFuncType}], + [{<<"views">>, object}, {any, object}, {<<"reduce">>, string}] + ]), + require_map_function_for_views(DDoc), + ok. + +require_map_function_for_views({Props}) -> + case couch_util:get_value(<<"views">>, Props) of + undefined -> ok; + {Views} -> + lists:foreach(fun + ({<<"lib">>, _}) -> ok; + ({Key, {Value}}) -> + case couch_util:get_value(<<"map">>, Value) of + undefined -> throw({invalid_design_doc, + <<"View `", Key/binary, "` must contain map function">>}); + _ -> ok + end + end, Views), + ok + end. + +validate_ddoc_fields(DDoc, Path) -> + case validate_ddoc_fields(DDoc, Path, []) of + ok -> ok; + {error, {FailedPath0, Type0}} -> + FailedPath = iolist_to_binary(join(FailedPath0, <<".">>)), + Type = format_type(Type0), + throw({invalid_design_doc, + <<"`", FailedPath/binary, "` field must have ", + Type/binary, " type">>}) + end. 
+ +validate_ddoc_fields(undefined, _, _) -> + ok; +validate_ddoc_fields(_, [], _) -> + ok; +validate_ddoc_fields({KVS}=Props, [{any, Type} | Rest], Acc) -> + lists:foldl(fun + ({Key, _}, ok) -> + validate_ddoc_fields(Props, [{Key, Type} | Rest], Acc); + ({_, _}, {error, _}=Error) -> + Error + end, ok, KVS); +validate_ddoc_fields({KVS}=Props, [{Key, Type} | Rest], Acc) -> + case validate_ddoc_field(Props, {Key, Type}) of + ok -> + validate_ddoc_fields(couch_util:get_value(Key, KVS), + Rest, + [Key | Acc]); + error -> + {error, {[Key | Acc], Type}}; + {error, Key1} -> + {error, {[Key1 | Acc], Type}} + end. + +validate_ddoc_field(undefined, Type) when is_atom(Type) -> + ok; +validate_ddoc_field(_, any) -> + ok; +validate_ddoc_field(Value, Types) when is_list(Types) -> + lists:foldl(fun + (_, ok) -> ok; + (Type, _) -> validate_ddoc_field(Value, Type) + end, error, Types); +validate_ddoc_field(Value, string) when is_binary(Value) -> + ok; +validate_ddoc_field(Value, array) when is_list(Value) -> + ok; +validate_ddoc_field({Value}, object) when is_list(Value) -> + ok; +validate_ddoc_field(Value, boolean) when is_boolean(Value) -> + ok; +validate_ddoc_field({Props}, {any, Type}) -> + validate_ddoc_field1(Props, Type); +validate_ddoc_field({Props}, {Key, Type}) -> + validate_ddoc_field(couch_util:get_value(Key, Props), Type); +validate_ddoc_field(_, _) -> + error. + +validate_ddoc_field1([], _) -> + ok; +validate_ddoc_field1([{Key, Value} | Rest], Type) -> + case validate_ddoc_field(Value, Type) of + ok -> + validate_ddoc_field1(Rest, Type); + error -> + {error, Key} + end. + +map_function_type({Props}) -> + case couch_util:get_value(<<"language">>, Props) of + <<"query">> -> object; + _ -> string + end. + +format_type(Type) when is_atom(Type) -> + ?l2b(atom_to_list(Type)); +format_type(Types) when is_list(Types) -> + iolist_to_binary(join(lists:map(fun atom_to_list/1, Types), <<" or ">>)). + +join(L, Sep) -> + join(L, Sep, []). 
+join([H|[]], _, Acc) -> + [H | Acc]; +join([H|T], Sep, Acc) -> + join(T, Sep, [Sep, H | Acc]). + + +validate(Db, DDoc) -> + ok = validate_ddoc_fields(DDoc#doc.body), + GetName = fun + (#mrview{map_names = [Name | _]}) -> Name; + (#mrview{reduce_funs = [{Name, _} | _]}) -> Name; + (_) -> null + end, + ValidateView = fun(Proc, #mrview{def=MapSrc, reduce_funs=Reds}=View) -> + couch_query_servers:try_compile(Proc, map, GetName(View), MapSrc), + lists:foreach(fun + ({_RedName, <<"_sum", _/binary>>}) -> + ok; + ({_RedName, <<"_count", _/binary>>}) -> + ok; + ({_RedName, <<"_stats", _/binary>>}) -> + ok; + ({_RedName, <<"_approx_count_distinct", _/binary>>}) -> + ok; + ({_RedName, <<"_", _/binary>> = Bad}) -> + Msg = ["`", Bad, "` is not a supported reduce function."], + throw({invalid_design_doc, Msg}); + ({RedName, RedSrc}) -> + couch_query_servers:try_compile(Proc, reduce, RedName, RedSrc) + end, Reds) + end, + {ok, #mrst{ + language = Lang, + views = Views, + partitioned = Partitioned + }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc), + + case {couch_db:is_partitioned(Db), Partitioned} of + {false, true} -> + throw({invalid_design_doc, + <<"partitioned option cannot be true in a " + "non-partitioned database.">>}); + {_, _} -> + ok + end, + + try Views =/= [] andalso couch_query_servers:get_os_process(Lang) of + false -> + ok; + Proc -> + try + lists:foreach(fun(V) -> ValidateView(Proc, V) end, Views) + after + couch_query_servers:ret_os_process(Proc) + end + catch {unknown_query_language, _Lang} -> + %% Allow users to save ddocs written in unknown languages + ok + end. + + +query_all_docs(Db, Args) -> + query_all_docs(Db, Args, fun default_cb/2, []). 
+ + +query_all_docs(Db, Args, Callback, Acc) when is_list(Args) -> + query_all_docs(Db, to_mrargs(Args), Callback, Acc); +query_all_docs(Db, Args0, Callback, Acc) -> + Sig = couch_util:with_db(Db, fun(WDb) -> + {ok, Info} = couch_db:get_db_info(WDb), + couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Info))) + end), + Args1 = Args0#mrargs{view_type=map}, + Args2 = couch_mrview_util:validate_all_docs_args(Db, Args1), + {ok, Acc1} = case Args2#mrargs.preflight_fun of + PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc); + _ -> {ok, Acc} + end, + all_docs_fold(Db, Args2, Callback, Acc1). + + +query_view(Db, DDoc, VName) -> + query_view(Db, DDoc, VName, #mrargs{}). + + +query_view(Db, DDoc, VName, Args) when is_list(Args) -> + query_view(Db, DDoc, VName, to_mrargs(Args), fun default_cb/2, []); +query_view(Db, DDoc, VName, Args) -> + query_view(Db, DDoc, VName, Args, fun default_cb/2, []). + + +query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) -> + query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc); +query_view(Db, DDoc, VName, Args0, Callback, Acc0) -> + case couch_mrview_util:get_view(Db, DDoc, VName, Args0) of + {ok, VInfo, Sig, Args} -> + {ok, Acc1} = case Args#mrargs.preflight_fun of + PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0); + _ -> {ok, Acc0} + end, + query_view(Db, VInfo, Args, Callback, Acc1); + ddoc_updated -> + Callback(ok, ddoc_updated) + end. + + +get_view_index_pid(Db, DDoc, ViewName, Args0) -> + couch_mrview_util:get_view_index_pid(Db, DDoc, ViewName, Args0). + + +query_view(Db, {Type, View, Ref}, Args, Callback, Acc) -> + try + case Type of + map -> map_fold(Db, View, Args, Callback, Acc); + red -> red_fold(Db, View, Args, Callback, Acc) + end + after + erlang:demonitor(Ref, [flush]) + end. + + +get_info(Db, DDoc) -> + {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + couch_index:get_info(Pid). 
+ + +trigger_update(Db, DDoc) -> + trigger_update(Db, DDoc, couch_db:get_update_seq(Db)). + +trigger_update(Db, DDoc, UpdateSeq) -> + {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + couch_index:trigger_update(Pid, UpdateSeq). + +%% get informations on a view +get_view_info(Db, DDoc, VName) -> + {ok, {_, View, _}, _, _Args} = couch_mrview_util:get_view(Db, DDoc, VName, + #mrargs{}), + + %% get the total number of rows + {ok, TotalRows} = couch_mrview_util:get_row_count(View), + + {ok, [{update_seq, View#mrview.update_seq}, + {purge_seq, View#mrview.purge_seq}, + {total_rows, TotalRows}]}. + + +%% @doc refresh a view index +refresh(DbName, DDoc) when is_binary(DbName)-> + UpdateSeq = couch_util:with_db(DbName, fun(WDb) -> + couch_db:get_update_seq(WDb) + end), + + case couch_index_server:get_index(couch_mrview_index, DbName, DDoc) of + {ok, Pid} -> + case catch couch_index:get_state(Pid, UpdateSeq) of + {ok, _} -> ok; + Error -> {error, Error} + end; + Error -> + {error, Error} + end; + +refresh(Db, DDoc) -> + refresh(couch_db:name(Db), DDoc). + +compact(Db, DDoc) -> + compact(Db, DDoc, []). + + +compact(Db, DDoc, Opts) -> + {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + couch_index:compact(Pid, Opts). + + +cancel_compaction(Db, DDoc) -> + {ok, IPid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc), + {ok, CPid} = couch_index:get_compactor_pid(IPid), + ok = couch_index_compactor:cancel(CPid), + + % Cleanup the compaction file if it exists + {ok, #mrst{sig=Sig, db_name=DbName}} = couch_index:get_state(IPid, 0), + couch_mrview_util:delete_compaction_file(DbName, Sig), + ok. + + +cleanup(Db) -> + couch_mrview_cleanup:run(Db). 
+ + +all_docs_fold(Db, #mrargs{keys=undefined}=Args, Callback, UAcc) -> + ReduceFun = get_reduce_fun(Args), + Total = get_total_rows(Db, Args), + UpdateSeq = get_update_seq(Db, Args), + Acc = #mracc{ + db=Db, + total_rows=Total, + limit=Args#mrargs.limit, + skip=Args#mrargs.skip, + callback=Callback, + user_acc=UAcc, + reduce_fun=ReduceFun, + update_seq=UpdateSeq, + args=Args + }, + [Opts1] = couch_mrview_util:all_docs_key_opts(Args), + % TODO: This is a terrible hack for now. We'll probably have + % to rewrite _all_docs to not be part of mrview and not expect + % a btree. For now non-btree's will just have to pass 0 or + % some fake reductions to get an offset. + Opts2 = [include_reductions | Opts1], + FunName = case couch_util:get_value(namespace, Args#mrargs.extra) of + <<"_design">> -> fold_design_docs; + <<"_local">> -> fold_local_docs; + _ -> fold_docs + end, + {ok, Offset, FinalAcc} = couch_db:FunName(Db, fun map_fold/3, Acc, Opts2), + finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]); +all_docs_fold(Db, #mrargs{direction=Dir, keys=Keys0}=Args, Callback, UAcc) -> + ReduceFun = get_reduce_fun(Args), + Total = get_total_rows(Db, Args), + UpdateSeq = get_update_seq(Db, Args), + Acc = #mracc{ + db=Db, + total_rows=Total, + limit=Args#mrargs.limit, + skip=Args#mrargs.skip, + callback=Callback, + user_acc=UAcc, + reduce_fun=ReduceFun, + update_seq=UpdateSeq, + args=Args + }, + % Backwards compatibility hack. The old _all_docs iterates keys + % in reverse if descending=true was passed. Here we'll just + % reverse the list instead. 
+ Keys = if Dir =:= fwd -> Keys0; true -> lists:reverse(Keys0) end, + + FoldFun = fun(Key, Acc0) -> + DocInfo = (catch couch_db:get_doc_info(Db, Key)), + {Doc, Acc1} = case DocInfo of + {ok, #doc_info{id=Id, revs=[RevInfo | _RestRevs]}=DI} -> + Rev = couch_doc:rev_to_str(RevInfo#rev_info.rev), + Props = [{rev, Rev}] ++ case RevInfo#rev_info.deleted of + true -> [{deleted, true}]; + false -> [] + end, + {{{Id, Id}, {Props}}, Acc0#mracc{doc_info=DI}}; + not_found -> + {{{Key, error}, not_found}, Acc0} + end, + {_, Acc2} = map_fold(Doc, {[], [{0, 0, 0}]}, Acc1), + Acc2 + end, + FinalAcc = lists:foldl(FoldFun, Acc, Keys), + finish_fold(FinalAcc, [{total, Total}]). + + +map_fold(Db, View, Args, Callback, UAcc) -> + {ok, Total} = couch_mrview_util:get_row_count(View), + Acc = #mracc{ + db=Db, + total_rows=Total, + limit=Args#mrargs.limit, + skip=Args#mrargs.skip, + callback=Callback, + user_acc=UAcc, + reduce_fun=fun couch_mrview_util:reduce_to_count/1, + update_seq=View#mrview.update_seq, + args=Args + }, + OptList = couch_mrview_util:key_opts(Args), + {Reds, Acc2} = lists:foldl(fun(Opts, {_, Acc0}) -> + {ok, R, A} = couch_mrview_util:fold(View, fun map_fold/3, Acc0, Opts), + {R, A} + end, {nil, Acc}, OptList), + Offset = couch_mrview_util:reduce_to_count(Reds), + finish_fold(Acc2, [{total, Total}, {offset, Offset}]). 
+ + +map_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) -> + % matches for _all_docs and translates #full_doc_info{} -> KV pair + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI -> + Value = {[{rev, couch_doc:rev_to_str(Rev)}]}, + map_fold({{Id, Id}, Value}, OffsetReds, Acc#mracc{doc_info=DI}); + #doc_info{revs=[#rev_info{deleted=true}|_]} -> + {ok, Acc} + end; +map_fold(_KV, _Offset, #mracc{skip=N}=Acc) when N > 0 -> + {ok, Acc#mracc{skip=N-1, last_go=ok}}; +map_fold(KV, OffsetReds, #mracc{offset=undefined}=Acc) -> + #mracc{ + total_rows=Total, + callback=Callback, + user_acc=UAcc0, + reduce_fun=Reduce, + update_seq=UpdateSeq, + args=Args + } = Acc, + Offset = Reduce(OffsetReds), + Meta = make_meta(Args, UpdateSeq, [{total, Total}, {offset, Offset}]), + {Go, UAcc1} = Callback(Meta, UAcc0), + Acc1 = Acc#mracc{meta_sent=true, offset=Offset, user_acc=UAcc1, last_go=Go}, + case Go of + ok -> map_fold(KV, OffsetReds, Acc1); + stop -> {stop, Acc1} + end; +map_fold(_KV, _Offset, #mracc{limit=0}=Acc) -> + {stop, Acc}; +map_fold({{Key, Id}, Val}, _Offset, Acc) -> + #mracc{ + db=Db, + limit=Limit, + doc_info=DI, + callback=Callback, + user_acc=UAcc0, + args=Args + } = Acc, + Doc = case DI of + #doc_info{} -> couch_mrview_util:maybe_load_doc(Db, DI, Args); + _ -> couch_mrview_util:maybe_load_doc(Db, Id, Val, Args) + end, + Row = [{id, Id}, {key, Key}, {value, Val}] ++ Doc, + {Go, UAcc1} = Callback({row, Row}, UAcc0), + {Go, Acc#mracc{ + limit=Limit-1, + doc_info=undefined, + user_acc=UAcc1, + last_go=Go + }}; +map_fold(#doc{id = <<"_local/", _/binary>>} = Doc, _Offset, #mracc{} = Acc) -> + #mracc{ + limit=Limit, + callback=Callback, + user_acc=UAcc0, + args=Args + } = Acc, + #doc{ + id = DocId, + revs = {Pos, [RevId | _]} + } = Doc, + Rev = {Pos, RevId}, + Row = [ + {id, DocId}, + {key, DocId}, + {value, {[{rev, couch_doc:rev_to_str(Rev)}]}} + ] ++ if not Args#mrargs.include_docs -> []; true -> + [{doc, 
couch_doc:to_json_obj(Doc, Args#mrargs.doc_options)}] + end, + {Go, UAcc1} = Callback({row, Row}, UAcc0), + {Go, Acc#mracc{ + limit=Limit-1, + reduce_fun=undefined, + doc_info=undefined, + user_acc=UAcc1, + last_go=Go + }}. + +red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) -> + Finalizer = case couch_util:get_value(finalizer, Args#mrargs.extra) of + undefined -> + {_, FunSrc} = lists:nth(NthRed, View#mrview.reduce_funs), + FunSrc; + CustomFun-> + CustomFun + end, + Acc = #mracc{ + db=Db, + total_rows=null, + limit=Args#mrargs.limit, + skip=Args#mrargs.skip, + group_level=Args#mrargs.group_level, + callback=Callback, + user_acc=UAcc, + update_seq=View#mrview.update_seq, + finalizer=Finalizer, + args=Args + }, + Grouping = {key_group_level, Args#mrargs.group_level}, + OptList = couch_mrview_util:key_opts(Args, [Grouping]), + Acc2 = lists:foldl(fun(Opts, Acc0) -> + {ok, Acc1} = + couch_mrview_util:fold_reduce(RedView, fun red_fold/3, Acc0, Opts), + Acc1 + end, Acc, OptList), + finish_fold(Acc2, []). 
+ +red_fold({p, _Partition, Key}, Red, Acc) -> + red_fold(Key, Red, Acc); +red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 -> + {ok, Acc#mracc{skip=N-1, last_go=ok}}; +red_fold(Key, Red, #mracc{meta_sent=false}=Acc) -> + #mracc{ + args=Args, + callback=Callback, + user_acc=UAcc0, + update_seq=UpdateSeq + } = Acc, + Meta = make_meta(Args, UpdateSeq, []), + {Go, UAcc1} = Callback(Meta, UAcc0), + Acc1 = Acc#mracc{user_acc=UAcc1, meta_sent=true, last_go=Go}, + case Go of + ok -> red_fold(Key, Red, Acc1); + _ -> {Go, Acc1} + end; +red_fold(_Key, _Red, #mracc{limit=0} = Acc) -> + {stop, Acc}; +red_fold(_Key, Red, #mracc{group_level=0} = Acc) -> + #mracc{ + finalizer=Finalizer, + limit=Limit, + callback=Callback, + user_acc=UAcc0 + } = Acc, + Row = [{key, null}, {value, maybe_finalize(Red, Finalizer)}], + {Go, UAcc1} = Callback({row, Row}, UAcc0), + {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}; +red_fold(Key, Red, #mracc{group_level=exact} = Acc) -> + #mracc{ + finalizer=Finalizer, + limit=Limit, + callback=Callback, + user_acc=UAcc0 + } = Acc, + Row = [{key, Key}, {value, maybe_finalize(Red, Finalizer)}], + {Go, UAcc1} = Callback({row, Row}, UAcc0), + {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}; +red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0, is_list(K) -> + #mracc{ + finalizer=Finalizer, + limit=Limit, + callback=Callback, + user_acc=UAcc0 + } = Acc, + Row = [{key, lists:sublist(K, I)}, {value, maybe_finalize(Red, Finalizer)}], + {Go, UAcc1} = Callback({row, Row}, UAcc0), + {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}; +red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0 -> + #mracc{ + finalizer=Finalizer, + limit=Limit, + callback=Callback, + user_acc=UAcc0 + } = Acc, + Row = [{key, K}, {value, maybe_finalize(Red, Finalizer)}], + {Go, UAcc1} = Callback({row, Row}, UAcc0), + {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}. 
+ +maybe_finalize(Red, null) -> + Red; +maybe_finalize(Red, RedSrc) -> + {ok, Finalized} = couch_query_servers:finalize(RedSrc, Red), + Finalized. + +finish_fold(#mracc{last_go=ok, update_seq=UpdateSeq}=Acc, ExtraMeta) -> + #mracc{callback=Callback, user_acc=UAcc, args=Args}=Acc, + % Possible send meta info + Meta = make_meta(Args, UpdateSeq, ExtraMeta), + {Go, UAcc1} = case Acc#mracc.meta_sent of + false -> Callback(Meta, UAcc); + _ -> {ok, Acc#mracc.user_acc} + end, + % Notify callback that the fold is complete. + {_, UAcc2} = case Go of + ok -> Callback(complete, UAcc1); + _ -> {ok, UAcc1} + end, + {ok, UAcc2}; +finish_fold(#mracc{user_acc=UAcc}, _ExtraMeta) -> + {ok, UAcc}. + + +make_meta(Args, UpdateSeq, Base) -> + case Args#mrargs.update_seq of + true -> {meta, Base ++ [{update_seq, UpdateSeq}]}; + _ -> {meta, Base} + end. + + +get_reduce_fun(#mrargs{extra = Extra}) -> + case couch_util:get_value(namespace, Extra) of + <<"_local">> -> + fun(_) -> null end; + _ -> + fun couch_mrview_util:all_docs_reduce_to_count/1 + end. + + +get_total_rows(Db, #mrargs{extra = Extra}) -> + case couch_util:get_value(namespace, Extra) of + <<"_local">> -> + null; + <<"_design">> -> + {ok, N} = couch_db:get_design_doc_count(Db), + N; + _ -> + {ok, Info} = couch_db:get_db_info(Db), + couch_util:get_value(doc_count, Info) + end. + + +get_update_seq(Db, #mrargs{extra = Extra}) -> + case couch_util:get_value(namespace, Extra) of + <<"_local">> -> + null; + _ -> + couch_db:get_update_seq(Db) + end. + + +default_cb(complete, Acc) -> + {ok, lists:reverse(Acc)}; +default_cb({final, Info}, []) -> + {ok, [Info]}; +default_cb({final, _}, Acc) -> + {ok, Acc}; +default_cb(ok, ddoc_updated) -> + {ok, ddoc_updated}; +default_cb(Row, Acc) -> + {ok, [Row | Acc]}. + + +to_mrargs(KeyList) -> + lists:foldl(fun({Key, Value}, Acc) -> + Index = lookup_index(couch_util:to_existing_atom(Key)), + setelement(Index, Acc, Value) + end, #mrargs{}, KeyList). 
+ + +lookup_index(Key) -> + Index = lists:zip( + record_info(fields, mrargs), lists:seq(2, record_info(size, mrargs)) + ), + couch_util:get_value(Key, Index). diff --git a/src/couch_mrview/src/couch_mrview_http.erl.orig b/src/couch_mrview/src/couch_mrview_http.erl.orig new file mode 100644 index 000000000..3cf8833d7 --- /dev/null +++ b/src/couch_mrview/src/couch_mrview_http.erl.orig @@ -0,0 +1,640 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview_http). + +-export([ + handle_all_docs_req/2, + handle_local_docs_req/2, + handle_design_docs_req/2, + handle_reindex_req/3, + handle_view_req/3, + handle_temp_view_req/2, + handle_info_req/3, + handle_compact_req/3, + handle_cleanup_req/2 +]). + +-export([ + parse_boolean/1, + parse_int/1, + parse_pos_int/1, + prepend_val/1, + parse_body_and_query/2, + parse_body_and_query/3, + parse_params/2, + parse_params/3, + parse_params/4, + view_cb/2, + row_to_json/1, + row_to_json/2, + check_view_etag/3 +]). + +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + + +handle_all_docs_req(#httpd{method='GET'}=Req, Db) -> + all_docs_req(Req, Db, undefined); +handle_all_docs_req(#httpd{method='POST'}=Req, Db) -> + chttpd:validate_ctype(Req, "application/json"), + Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)), + all_docs_req(Req, Db, Keys); +handle_all_docs_req(Req, _Db) -> + chttpd:send_method_not_allowed(Req, "GET,POST,HEAD"). 
+ +handle_local_docs_req(#httpd{method='GET'}=Req, Db) -> + all_docs_req(Req, Db, undefined, <<"_local">>); +handle_local_docs_req(#httpd{method='POST'}=Req, Db) -> + chttpd:validate_ctype(Req, "application/json"), + Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)), + all_docs_req(Req, Db, Keys, <<"_local">>); +handle_local_docs_req(Req, _Db) -> + chttpd:send_method_not_allowed(Req, "GET,POST,HEAD"). + +handle_design_docs_req(#httpd{method='GET'}=Req, Db) -> + all_docs_req(Req, Db, undefined, <<"_design">>); +handle_design_docs_req(#httpd{method='POST'}=Req, Db) -> + chttpd:validate_ctype(Req, "application/json"), + Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)), + all_docs_req(Req, Db, Keys, <<"_design">>); +handle_design_docs_req(Req, _Db) -> + chttpd:send_method_not_allowed(Req, "GET,POST,HEAD"). + +handle_reindex_req(#httpd{method='POST', + path_parts=[_, _, DName,<<"_reindex">>]}=Req, + Db, _DDoc) -> + chttpd:validate_ctype(Req, "application/json"), + ok = couch_db:check_is_admin(Db), + couch_mrview:trigger_update(Db, <<"_design/", DName/binary>>), + chttpd:send_json(Req, 201, {[{<<"ok">>, true}]}); +handle_reindex_req(Req, _Db, _DDoc) -> + chttpd:send_method_not_allowed(Req, "POST"). 
+ + +handle_view_req(#httpd{method='GET', + path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req, + Db, _DDoc) -> + DbName = couch_db:name(Db), + DDocId = <<"_design/", DDocName/binary >>, + {ok, Info} = couch_mrview:get_view_info(DbName, DDocId, VName), + + FinalInfo = [{db_name, DbName}, + {ddoc, DDocId}, + {view, VName}] ++ Info, + chttpd:send_json(Req, 200, {FinalInfo}); +handle_view_req(#httpd{method='GET'}=Req, Db, DDoc) -> + [_, _, _, _, ViewName] = Req#httpd.path_parts, + couch_stats:increment_counter([couchdb, httpd, view_reads]), + design_doc_view(Req, Db, DDoc, ViewName, undefined); +handle_view_req(#httpd{method='POST'}=Req, Db, DDoc) -> + chttpd:validate_ctype(Req, "application/json"), + [_, _, _, _, ViewName] = Req#httpd.path_parts, + Props = chttpd:json_body_obj(Req), + Keys = couch_mrview_util:get_view_keys(Props), + Queries = couch_mrview_util:get_view_queries(Props), + case {Queries, Keys} of + {Queries, undefined} when is_list(Queries) -> + IncrBy = length(Queries), + couch_stats:increment_counter([couchdb, httpd, view_reads], IncrBy), + multi_query_view(Req, Db, DDoc, ViewName, Queries); + {undefined, Keys} when is_list(Keys) -> + couch_stats:increment_counter([couchdb, httpd, view_reads]), + design_doc_view(Req, Db, DDoc, ViewName, Keys); + {undefined, undefined} -> + throw({ + bad_request, + "POST body must contain `keys` or `queries` field" + }); + {_, _} -> + throw({bad_request, "`keys` and `queries` are mutually exclusive"}) + end; +handle_view_req(Req, _Db, _DDoc) -> + chttpd:send_method_not_allowed(Req, "GET,POST,HEAD"). 
+ + +handle_temp_view_req(#httpd{method='POST'}=Req, Db) -> + chttpd:validate_ctype(Req, "application/json"), + ok = couch_db:check_is_admin(Db), + {Body} = chttpd:json_body_obj(Req), + DDoc = couch_mrview_util:temp_view_to_ddoc({Body}), + Keys = couch_mrview_util:get_view_keys({Body}), + couch_stats:increment_counter([couchdb, httpd, temporary_view_reads]), + design_doc_view(Req, Db, DDoc, <<"temp">>, Keys); +handle_temp_view_req(Req, _Db) -> + chttpd:send_method_not_allowed(Req, "POST"). + + +handle_info_req(#httpd{method='GET'}=Req, Db, DDoc) -> + [_, _, Name, _] = Req#httpd.path_parts, + {ok, Info} = couch_mrview:get_info(Db, DDoc), + chttpd:send_json(Req, 200, {[ + {name, Name}, + {view_index, {Info}} + ]}); +handle_info_req(Req, _Db, _DDoc) -> + chttpd:send_method_not_allowed(Req, "GET"). + + +handle_compact_req(#httpd{method='POST'}=Req, Db, DDoc) -> + chttpd:validate_ctype(Req, "application/json"), + ok = couch_db:check_is_admin(Db), + ok = couch_mrview:compact(Db, DDoc), + chttpd:send_json(Req, 202, {[{ok, true}]}); +handle_compact_req(Req, _Db, _DDoc) -> + chttpd:send_method_not_allowed(Req, "POST"). + + +handle_cleanup_req(#httpd{method='POST'}=Req, Db) -> + chttpd:validate_ctype(Req, "application/json"), + ok = couch_db:check_is_admin(Db), + ok = couch_mrview:cleanup(Db), + chttpd:send_json(Req, 202, {[{ok, true}]}); +handle_cleanup_req(Req, _Db) -> + chttpd:send_method_not_allowed(Req, "POST"). + + +all_docs_req(Req, Db, Keys) -> + all_docs_req(Req, Db, Keys, undefined). 
+ +all_docs_req(Req, Db, Keys, NS) -> + case is_restricted(Db, NS) of + true -> + case (catch couch_db:check_is_admin(Db)) of + ok -> + do_all_docs_req(Req, Db, Keys, NS); + _ when NS == <<"_local">> -> + throw({forbidden, <<"Only admins can access _local_docs">>}); + _ -> + case is_public_fields_configured(Db) of + true -> + do_all_docs_req(Req, Db, Keys, NS); + false -> + throw({forbidden, <<"Only admins can access _all_docs", + " of system databases.">>}) + end + end; + false -> + do_all_docs_req(Req, Db, Keys, NS) + end. + +is_restricted(_Db, <<"_local">>) -> + true; +is_restricted(Db, _) -> + couch_db:is_system_db(Db). + +is_public_fields_configured(Db) -> + DbName = ?b2l(couch_db:name(Db)), + case config:get("couch_httpd_auth", "authentication_db", "_users") of + DbName -> + UsersDbPublic = config:get("couch_httpd_auth", "users_db_public", "false"), + PublicFields = config:get("couch_httpd_auth", "public_fields"), + case {UsersDbPublic, PublicFields} of + {"true", PublicFields} when PublicFields =/= undefined -> + true; + {_, _} -> + false + end; + _ -> + false + end. + +do_all_docs_req(Req, Db, Keys, NS) -> + Args0 = couch_mrview_http:parse_body_and_query(Req, Keys), + Args1 = set_namespace(NS, Args0), + ETagFun = fun(Sig, Acc0) -> + check_view_etag(Sig, Acc0, Req) + end, + Args = Args1#mrargs{preflight_fun=ETagFun}, + {ok, Resp} = couch_httpd:etag_maybe(Req, fun() -> + Max = chttpd:chunked_response_buffer_size(), + VAcc0 = #vacc{db=Db, req=Req, threshold=Max}, + DbName = ?b2l(couch_db:name(Db)), + UsersDbName = config:get("couch_httpd_auth", + "authentication_db", + "_users"), + IsAdmin = is_admin(Db), + Callback = get_view_callback(DbName, UsersDbName, IsAdmin), + couch_mrview:query_all_docs(Db, Args, Callback, VAcc0) + end), + case is_record(Resp, vacc) of + true -> {ok, Resp#vacc.resp}; + _ -> {ok, Resp} + end. + +set_namespace(NS, #mrargs{extra = Extra} = Args) -> + Args#mrargs{extra = [{namespace, NS} | Extra]}. 

%% True when couch_db:check_is_admin/1 returns ok, false when it throws
%% {unauthorized, _}.
is_admin(Db) ->
    case catch couch_db:check_is_admin(Db) of
        ok ->
            true;
        {unauthorized, _} ->
            false
    end.


%% Picks the row callback: non-admins querying the users db (both names
%% equal) get the filtering callback, everybody else gets all fields.
get_view_callback(DbName, DbName, false) ->
    fun filtered_view_cb/2;
get_view_callback(_, _, _) ->
    fun view_cb/2.


%% Queries one view of DDoc and streams the rows through view_cb/2, with the
%% ETag preflight check installed.
design_doc_view(Req, Db, DDoc, ViewName, Keys) ->
    Parsed = parse_params(Req, Keys),
    Preflight = fun(Sig, Acc0) ->
        check_view_etag(Sig, Acc0, Req)
    end,
    Args = Parsed#mrargs{preflight_fun = Preflight},
    RunQuery = fun() ->
        Max = chttpd:chunked_response_buffer_size(),
        VAcc0 = #vacc{db = Db, req = Req, threshold = Max},
        couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0)
    end,
    {ok, Resp} = couch_httpd:etag_maybe(Req, RunQuery),
    case Resp of
        #vacc{resp = Inner} -> {ok, Inner};
        _ -> {ok, Resp}
    end.


%% Runs several queries against one view and emits them as a single
%% {"results":[...]} response, one result object per query.
multi_query_view(Req, Db, DDoc, ViewName, Queries) ->
    Args0 = parse_params(Req, undefined),
    {ok, _, _, Args1} = couch_mrview_util:get_view(Db, DDoc, ViewName, Args0),
    ArgQueries = lists:map(fun({Query}) ->
        QueryArg = parse_params(Query, undefined, Args1),
        couch_mrview_util:validate_args(Db, DDoc, QueryArg)
    end, Queries),
    RunQueries = fun() ->
        Max = chttpd:chunked_response_buffer_size(),
        VAcc0 = #vacc{db = Db, req = Req, prepend = "\r\n", threshold = Max},
        %% TODO: proper calculation of etag
        Etag = [$", couch_uuids:new(), $"],
        Headers = [{"ETag", Etag}],
        FirstChunk = "{\"results\":[",
        {ok, Started} = chttpd:start_delayed_json_response(
            VAcc0#vacc.req, 200, Headers, FirstChunk),
        VAcc1 = VAcc0#vacc{resp = Started},
        VAcc2 = lists:foldl(fun(Args, AccIn) ->
            {ok, AccOut} = couch_mrview:query_view(
                Db, DDoc, ViewName, Args, fun view_cb/2, AccIn),
            AccOut
        end, VAcc1, ArgQueries),
        {ok, Closing} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
        {ok, Ended} = chttpd:end_delayed_json_response(Closing),
        {ok, VAcc2#vacc{resp = Ended}}
    end,
    {ok, Result} = couch_httpd:etag_maybe(Req, RunQueries),
    case Result of
        #vacc{resp = Inner} -> {ok, Inner};
        _ -> {ok, Result}
    end.

%% Row callback for non-admin reads of the users db: non-null doc bodies are
%% passed through couch_users_db:strip_non_public_fields/1 before view_cb/2.
filtered_view_cb({row, Row0}, Acc) ->
    Strip = fun
        ({doc, null}) ->
            {doc, null};
        ({doc, Body}) ->
            Doc = couch_users_db:strip_non_public_fields(#doc{body = Body}),
            {doc, Doc#doc.body};
        (Other) ->
            Other
    end,
    view_cb({row, lists:map(Strip, Row0)}, Acc);
filtered_view_cb(Obj, Acc) ->
    view_cb(Obj, Acc).


%% these clauses start (and possibly end) the response
view_cb({error, Reason}, #vacc{resp = undefined} = Acc) ->
    {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
    {ok, Acc#vacc{resp = Resp}};

view_cb(complete, #vacc{resp = undefined} = Acc) ->
    % Nothing in view
    {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
    {ok, Acc#vacc{resp = Resp}};

view_cb(Msg, #vacc{resp = undefined} = Acc) ->
    %% Start response; since this callback opened it, it must also close it.
    Headers = [],
    {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
    view_cb(Msg, Acc#vacc{resp = Resp, should_close = true});

%% ---------------------------------------------------

%% From here on down, the response has been started.

view_cb({error, Reason}, #vacc{resp = Resp} = Acc) ->
    {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
    {ok, Acc#vacc{resp = Resp1}};

view_cb(complete, #vacc{resp = Resp, buffer = Buf, threshold = Max} = Acc) ->
    % Finish view output and possibly end the response
    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
    case Acc#vacc.should_close of
        true ->
            {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
            {ok, Acc#vacc{resp = Resp2}};
        _ ->
            % Response stays open: reset per-query state for the next result.
            {ok, Acc#vacc{resp = Resp1, meta_sent = false, row_sent = false,
                          prepend = ",\r\n", buffer = [], bufsize = 0}}
    end;

view_cb({meta, Meta}, #vacc{meta_sent = false, row_sent = false} = Acc) ->
    % Sending metadata as we've not sent it or any row yet
    TotalPart = case couch_util:get_value(total, Meta) of
        undefined -> [];
        Total -> [io_lib:format("\"total_rows\":~p", [Total])]
    end,
    OffsetPart = case couch_util:get_value(offset, Meta) of
        undefined -> [];
        Offset -> [io_lib:format("\"offset\":~p", [Offset])]
    end,
    SeqPart = case couch_util:get_value(update_seq, Meta) of
        undefined ->
            [];
        null ->
            ["\"update_seq\":null"];
        UpdateSeq when is_integer(UpdateSeq) ->
            [io_lib:format("\"update_seq\":~B", [UpdateSeq])];
        UpdateSeq when is_binary(UpdateSeq) ->
            [io_lib:format("\"update_seq\":\"~s\"", [UpdateSeq])]
    end,
    Parts = TotalPart ++ OffsetPart ++ SeqPart ++ ["\"rows\":["],
    Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
    {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
    % The first row after the metadata needs no separator.
    {ok, AccOut#vacc{prepend = "", meta_sent = true}};

view_cb({meta, _Meta}, #vacc{} = Acc) ->
    %% ignore metadata
    {ok, Acc};

view_cb({row, Row}, #vacc{meta_sent = false} = Acc) ->
    %% sorted=false and row arrived before meta
    % Adding another row
    Chunk = [prepend_val(Acc), "{\"rows\":[\r\n", row_to_json(Row)],
    Acc1 = Acc#vacc{meta_sent = true, row_sent = true},
    maybe_flush_response(Acc1, Chunk, iolist_size(Chunk));

view_cb({row, Row}, #vacc{meta_sent = true} = Acc) ->
    % Adding another row
    Chunk = [prepend_val(Acc), row_to_json(Row)],
    maybe_flush_response(Acc#vacc{row_sent = true}, Chunk, iolist_size(Chunk)).


%% Buffers Data; when the non-empty buffer plus Data would exceed the
%% threshold, the current buffer is sent first and Data starts a new one.
maybe_flush_response(#vacc{bufsize = Size, threshold = Max} = Acc, Data, Len)
        when Size > 0 andalso (Size + Len) > Max ->
    #vacc{buffer = Pending, resp = Resp} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Pending),
    {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = Resp1}};
maybe_flush_response(#vacc{buffer = Pending, bufsize = Size} = Acc, Data, Len) ->
    {ok, Acc#vacc{
        prepend = ",\r\n",
        buffer = [Pending | Data],
        bufsize = Size + Len
    }}.

%% Separator to emit before the next chunk; an unset value means none.
prepend_val(#vacc{prepend = undefined}) ->
    "";
prepend_val(#vacc{prepend = Prepend}) ->
    Prepend.


%% Encodes one view row as JSON, dispatching on its id.
row_to_json(Row) ->
    row_to_json(couch_util:get_value(id, Row), Row).


row_to_json(error, Row) ->
    % Special case for _all_docs request with KEYS to
    % match prior behavior.
    Key = couch_util:get_value(key, Row),
    Val = couch_util:get_value(value, Row),
    ReasonProp = case couch_util:get_value(reason, Row) of
        undefined -> [];
        Reason -> [{reason, Reason}]
    end,
    ?JSON_ENCODE({[{key, Key}, {error, Val}] ++ ReasonProp});
row_to_json(Id0, Row) ->
    % id and doc are emitted only when present.
    IdProp = case Id0 of
        undefined -> [];
        _ -> [{id, Id0}]
    end,
    Key = couch_util:get_value(key, Row, null),
    Val = couch_util:get_value(value, Row),
    DocProp = case couch_util:get_value(doc, Row) of
        undefined -> [];
        Doc -> [{doc, Doc}]
    end,
    ?JSON_ENCODE({IdProp ++ [{key, Key}, {value, Val}] ++ DocProp}).


%% Builds #mrargs{} from a request's query string or from a property list.
parse_params(#httpd{} = Req, Keys) ->
    parse_params(chttpd:qs(Req), Keys);
parse_params(Props, Keys) ->
    parse_params(Props, Keys, #mrargs{}).


parse_params(Props, Keys, Args) ->
    parse_params(Props, Keys, Args, []).

%% Folds every {Key, Value} of Props into Args0.
%% Options:
%%   decoded          - values are already JSON-decoded terms
%%   keep_group_level - keep keys/group/group_level of Args0 instead of
%%                      resetting them before parsing
parse_params(Props, Keys, #mrargs{}=Args0, Options) ->
    IsDecoded = lists:member(decoded, Options),
    Args1 = case lists:member(keep_group_level, Options) of
        true ->
            Args0;
        _ ->
            % group_level set to undefined to detect if explicitly set by user
            Args0#mrargs{keys=Keys, group=undefined, group_level=undefined}
    end,
    lists:foldl(fun({K, V}, Acc) ->
        parse_param(K, V, Acc, IsDecoded)
    end, Args1, Props).


%% Builds #mrargs{} from the request. For POST the JSON body is parsed first
%% and the query string is applied on top of it; otherwise only the query
%% string is used.
parse_body_and_query(#httpd{method='POST'} = Req, Keys) ->
    Props = chttpd:json_body_obj(Req),
    parse_body_and_query(Req, Props, Keys);

parse_body_and_query(Req, Keys) ->
    parse_params(chttpd:qs(Req), Keys, #mrargs{keys=Keys, group=undefined,
        group_level=undefined}, [keep_group_level]).

parse_body_and_query(Req, {Props}, Keys) ->
    Args = #mrargs{keys=Keys, group=undefined, group_level=undefined},
    BodyArgs = parse_params(Props, Keys, Args, [decoded]),
    parse_params(chttpd:qs(Req), Keys, BodyArgs, [keep_group_level]).

%% Applies a single query parameter to Args. Key may be a binary or a string.
%% When IsDecoded is true, key-like values are used as given; otherwise they
%% are JSON-decoded first. Unknown keys are stored in #mrargs.extra as
%% binaries. Throws {query_parse_error, Msg} for invalid values.
parse_param(Key, Val, Args, IsDecoded) when is_binary(Key) ->
    parse_param(binary_to_list(Key), Val, Args, IsDecoded);
parse_param(Key, Val, Args, IsDecoded) ->
    case Key of
        "" ->
            Args;
        "reduce" ->
            Args#mrargs{reduce=parse_boolean(Val)};
        "key" when IsDecoded ->
            Args#mrargs{start_key=Val, end_key=Val};
        "key" ->
            JsonKey = ?JSON_DECODE(Val),
            Args#mrargs{start_key=JsonKey, end_key=JsonKey};
        "keys" when IsDecoded ->
            Args#mrargs{keys=Val};
        "keys" ->
            Args#mrargs{keys=?JSON_DECODE(Val)};
        "startkey" when IsDecoded ->
            Args#mrargs{start_key=Val};
        "start_key" when IsDecoded ->
            Args#mrargs{start_key=Val};
        "startkey" ->
            Args#mrargs{start_key=?JSON_DECODE(Val)};
        "start_key" ->
            Args#mrargs{start_key=?JSON_DECODE(Val)};
        "startkey_docid" ->
            Args#mrargs{start_key_docid=couch_util:to_binary(Val)};
        "start_key_doc_id" ->
            Args#mrargs{start_key_docid=couch_util:to_binary(Val)};
        "endkey" when IsDecoded ->
            Args#mrargs{end_key=Val};
        "end_key" when IsDecoded ->
            Args#mrargs{end_key=Val};
        "endkey" ->
            Args#mrargs{end_key=?JSON_DECODE(Val)};
        "end_key" ->
            Args#mrargs{end_key=?JSON_DECODE(Val)};
        "endkey_docid" ->
            Args#mrargs{end_key_docid=couch_util:to_binary(Val)};
        "end_key_doc_id" ->
            Args#mrargs{end_key_docid=couch_util:to_binary(Val)};
        "limit" ->
            Args#mrargs{limit=parse_pos_int(Val)};
        % `stale` is mapped onto the stable/update pair.
        "stale" when Val == "ok" orelse Val == <<"ok">> ->
            Args#mrargs{stable=true, update=false};
        "stale" when Val == "update_after" orelse Val == <<"update_after">> ->
            Args#mrargs{stable=true, update=lazy};
        "stale" ->
            throw({query_parse_error, <<"Invalid value for `stale`.">>});
        "stable" when Val == "true" orelse Val == <<"true">> ->
            Args#mrargs{stable=true};
        "stable" when Val == "false" orelse Val == <<"false">> ->
            Args#mrargs{stable=false};
        "stable" ->
            throw({query_parse_error, <<"Invalid value for `stable`.">>});
        "update" when Val == "true" orelse Val == <<"true">> ->
            Args#mrargs{update=true};
        "update" when Val == "false" orelse Val == <<"false">> ->
            Args#mrargs{update=false};
        "update" when Val == "lazy" orelse Val == <<"lazy">> ->
            Args#mrargs{update=lazy};
        "update" ->
            throw({query_parse_error, <<"Invalid value for `update`.">>});
        "descending" ->
            case parse_boolean(Val) of
                true -> Args#mrargs{direction=rev};
                _ -> Args#mrargs{direction=fwd}
            end;
        "skip" ->
            Args#mrargs{skip=parse_pos_int(Val)};
        "group" ->
            Args#mrargs{group=parse_boolean(Val)};
        "group_level" ->
            Args#mrargs{group_level=parse_pos_int(Val)};
        "inclusive_end" ->
            Args#mrargs{inclusive_end=parse_boolean(Val)};
        "include_docs" ->
            Args#mrargs{include_docs=parse_boolean(Val)};
        "attachments" ->
            case parse_boolean(Val) of
                true ->
                    Opts = Args#mrargs.doc_options,
                    Args#mrargs{doc_options=[attachments|Opts]};
                false ->
                    Args
            end;
        "att_encoding_info" ->
            case parse_boolean(Val) of
                true ->
                    Opts = Args#mrargs.doc_options,
                    Args#mrargs{doc_options=[att_encoding_info|Opts]};
                false ->
                    Args
            end;
        "update_seq" ->
            Args#mrargs{update_seq=parse_boolean(Val)};
        "conflicts" ->
            Args#mrargs{conflicts=parse_boolean(Val)};
        "callback" ->
            Args#mrargs{callback=couch_util:to_binary(Val)};
        "sorted" ->
            Args#mrargs{sorted=parse_boolean(Val)};
        "partition" ->
            Partition = couch_util:to_binary(Val),
            couch_partition:validate_partition(Partition),
            couch_mrview_util:set_extra(Args, partition, Partition);
        _ ->
            BKey = couch_util:to_binary(Key),
            BVal = couch_util:to_binary(Val),
            Args#mrargs{extra=[{BKey, BVal} | Args#mrargs.extra]}
    end.


%% Parses a boolean parameter. Accepts the atoms true/false and the strings
%% or binaries "true"/"false" in any letter case. Every other value,
%% including non-string terms from a decoded JSON body such as null, floats
%% or objects, throws {query_parse_error, Msg} instead of crashing inside
%% string:to_lower/1.
parse_boolean(true) ->
    true;
parse_boolean(false) ->
    false;

parse_boolean(Val) when is_binary(Val) ->
    parse_boolean(?b2l(Val));

parse_boolean(Val) when is_list(Val) ->
    case string:to_lower(Val) of
        "true" -> true;
        "false" -> false;
        _ -> invalid_boolean(Val)
    end;

parse_boolean(Val) ->
    invalid_boolean(Val).

%% Throws the query_parse_error used for every unparsable boolean.
invalid_boolean(Val) ->
    Msg = io_lib:format("Invalid boolean parameter: ~p", [Val]),
    throw({query_parse_error, ?l2b(Msg)}).

%% Returns Val when it is an integer, otherwise parses it as a decimal
%% string. Throws {query_parse_error, Msg} when that fails.
parse_int(Val) when is_integer(Val) ->
    Val;
parse_int(Val) ->
    case (catch list_to_integer(Val)) of
        IntVal when is_integer(IntVal) ->
            IntVal;
        _ ->
            Msg = io_lib:format("Invalid value for integer: ~p", [Val]),
            throw({query_parse_error, ?l2b(Msg)})
    end.

%% Like parse_int/1 but rejects negative numbers. Zero is accepted.
parse_pos_int(Val) ->
    case parse_int(Val) of
        IntVal when IntVal >= 0 ->
            IntVal;
        _ ->
            Fmt = "Invalid value for positive integer: ~p",
            Msg = io_lib:format(Fmt, [Val]),
            throw({query_parse_error, ?l2b(Msg)})
    end.


%% Preflight hook: throws {etag_match, ETag} when the request already carries
%% the ETag derived from Sig, otherwise records the ETag in the accumulator.
check_view_etag(Sig, Acc0, Req) ->
    ETag = chttpd:make_etag(Sig),
    case chttpd:etag_match(Req, ETag) of
        true -> throw({etag_match, ETag});
        false -> {ok, Acc0#vacc{etag=ETag}}
    end.
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl.orig b/src/couch_mrview/src/couch_mrview_updater.erl.orig
new file mode 100644
index 000000000..7d6823e6a
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_updater.erl.orig
@@ -0,0 +1,380 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License.
% You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_mrview_updater).

-export([start_update/4, purge/4, process_doc/3, finish_update/1]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

-define(REM_VAL, removed).

%% Prepares an index update: creates the doc and write queues, stores them in
%% the state and spawns two linked workers, a mapper (map_docs/2) and a
%% writer (write_results/2), which both report back to the calling process.
start_update(Partial, State, NumChanges, NumChangesDone) ->
    QueueOpts = [
        {max_size, config:get_integer("view_updater", "queue_memory_cap", 100000)},
        {max_items, config:get_integer("view_updater", "queue_item_cap", 500)}
    ],
    {ok, DocQueue} = couch_work_queue:new(QueueOpts),
    {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
    InitState = State#mrst{
        first_build = State#mrst.update_seq == 0,
        partial_resp_pid = Partial,
        doc_acc = [],
        doc_queue = DocQueue,
        write_queue = WriteQueue
    },

    Owner = self(),
    IoPriority = {view_update, State#mrst.db_name, State#mrst.idx_name},

    Mapper = fun() ->
        erlang:put(io_priority, IoPriority),
        % Guard against division by zero when there is nothing to index.
        Progress = case NumChanges of
            0 -> 0;
            _ -> (NumChangesDone * 100) div NumChanges
        end,
        couch_task_status:add_task([
            {indexer_pid, ?l2b(pid_to_list(Partial))},
            {type, indexer},
            {database, State#mrst.db_name},
            {design_document, State#mrst.idx_name},
            {progress, Progress},
            {changes_done, NumChangesDone},
            {total_changes, NumChanges}
        ]),
        couch_task_status:set_update_frequency(500),
        map_docs(Owner, InitState)
    end,
    Writer = fun() ->
        erlang:put(io_priority, IoPriority),
        write_results(Owner, InitState)
    end,
    spawn_link(Mapper),
    spawn_link(Writer),

    {ok, InitState}.


%% Removes the rows of purged documents from the id btree and from every
%% view btree. A view's purge_seq advances only if its btree changed.
purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
    #mrst{
        id_btree = IdBtree,
        views = Views,
        partitioned = Partitioned
    } = State,

    Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
    % Look the ids up and remove them from the id btree in one pass.
    {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),

    % Group the keys to remove by view number.
    CollectKeys = fun
        ({not_found, _}, Dict) ->
            Dict;
        ({ok, {DocId, ViewNumRowKeys}}, Dict) ->
            AddKey = fun
                ({ViewNum, {Key, Seq, _Op}}, D) ->
                    dict:append(ViewNum, {Key, Seq, DocId}, D);
                ({ViewNum, RowKey0}, D) ->
                    RowKey = case Partitioned of
                        false ->
                            RowKey0;
                        _ ->
                            [{RK, _}] = inject_partition([{RowKey0, DocId}]),
                            RK
                    end,
                    dict:append(ViewNum, {RowKey, DocId}, D)
            end,
            lists:foldl(AddKey, Dict, ViewNumRowKeys)
    end,
    KeysToRemove = lists:foldl(CollectKeys, dict:new(), Lookups),

    PurgeView = fun(#mrview{id_num = ViewId, btree = Btree} = View) ->
        ToRem = couch_util:dict_find(ViewId, KeysToRemove, []),
        {ok, Btree2} = couch_btree:add_remove(Btree, [], ToRem),
        NewPurgeSeq = case Btree2 of
            Btree -> View#mrview.purge_seq;
            _ -> PurgeSeq
        end,
        View#mrview{btree = Btree2, purge_seq = NewPurgeSeq}
    end,

    {ok, State#mrst{
        id_btree = IdBtree2,
        views = lists:map(PurgeView, Views),
        purge_seq = PurgeSeq
    }}.


%% Accumulates {Id, Seq, Doc} entries in doc_acc. Once more than 100 entries
%% are pending they are queued, oldest first, for the mapper.
process_doc(Doc, Seq, #mrst{doc_acc = Pending} = State) when length(Pending) > 100 ->
    couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Pending)),
    process_doc(Doc, Seq, State#mrst{doc_acc = []});
process_doc(nil, Seq, #mrst{doc_acc = Pending} = State) ->
    {ok, State#mrst{doc_acc = [{nil, Seq, nil} | Pending]}};
% process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc}=State) ->
%     {ok, State#mrst{doc_acc=[{Id, Seq, deleted} | Acc]}};
process_doc(#doc{id = Id} = Doc, Seq, #mrst{doc_acc = Pending} = State) ->
    {ok, State#mrst{doc_acc = [{Id, Seq, Doc} | Pending]}}.


%% Queues any remaining documents, closes the doc queue and waits for the
%% writer's final {new_state, State} message. Fields used only during the
%% update are cleared from the returned state.
finish_update(#mrst{doc_acc = Pending} = State) ->
    % NOTE(review): unlike process_doc/3 this last batch is queued without
    % lists:reverse/1 - confirm that the order inside a batch is irrelevant.
    case Pending of
        [] -> ok;
        _ -> couch_work_queue:queue(State#mrst.doc_queue, Pending)
    end,
    couch_work_queue:close(State#mrst.doc_queue),
    receive
        {new_state, NewState} ->
            {ok, NewState#mrst{
                first_build = undefined,
                partial_resp_pid = undefined,
                doc_acc = undefined,
                doc_queue = undefined,
                write_queue = undefined,
                qserver = nil
            }}
    end.

%% Builds the property list for a deleted doc body: _seq and _body_sp, plus
%% _access when the original body has one.
make_deleted_body({Props}, Meta, Seq) ->
    BodySp = couch_util:get_value(body_sp, Meta),
    Base = [{<<"_seq">>, Seq}, {<<"_body_sp">>, BodySp}],
    case couch_util:get_value(<<"_access">>, Props) of
        undefined -> Base;
        Access -> [{<<"_access">>, Access} | Base]
    end.

%% Mapper loop: takes batches from the doc queue, maps each document through
%% the query server and hands {MaxSeq, Results} to the write queue. When the
%% doc queue is closed, the query server is stopped and the write queue
%% closed.
map_docs(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State0) ->
    erlang:put(io_priority, {view_update, DbName, IdxName}),
    case couch_work_queue:dequeue(State0#mrst.doc_queue) of
        {ok, Dequeued} ->
            % Run all the non deleted docs through the view engine and
            % then pass the results on to the writer process.
            State1 = case State0#mrst.qserver of
                nil -> start_query_server(State0);
                _ -> State0
            end,
            QServer = State1#mrst.qserver,
            MapOne = fun
                ({nil, Seq, _}, {MaxSeq, Acc}) ->
                    {erlang:max(Seq, MaxSeq), Acc};
                ({Id, Seq, deleted}, {MaxSeq, Acc}) ->
                    {erlang:max(Seq, MaxSeq), [{Id, []} | Acc]};
                ({Id, Seq, Doc}, {MaxSeq, Acc}) ->
                    couch_stats:increment_counter([couchdb, mrview, map_doc]),
                    {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
                    {erlang:max(Seq, MaxSeq), [{Id, Res} | Acc]}
            end,
            MapBatch = fun(Docs, Acc) ->
                update_task(length(Docs)),
                lists:foldl(MapOne, Acc, Docs)
            end,
            Results = lists:foldl(MapBatch, {0, []}, Dequeued),
            couch_work_queue:queue(State1#mrst.write_queue, Results),
            map_docs(Parent, State1);
        closed ->
            couch_query_servers:stop_doc_map(State0#mrst.qserver),
            couch_work_queue:close(State0#mrst.write_queue)
    end.


%% Writer loop: accumulates mapped results, writes them to the btrees and
%% sends intermediate states to the partial-response pid. The final state is
%% sent to Parent as {new_state, State} once the write queue is closed.
write_results(Parent, #mrst{} = State) ->
    case accumulate_writes(State, State#mrst.write_queue, nil) of
        stop ->
            Parent ! {new_state, State};
        {Go, {Seq, ViewKVs, DocIdKeys}} ->
            NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys),
            if Go == stop ->
                Parent ! {new_state, NewState};
            true ->
                send_partial(NewState#mrst.partial_resp_pid, NewState),
                write_results(Parent, NewState)
            end
    end.


%% Starts a map query server for the state's language, view definitions and
%% lib, and stores it in the state.
start_query_server(State) ->
    #mrst{
        language=Language,
        lib=Lib,
        views=Views
    } = State,
    Defs = [View#mrview.def || View <- Views],
    {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
    State#mrst{qserver=QServer}.


%% Drains the write queue W into one accumulator {Seq, ViewKVs, DocIdKVs}.
%% Returns `stop` when the queue is closed and nothing was accumulated,
%% {stop, Acc} when it is closed with pending data, and {ok, Acc} when enough
%% data has been collected for one write (see accumulate_more/2).
accumulate_writes(State, W, Acc0) ->
    {Seq, ViewKVs, DocIdKVs} = case Acc0 of
        nil -> {0, [{V#mrview.id_num, []} || V <- State#mrst.views], []};
        _ -> Acc0
    end,
    case couch_work_queue:dequeue(W) of
        closed when Seq == 0 ->
            stop;
        closed ->
            {stop, {Seq, ViewKVs, DocIdKVs}};
        {ok, Info} ->
            {_, _, NewIds} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs),
            case accumulate_more(length(NewIds), Acc) of
                true -> accumulate_writes(State, W, Acc);
                false -> {ok, Acc}
            end
    end.


%% True while the accumulator is below both the item and the memory
%% thresholds. The limits are read with config:get_integer/3, as
%% start_update/4 does for its queue settings, rather than parsing the raw
%% strings with list_to_integer/1.
%% NOTE(review): assumes config:get_integer/3 falls back to the default on a
%% malformed value instead of raising - confirm against the config app.
accumulate_more(NumDocIds, Acc) ->
    % check if we have enough items now
    MinItems = config:get_integer("view_updater", "min_writer_items", 100),
    MinSize = config:get_integer("view_updater", "min_writer_size", 16777216),
    CurrMem = ?term_size(Acc),
    NumDocIds < MinItems andalso CurrMem < MinSize.


%% Merges a list of {Seq, Results} batches into the accumulators and tracks
%% the highest sequence seen.
merge_results([], SeqAcc, ViewKVs, DocIdKeys) ->
    {SeqAcc, ViewKVs, DocIdKeys};
merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) ->
    Fun = fun(RawResults, {VKV, DIK}) ->
        merge_results(RawResults, VKV, DIK)
    end,
    {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results),
    merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1).


%% Merges the map results of one document into the per-view KV lists and the
%% doc-id key list. A document that emitted nothing is recorded as
%% {DocId, []}.
merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
    {ViewKVs, [{DocId, []} | DocIdKeys]};
merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
    JsonResults = couch_query_servers:raw_to_ejson(RawResults),
    Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults],
    % Every inner list holds only tuples, so "no emits at all" means every
    % per-view list is empty. That is checked without flattening the results.
    case lists:all(fun(FunRs) -> FunRs =:= [] end, Results) of
        true ->
            {ViewKVs, [{DocId, []} | DocIdKeys]};
        false ->
            {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []),
            {ViewKVs1, [ViewIdKeys | DocIdKeys]}
    end.


%% Walks the per-view emits and the per-view accumulators in lock step.
%% Within one view, emits with the same key are collapsed into a single
%% {Key, {dups, Values}} entry. Returns the updated accumulators and
%% {DocId, [{ViewId, Key}]} for the id btree.
insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
    CombineDupesFun = fun
        ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) ->
            {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys};
        ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys}) ->
            {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys};
        ({Key, Value}, {Rest, IdKeys}) ->
            {[{Key, Value} | Rest], [{Id, Key} | IdKeys]}
    end,
    InitAcc = {[], VIdKeys},
    couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)),
    % Sorting puts equal keys next to each other so the fold can merge them.
    {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc,
                                    lists:sort(KVs)),
    FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs,
    insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0).


%% Writes one accumulated batch: updates the id btree, then adds the new rows
%% to and removes the stale rows from every view btree. A view's update_seq
%% advances only if its btree changed.
write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
    #mrst{
        id_btree = IdBtree,
        first_build = FirstBuild,
        partitioned = Partitioned
    } = State,

    {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
    ToRemByView = collapse_rem_keys(ToRemove, dict:new()),

    % The shared ViewId in both arguments asserts that views and KV lists
    % line up.
    WriteView = fun(#mrview{id_num = ViewId} = View, {ViewId, KVs0}) ->
        ToRem0 = couch_util:dict_find(ViewId, ToRemByView, []),
        {KVs, ToRem} = case Partitioned of
            true ->
                PartKVs = inject_partition(KVs0),
                PartRem = inject_partition(ToRem0),
                {PartKVs, PartRem};
            false ->
                {KVs0, ToRem0}
        end,
        OldBtree = View#mrview.btree,
        {ok, NewBtree} = couch_btree:add_remove(OldBtree, KVs, ToRem),
        NewUpdateSeq = case NewBtree of
            OldBtree -> View#mrview.update_seq;
            _ -> UpdateSeq
        end,

        Updated = View#mrview{btree = NewBtree, update_seq = NewUpdateSeq},
        maybe_notify(State, Updated, KVs, ToRem),
        Updated
    end,

    State#mrst{
        views = lists:zipwith(WriteView, State#mrst.views, ViewKVs),
        update_seq = UpdateSeq,
        id_btree = IdBtree2
    }.


%% Wraps every key as {p, Partition, Key}, with the partition taken from the
%% document id. Accepts rows to add ({{Key, DocId}, Value}) and rows to
%% remove ({Key, DocId}).
inject_partition(Rows) ->
    Wrap = fun
        ({{Key, DocId}, Value}) ->
            % Adding a row to the view
            {Partition, _} = couch_partition:extract(DocId),
            {{{p, Partition, Key}, DocId}, Value};
        ({Key, DocId}) ->
            % Removing a row based on values in id_tree
            {Partition, _} = couch_partition:extract(DocId),
            {{p, Partition, Key}, DocId}
    end,
    lists:map(Wrap, Rows).


%% Updates the id btree. On a first build nothing is looked up or removed;
%% otherwise every id is looked up, ids with keys are stored and ids without
%% keys are removed.
update_id_btree(Btree, DocIdKeys, true) ->
    ToAdd = [Entry || {_Id, DIKeys} = Entry <- DocIdKeys, DIKeys /= []],
    couch_btree:query_modify(Btree, [], ToAdd, []);
update_id_btree(Btree, DocIdKeys, _) ->
    ToFind = [Id || {Id, _} <- DocIdKeys],
    ToAdd = [Entry || {_Id, DIKeys} = Entry <- DocIdKeys, DIKeys /= []],
    ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []],
    couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).


%% Folds id-btree lookup results into a dict of ViewId -> [{Key, DocId}].
%% not_found lookups contribute nothing.
collapse_rem_keys(Lookups, Acc) ->
    lists:foldl(fun collapse_lookup/2, Acc, Lookups).

collapse_lookup({ok, {DocId, ViewIdKeys}}, Acc) ->
    lists:foldl(fun({ViewId, Key}, Dict) ->
        dict:append(ViewId, {Key, DocId}, Dict)
    end, Acc, ViewIdKeys);
collapse_lookup({not_found, _}, Acc) ->
    Acc.


%% Casts the intermediate state to Pid; does nothing when Pid is not a pid.
send_partial(Pid, State) when is_pid(Pid) ->
    gen_server:cast(Pid, {new_state, State});
send_partial(_, _) ->
    ok.


%% Adds NumChanges to the task's changes_done and recomputes its progress
%% percentage.
update_task(NumChanges) ->
    [Done0, Total] = couch_task_status:get([changes_done, total_changes]),
    Done = Done0 + NumChanges,
    Progress = case Total of
        0 ->
            % updater restart after compaction finishes
            0;
        _ ->
            (Done * 100) div Total
    end,
    couch_task_status:update([{progress, Progress}, {changes_done, Done}]).


%% Notifies index plugins, passing the updated and removed keys as funs so
%% the key lists are built only if a plugin calls them.
maybe_notify(State, View, KVs, ToRem) ->
    UpdatedKeys = fun() ->
        [Key || {{Key, _}, _} <- KVs]
    end,
    RemovedKeys = fun() ->
        [Key || {Key, _DocId} <- ToRem]
    end,
    couch_index_plugin:index_update(State, View, UpdatedKeys, RemovedKeys).
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl.rej b/src/couch_mrview/src/couch_mrview_updater.erl.rej new file mode 100644 index 000000000..81a2ce15f --- /dev/null +++ b/src/couch_mrview/src/couch_mrview_updater.erl.rej @@ -0,0 +1,52 @@ +*************** +*** 192,202 **** + DocFun = fun + ({nil, Seq, _, _}, {SeqAcc, Results}) -> + {erlang:max(Seq, SeqAcc), Results}; +- ({Id, Seq, Rev, deleted}, {SeqAcc, Results}) -> +- {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]}; + ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) -> + couch_stats:increment_counter([couchdb, mrview, map_doc]), +- {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc), + {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]} + end, + +--- 199,236 ---- + DocFun = fun + ({nil, Seq, _, _}, {SeqAcc, Results}) -> + {erlang:max(Seq, SeqAcc), Results}; ++ ({Id, Seq, Rev, #doc{deleted=true, body=Body, meta=Meta}}, {SeqAcc, Results}) -> ++ % _access needs deleted docs ++ case IdxName of ++ <<"_design/_access">> -> ++ % splice in seq ++ {Start, Rev1} = Rev, ++ Doc = #doc{ ++ id = Id, ++ revs = {Start, [Rev1]}, ++ body = {make_deleted_body(Body, Meta, Seq)}, %% todo: only keep _access and add _seq ++ deleted = true ++ }, ++ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc), ++ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]}; ++ _Else -> ++ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]} ++ end; + ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) -> + couch_stats:increment_counter([couchdb, mrview, map_doc]), ++ % couch_log:info("~nIdxName: ~p, Doc: ~p~n~n", [IdxName, Doc]), ++ Doc0 = case IdxName of ++ <<"_design/_access">> -> ++ % splice in seq ++ {Props} = Doc#doc.body, ++ BodySp = couch_util:get_value(body_sp, Doc#doc.meta), ++ Doc#doc{ ++ body = {Props++[{<<"_seq">>, Seq}, {<<"_body_sp">>, BodySp}]} ++ }; ++ _Else -> ++ Doc ++ end, ++ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc0), + {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]} + end, + 
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index be75dd5e5..2bf168073 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -409,11 +409,11 @@ validate_args(Db, DDoc, Args0) -> validate_args(#mrst{} = State, Args0) -> Args = validate_args(Args0), - ViewPartitioned = State#mrst.partitioned, Partition = get_extra(Args, partition), + AllDocsAccess = get_extra(Args, all_docs_access, false), - case {ViewPartitioned, Partition} of + case {ViewPartitioned and not AllDocsAccess, Partition} of {true, undefined} -> Msg1 = <<"`partition` parameter is mandatory " "for queries to this view.">>, diff --git a/src/couch_mrview/src/couch_mrview_util.erl.orig b/src/couch_mrview/src/couch_mrview_util.erl.orig new file mode 100644 index 000000000..e971720c9 --- /dev/null +++ b/src/couch_mrview/src/couch_mrview_util.erl.orig @@ -0,0 +1,1177 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview_util). + +-export([get_view/4, get_view_index_pid/4]). +-export([get_local_purge_doc_id/1, get_value_from_options/2]). +-export([verify_view_filename/1, get_signature_from_filename/1]). +-export([ddoc_to_mrst/2, init_state/4, reset_index/3]). +-export([make_header/1]). +-export([index_file/2, compaction_file/2, open_file/1]). +-export([delete_files/2, delete_index_file/2, delete_compaction_file/2]). 
-export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
-export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
-export([fold/4, fold_reduce/4]).
-export([temp_view_to_ddoc/1]).
-export([calculate_external_size/1]).
-export([calculate_active_size/1]).
-export([validate_all_docs_args/2, validate_args/1, validate_args/3]).
-export([maybe_load_doc/3, maybe_load_doc/4]).
-export([maybe_update_index_file/1]).
-export([extract_view/4, extract_view_reduce/1]).
-export([get_view_keys/1, get_view_queries/1]).
-export([set_view_type/3]).
-export([set_extra/3, get_extra/2, get_extra/3]).

-define(MOD, couch_mrview_index).
-define(GET_VIEW_RETRY_COUNT, 1).
-define(GET_VIEW_RETRY_DELAY, 50).
-define(LOWEST_KEY, null).
-define(HIGHEST_KEY, {<<255, 255, 255, 255>>}).
-define(LOWEST(A, B), (if A < B -> A; true -> B end)).
-define(HIGHEST(A, B), (if A > B -> A; true -> B end)).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").


%% Id of the local document that tracks purges for the index with signature
%% Sig (a string).
get_local_purge_doc_id(Sig) ->
    ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).


%% Returns the value stored under Key (a binary) in Options, or throws
%% {bad_request, Reason} when the key is missing.
get_value_from_options(Key, Options) ->
    case couch_util:get_value(Key, Options) of
        undefined ->
            Reason = <<"'", Key/binary, "' must exists in options.">>,
            throw({bad_request, Reason});
        Value -> Value
    end.


%% True when FileName names a view index file: the last path component must
%% have the ".view" extension and the rest of it must consist of hexadecimal
%% digits only. The extension is stripped before the check, as
%% get_signature_from_filename/1 does; checking the name with its suffix
%% would reject every file because of the characters in ".view".
verify_view_filename(FileName) ->
    FilePathList = filename:split(FileName),
    PureFN = lists:last(FilePathList),
    case filename:extension(PureFN) of
        ".view" ->
            Sig = filename:basename(PureFN, ".view"),
            lists:all(fun is_hex_char/1, Sig);
        _ ->
            false
    end.

%% True for the characters 0-9, a-f and A-F.
is_hex_char(Ch) when Ch >= $0, Ch =< $9 -> true;
is_hex_char(Ch) when Ch >= $a, Ch =< $f -> true;
is_hex_char(Ch) when Ch >= $A, Ch =< $F -> true;
is_hex_char(_) -> false.

%% Returns the last path component of FileName without its ".view" suffix.
get_signature_from_filename(FileName) ->
    FilePathList = filename:split(FileName),
    PureFN = lists:last(FilePathList),
    filename:basename(PureFN, ".view").

%% Opens the index for DDoc and extracts the named view. Returns
%% {ok, {Type, View, MonitorRef}, Sig, Args}, or ddoc_updated when the index
%% reports that the design document changed.
get_view(Db, DDoc, ViewName, Args0) ->
    case get_view_index_state(Db, DDoc, ViewName, Args0) of
        ddoc_updated ->
            ddoc_updated;
        {ok, #mrst{language = Lang, views = Views} = State, Args2} ->
            Ref = erlang:monitor(process, State#mrst.fd),
            {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
            check_range(Args3, view_cmp(View)),
            Sig = view_sig(Db, State, View, Args3),
            {ok, {Type, View, Ref}, Sig, Args3}
    end.


%% Asks the index server for the index, validating the arguments against the
%% index's initial state.
get_view_index_pid(Db, DDoc, ViewName, Args0) ->
    ValidateArgs = fun(InitState) ->
        Typed = set_view_type(Args0, ViewName, InitState#mrst.views),
        {ok, validate_args(InitState, Typed)}
    end,
    couch_index_server:get_index(?MOD, Db, DDoc, ValidateArgs).


get_view_index_state(Db, DDoc, ViewName, Args0) ->
    get_view_index_state(Db, DDoc, ViewName, Args0, ?GET_VIEW_RETRY_COUNT).

%% Fetches the index state. When the index process has exited with noproc or
%% normal, the call is retried after a short delay until the retry budget is
%% used up.
get_view_index_state(_, DDoc, _, _, RetryCount) when RetryCount < 0 ->
    couch_log:warning("DDoc '~s' recreated too frequently", [DDoc#doc.id]),
    throw({get_view_state, exceeded_retry_count});
get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) ->
    try
        {ok, Pid, Args} = get_view_index_pid(Db, DDoc, ViewName, Args0),
        UpdateSeq = couch_util:with_db(Db, fun couch_db:get_update_seq/1),
        case fetch_index_state(Pid, Args#mrargs.update, UpdateSeq) of
            {ok, State} -> {ok, State, Args};
            ddoc_updated -> ddoc_updated;
            Else -> throw(Else)
        end
    catch
        exit:{Reason, _} when Reason == noproc; Reason == normal ->
            timer:sleep(?GET_VIEW_RETRY_DELAY),
            get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount - 1);
        error:{badmatch, Error} ->
            throw(Error);
        Error ->
            throw(Error)
    end.

%% Reads the index state for the requested update mode:
%%   lazy  - return the current state and refresh the index in the background
%%   false - return the current state without updating
%%   other - wait until the index has caught up with UpdateSeq
fetch_index_state(Pid, lazy, UpdateSeq) ->
    spawn(fun() ->
        catch couch_index:get_state(Pid, UpdateSeq)
    end),
    couch_index:get_state(Pid, 0);
fetch_index_state(Pid, false, _UpdateSeq) ->
    couch_index:get_state(Pid, 0);
fetch_index_state(Pid, _Update, UpdateSeq) ->
    couch_index:get_state(Pid, UpdateSeq).


%% Build an #mrst{} index state from a design document.
%%
%% Views that share the same map source and the same view options are
%% merged into a single #mrview{}. A view name goes into `map_names' when
%% the view has no reduce, otherwise into `reduce_funs' together with its
%% reduce source. Returns {ok, #mrst{}} with `sig' set to the MD5 of
%% {Views, Language, DesignOpts, SortedLib}.
ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
    MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) ->
        case couch_util:get_value(<<"map">>, MRFuns) of
            MapSrc when MapSrc /= undefined ->
                RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null),
                {ViewOpts} = couch_util:get_value(<<"options">>, MRFuns, {[]}),
                % Reuse the view already registered under the same
                % {MapSrc, ViewOpts} key, if there is one.
                View = case dict:find({MapSrc, ViewOpts}, DictBySrcAcc) of
                    {ok, View0} -> View0;
                    error -> #mrview{def=MapSrc, options=ViewOpts}
                end,
                {MapNames, RedSrcs} = case RedSrc of
                    null ->
                        MNames = [Name | View#mrview.map_names],
                        {MNames, View#mrview.reduce_funs};
                    _ ->
                        RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs],
                        {View#mrview.map_names, RedFuns}
                end,
                View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs},
                dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc);
            undefined ->
                % Entries without a `map' member are skipped.
                DictBySrcAcc
        end;
        ({Name, Else}, DictBySrcAcc) ->
            % The entry value is not an object: log it and skip it.
            couch_log:error("design_doc_to_view_group ~s views ~p",
                [Name, Else]),
            DictBySrcAcc
    end,
    {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
    Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false),

    {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
    BySrc = lists:foldl(MakeDict, dict:new(), RawViews),

    % Number the views in the sort order of their {MapSrc, ViewOpts} key.
    NumViews = fun({_, View}, N) ->
        {View#mrview{id_num=N}, N+1}
    end,
    {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))),

    Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
    Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}),

    IdxState = #mrst{
        db_name=DbName,
        idx_name=Id,
        lib=Lib,
        views=Views,
        language=Language,
        design_opts=DesignOpts,
        partitioned=Partitioned
    },
    SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
    {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}.


%% Set `view_type' in the query arguments according to how `ViewName' is
%% defined: `red' when it names a reduce function (unless the arguments
%% carry reduce=false, in which case `map'), `map' when it names a map
%% view. Throws {not_found, missing_named_view} when no view matches.
set_view_type(_Args, _ViewName, []) ->
    throw({not_found, missing_named_view});
set_view_type(Args, ViewName, [View | Others]) ->
    ReduceNames = [RName || {RName, _} <- View#mrview.reduce_funs],
    IsReduceName = lists:member(ViewName, ReduceNames),
    if
        IsReduceName ->
            case Args#mrargs.reduce of
                false -> Args#mrargs{view_type=map};
                _ -> Args#mrargs{view_type=red}
            end;
        true ->
            case lists:member(ViewName, View#mrview.map_names) of
                true -> Args#mrargs{view_type=map};
                false -> set_view_type(Args, ViewName, Others)
            end
    end.


%% Store `Value' under `Key' in the `extra' list of the arguments. The
%% list is kept key-sorted with one entry per key; the new pair is placed
%% first so it wins over an existing entry with the same key.
set_extra(#mrargs{extra = Current} = Args, Key, Value) ->
    Args#mrargs{extra = lists:ukeysort(1, [{Key, Value} | Current])}.


%% Read `Key' from the `extra' list of the arguments.
get_extra(#mrargs{extra = Extra}, Key) ->
    couch_util:get_value(Key, Extra).

%% Same as get_extra/2, with `Default' passed on to couch_util:get_value/3.
get_extra(#mrargs{extra = Extra}, Key, Default) ->
    couch_util:get_value(Key, Extra, Default).


%% Find the view called `Name' in the list of views.
%% For view_type=map returns {map, View, Args}; for view_type=red returns
%% {red, {NthReduce, Lang, View}, Args}. Throws
%% {not_found, missing_named_view} when the list is exhausted.
extract_view(_Lang, _Args, _ViewName, []) ->
    throw({not_found, missing_named_view});
extract_view(Lang, #mrargs{view_type=map}=Args, Name, [View | Others]) ->
    ReduceNames = [RName || {RName, _} <- View#mrview.reduce_funs],
    AllNames = View#mrview.map_names ++ ReduceNames,
    case lists:member(Name, AllNames) of
        true -> {map, View, Args};
        _ -> extract_view(Lang, Args, Name, Others)
    end;
extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Others]) ->
    ReduceNames = [RName || {RName, _} <- View#mrview.reduce_funs],
    case lists:member(Name, ReduceNames) of
        true ->
            Nth = index_of(Name, ReduceNames),
            {red, {Nth, Lang, View}, Args};
        false ->
            extract_view(Lang, Args, Name, Others)
    end.


%% Compute a hex signature for a view query.
%%
%% With include_docs=true the signature is derived from the signature of
%% the same query without include_docs plus the database update and purge
%% sequences. Otherwise it is derived from the index signature, the
%% view's update and purge sequences, and the query arguments with
%% `preflight_fun' and `extra' cleared.
view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
    WithoutDocs = Args#mrargs{include_docs=false},
    BaseSig = view_sig(Db, State, View, WithoutDocs),
    DbUpdateSeq = couch_db:get_update_seq(Db),
    DbPurgeSeq = couch_db:get_purge_seq(Db),
    SigTerm = view_sig_term(BaseSig, DbUpdateSeq, DbPurgeSeq),
    couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(SigTerm)));
view_sig(Db, State, {_Nth, _Lang, View}, Args) ->
    % Reduce views arrive wrapped in a 3-tuple; unwrap and retry.
    view_sig(Db, State, View, Args);
view_sig(_Db, State, View, Args0) ->
    StrippedArgs = Args0#mrargs{
        preflight_fun=undefined,
        extra=[]
    },
    SigTerm = view_sig_term(
        State#mrst.sig,
        View#mrview.update_seq,
        View#mrview.purge_seq,
        StrippedArgs
    ),
    couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(SigTerm))).

%% Term that is hashed for include_docs queries.
view_sig_term(BaseSig, UpdateSeq, PurgeSeq) ->
    {BaseSig, UpdateSeq, PurgeSeq}.

%% Term that is hashed for all other queries.
view_sig_term(BaseSig, UpdateSeq, PurgeSeq, Args) ->
    {BaseSig, UpdateSeq, PurgeSeq, Args}.


%% Initialise the index state from a header read from the index file.
%% A `nil' header is replaced by a fresh #mrheader{} at seq 0, carrying
%% the database purge sequence and one empty view state per view.
init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
    EmptyViewStates = [make_view_state(#mrview{}) || _ <- Views],
    FreshHeader = #mrheader{
        seq=0,
        purge_seq=couch_db:get_purge_seq(Db),
        id_btree_state=nil,
        view_states=EmptyViewStates
    },
    init_state(Db, Fd, State, FreshHeader);
init_state(Db, Fd, State, Header) ->
    #mrst{language=Lang, views=Views} = State,
    #mrheader{
        seq=Seq,
        purge_seq=PurgeSeq,
        id_btree_state=IdBtreeState,
        view_states=ViewStates
    } = maybe_update_header(Header),

    Compression = couch_compress:get_compression_method(),
    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, [
        {compression, Compression}
    ]),

    % Pair each stored view state with its view definition.
    OpenedViews = lists:zipwith(
        fun(ViewState, View) ->
            open_view(Db, Fd, Lang, ViewState, View)
        end,
        ViewStates,
        Views
    ),

    State#mrst{
        fd=Fd,
        fd_monitor=erlang:monitor(process, Fd),
        update_seq=Seq,
        purge_seq=PurgeSeq,
        id_btree=IdBtree,
        views=OpenedViews
    }.

%% Open the key btree of a view from its stored view state and return
%% the view with `btree', `update_seq' and `purge_seq' filled in.
open_view(_Db, Fd, Lang, ViewState, View) ->
    ReduceFun = make_reduce_fun(Lang, View#mrview.reduce_funs),
    LessFun = maybe_define_less_fun(View),
    Compression = couch_compress:get_compression_method(),
    BtreeState = get_key_btree_state(ViewState),
    {ok, Btree} = couch_btree:open(BtreeState, Fd, [
        {less, LessFun},
        {reduce, ReduceFun},
        {compression, Compression}
    ]),
    View#mrview{
        btree=Btree,
        update_seq=get_update_seq(ViewState),
        purge_seq=get_purge_seq(ViewState)
    }.


%% Wrap a temporary view definition (EJSON object) into a design document
%% holding a single view named "temp", with a random id. A `reduce'
%% member is only carried over when it is a binary.
temp_view_to_ddoc({Props}) ->
    Language = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    Options = couch_util:get_value(<<"options">>, Props, {[]}),
    MapSrc = couch_util:get_value(<<"map">>, Props),
    ReduceProps = case couch_util:get_value(<<"reduce">>, Props) of
        RedSrc when is_binary(RedSrc) -> [{<<"reduce">>, RedSrc}];
        _ -> []
    end,
    ViewProps = [{<<"map">>, MapSrc} | ReduceProps],
    couch_doc:from_json_obj({[
        {<<"_id">>, couch_uuids:random()},
        {<<"language">>, Language},
        {<<"options">>, Options},
        {<<"views">>, {[
            {<<"temp">>, {ViewProps}}
        ]}}
    ]}).


%% Return {ok, Count}, where Count is the first element of the view
%% btree's full reduction (which is either a 3-tuple or a 2-tuple).
get_row_count(#mrview{btree=Bt}) ->
    case couch_btree:full_reduce(Bt) of
        {ok, {Rows, _Reds, _}} -> {ok, Rows};
        {ok, {Rows, _Reds}} -> {ok, Rows}
    end.


%% Final-reduce `Reductions' with the id-tree reduce function and return
%% the first element of the resulting 3-tuple.
all_docs_reduce_to_count(Reductions) ->
    {Count, _, _} = couch_btree:final_reduce(
        fun couch_bt_engine:id_tree_reduce/2,
        Reductions
    ),
    Count.

%% Final-reduce view reductions with count_reduce/2 and return the count;
%% `nil' counts as 0.
reduce_to_count(nil) ->
    0;
reduce_to_count(Reductions) ->
    Final = couch_btree:final_reduce(fun count_reduce/2, Reductions),
    get_count(Final).


%% Fold over the rows of a map view. Stored rows holding duplicates are
%% expanded so that `Fun' is called once per individual key/value pair.
fold(#mrview{btree=Bt}, Fun, Acc, Opts) ->
    PerRow = fun(KV, Reds, InnerAcc) ->
        fold_fun(Fun, expand_dups([KV], []), Reds, InnerAcc)
    end,
    {ok, _LastRed, _Acc} = couch_btree:fold(Bt, PerRow, Acc, Opts).

%% Apply `Fun' to each expanded key/value pair in turn. Pairs already
%% handled are prepended to the first element of the reductions tuple
%% before the next call. Stops early when `Fun' returns {stop, Acc}.
fold_fun(_Fun, [], _, Acc) ->
    {ok, Acc};
fold_fun(Fun, [KV | Remaining], {KVReds, Reds}, Acc) ->
    case Fun(KV, {KVReds, Reds}, Acc) of
        {ok, NextAcc} ->
            fold_fun(Fun, Remaining, {[KV | KVReds], Reds}, NextAcc);
        {stop, FinalAcc} ->
            {stop, FinalAcc}
    end.


%% Fold over the grouped reductions of the NthRed reduce function of a
%% view, calling `Fun' with the grouped key, the selected user reduction
%% and the accumulator.
fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) ->
    #mrview{btree=Bt, reduce_funs=RedFuns} = View,
    ReduceFun = make_user_reds_reduce_fun(Lang, RedFuns, NthRed),
    PerGroup = fun({GroupedKey, _}, PartialReds, InnerAcc) ->
        Final = couch_btree:final_reduce(ReduceFun, PartialReds),
        Selected = lists:nth(NthRed, get_user_reds(Final)),
        Fun(GroupedKey, Selected, InnerAcc)
    end,
    couch_btree:fold_reduce(Bt, PerGroup, Acc, Options).


%% Validate query arguments against a design document: initialise the
%% index state, apply the configured limit, then validate.
validate_args(Db, DDoc, Args0) ->
    {ok, State} = couch_mrview_index:init(Db, DDoc),
    Limited = apply_limit(State#mrst.partitioned, Args0),
    validate_args(State, Limited).


%% Validate query arguments and reconcile the `partition' extra with the
%% index: partitioned indexes require it, non-partitioned ones reject it.
validate_args(#mrst{} = State, Args0) ->
    Args = validate_args(Args0),
    IsPartitioned = State#mrst.partitioned,
    Partition = get_extra(Args, partition),
    case {IsPartitioned, Partition} of
        {true, undefined} ->
            mrverror(<<"`partition` parameter is mandatory "
                    "for queries to this view.">>);
        {true, _} ->
            apply_partition(Args, Partition);
        {false, undefined} ->
            Args;
        {false, Value} when is_binary(Value) ->
            mrverror(<<"`partition` parameter is not "
                    "supported in this design doc">>)
    end.


%% Enforce the configured maximum `limit'. The maximum is read from the
%% "query_server_config" section: "partition_query_limit" for partitioned
%% views, "query_limit" otherwise, defaulting to ?MAX_VIEW_LIMIT.
%% Throws a query_parse_error when the requested limit exceeds it.
apply_limit(ViewPartitioned, Args) ->
    ConfigKey = case ViewPartitioned of
        true -> "partition_query_limit";
        false -> "query_limit"
    end,
    MaxLimit = config:get_integer("query_server_config",
        ConfigKey, ?MAX_VIEW_LIMIT),

    % A limit equal to ?MAX_VIEW_LIMIT is treated as "not specified by
    % the user" and is replaced by the configured maximum.
    Effective = case Args#mrargs.limit == ?MAX_VIEW_LIMIT of
        true -> Args#mrargs{limit = MaxLimit};
        false -> Args
    end,

    case Effective#mrargs.limit =< MaxLimit of
        true ->
            Effective;
        false ->
            Fmt = "Limit is too large, must not exceed ~p",
            mrverror(io_lib:format(Fmt, [MaxLimit]))
    end.


%% Validate arguments for an _all_docs query. A `partition' extra is
%% rejected on non-partitioned databases; on partitioned ones the
%% partitioned limit and the partition key range are applied.
validate_all_docs_args(Db, Args0) ->
    Args = validate_args(Args0),
    IsPartitionedDb = couch_db:is_partitioned(Db),
    case {IsPartitionedDb, get_extra(Args, partition)} of
        {false, <<_/binary>>} ->
            mrverror(<<"`partition` parameter is not supported on this db">>);
        {_, <<_/binary>> = Partition} ->
            Limited = apply_limit(true, Args),
            apply_all_docs_partition(Limited, Partition);
        _ ->
            Args
    end.


%% Validate the individual fields of an #mrargs{} record.
%%
%% Each check throws {query_parse_error, Message} via mrverror/1 on the
%% first violation. On success the record is returned with
%% `start_key_docid' and `end_key_docid' defaulted according to the
%% direction, and `group_level' set to the resolved group level.
validate_args(Args) ->
    GroupLevel = determine_group_level(Args),
    Reduce = Args#mrargs.reduce,
    case Reduce == undefined orelse is_boolean(Reduce) of
        true -> ok;
        _ -> mrverror(<<"Invalid `reduce` value.">>)
    end,

    case {Args#mrargs.view_type, Reduce} of
        {map, true} -> mrverror(<<"Reduce is invalid for map-only views.">>);
        _ -> ok
    end,

    % Multi-key reduce queries are only accepted with exact grouping.
    case {Args#mrargs.view_type, GroupLevel, Args#mrargs.keys} of
        {red, exact, _} -> ok;
        {red, _, KeyList} when is_list(KeyList) ->
            Msg = <<"Multi-key fetchs for reduce views must use `group=true`">>,
            mrverror(Msg);
        _ -> ok
    end,

    case Args#mrargs.keys of
        Keys when is_list(Keys) -> ok;
        undefined -> ok;
        _ -> mrverror(<<"`keys` must be an array of strings.">>)
    end,

    % A non-empty `keys' list cannot be combined with a key range.
    case {Args#mrargs.keys, Args#mrargs.start_key,
          Args#mrargs.end_key} of
        {undefined, _, _} -> ok;
        {[], _, _} -> ok;
        {[_|_], undefined, undefined} -> ok;
        _ -> mrverror(<<"`keys` is incompatible with `key`"
                        ", `start_key` and `end_key`">>)
    end,

    case Args#mrargs.start_key_docid of
        undefined -> ok;
        SKDocId0 when is_binary(SKDocId0) -> ok;
        _ -> mrverror(<<"`start_key_docid` must be a string.">>)
    end,

    case Args#mrargs.end_key_docid of
        undefined -> ok;
        EKDocId0 when is_binary(EKDocId0) -> ok;
        _ -> mrverror(<<"`end_key_docid` must be a string.">>)
    end,

    case Args#mrargs.direction of
        fwd -> ok;
        rev -> ok;
        _ -> mrverror(<<"Invalid direction.">>)
    end,

    % NOTE(review): the check accepts 0 although the message says
    % "positive", and `>=' is a term comparison, so non-numeric terms
    % that sort above numbers (atoms, binaries, ...) also pass.
    case {Args#mrargs.limit >= 0, Args#mrargs.limit == undefined} of
        {true, _} -> ok;
        {_, true} -> ok;
        _ -> mrverror(<<"`limit` must be a positive integer.">>)
    end,

    case Args#mrargs.skip < 0 of
        true -> mrverror(<<"`skip` must be >= 0">>);
        _ -> ok
    end,

    case {Args#mrargs.view_type, GroupLevel} of
        {red, exact} -> ok;
        {_, 0} -> ok;
        {red, Int} when is_integer(Int), Int >= 0 -> ok;
        {red, _} -> mrverror(<<"`group_level` must be >= 0">>);
        {map, _} -> mrverror(<<"Invalid use of grouping on a map view.">>)
    end,

    case Args#mrargs.stable of
        true -> ok;
        false -> ok;
        _ -> mrverror(<<"Invalid value for `stable`.">>)
    end,

    case Args#mrargs.update of
        true -> ok;
        false -> ok;
        lazy -> ok;
        _ -> mrverror(<<"Invalid value for `update`.">>)
    end,

    case is_boolean(Args#mrargs.inclusive_end) of
        true -> ok;
        _ -> mrverror(<<"Invalid value for `inclusive_end`.">>)
    end,

    case {Args#mrargs.view_type, Args#mrargs.include_docs} of
        {red, true} -> mrverror(<<"`include_docs` is invalid for reduce">>);
        {_, ID} when is_boolean(ID) -> ok;
        _ -> mrverror(<<"Invalid value for `include_docs`">>)
    end,

    % The {red, undefined} clause can never match: {_, undefined} above
    % it already covers that case.
    case {Args#mrargs.view_type, Args#mrargs.conflicts} of
        {_, undefined} -> ok;
        {map, V} when is_boolean(V) -> ok;
        {red, undefined} -> ok;
        {map, _} -> mrverror(<<"Invalid value for `conflicts`.">>);
        {red, _} -> mrverror(<<"`conflicts` is invalid for reduce views.">>)
    end,

    % Default doc ids: <<>> at the start of the scan and <<255>> at its
    % end, swapped when the direction is `rev'.
    SKDocId = case {Args#mrargs.direction, Args#mrargs.start_key_docid} of
        {fwd, undefined} -> <<>>;
        {rev, undefined} -> <<255>>;
        {_, SKDocId1} -> SKDocId1
    end,

    EKDocId = case {Args#mrargs.direction, Args#mrargs.end_key_docid} of
        {fwd, undefined} -> <<255>>;
        {rev, undefined} -> <<>>;
        {_, EKDocId1} -> EKDocId1
    end,

    case is_boolean(Args#mrargs.sorted) of
        true -> ok;
        _ -> mrverror(<<"Invalid value for `sorted`.">>)
    end,

    % A partition, when given, must be a non-empty binary.
    case get_extra(Args, partition) of
        undefined -> ok;
        Partition when is_binary(Partition), Partition /= <<>> -> ok;
        _ -> mrverror(<<"Invalid value for `partition`.">>)
    end,

    Args#mrargs{
        start_key_docid=SKDocId,
        end_key_docid=EKDocId,
        group_level=GroupLevel
    }.


%% Resolve the `group' / `group_level' pair into a single group level:
%% 0 when grouping is off, `exact' for group=true without a level,
%% otherwise the given level. group=false together with a level above 0
%% is rejected.
determine_group_level(#mrargs{group=undefined, group_level=undefined}) ->
    0;
determine_group_level(#mrargs{group=false, group_level=undefined}) ->
    0;
determine_group_level(#mrargs{group=false, group_level=Level}) when Level > 0 ->
    mrverror(<<"Can't specify group=false and group_level>0 at the same time">>);
determine_group_level(#mrargs{group=true, group_level=undefined}) ->
    exact;
determine_group_level(#mrargs{group_level=Level}) ->
    Level.

%% Restrict a view query to one partition by wrapping its keys, or its
%% start and end keys, in {p, Partition, Key} tuples. Arguments whose
%% keys are already wrapped are returned unchanged.
apply_partition(#mrargs{keys=[{p, _, _} | _]} = Args, _Partition) ->
    % Keys were wrapped by an earlier call.
    Args;

apply_partition(#mrargs{keys=Keys} = Args, Partition) when Keys /= undefined ->
    Wrapped = [{p, Partition, Key} || Key <- Keys],
    Args#mrargs{keys=Wrapped};

apply_partition(#mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args, _Partition) ->
    % The range was wrapped by an earlier call.
    Args;

apply_partition(Args, Partition) ->
    #mrargs{direction = Dir, start_key = StartKey, end_key = EndKey} = Args,

    % Missing range ends default to the lowest/highest key, in scan order.
    {DefaultStart, DefaultEnd} = case Dir of
        fwd -> {?LOWEST_KEY, ?HIGHEST_KEY};
        rev -> {?HIGHEST_KEY, ?LOWEST_KEY}
    end,

    RangeStart = case StartKey of
        undefined -> DefaultStart;
        _ -> StartKey
    end,
    RangeEnd = case EndKey of
        undefined -> DefaultEnd;
        _ -> EndKey
    end,

    Args#mrargs{
        start_key = {p, Partition, RangeStart},
        end_key = {p, Partition, RangeEnd}
    }.

%% all_docs is special as it's not really a view and is already
%% effectively partitioned as the partition is a prefix of all keys.
%% Clamp the key range of an _all_docs query to the key range of
%% `Partition', as given by couch_partition:start_key/1 and end_key/1.
%% A user-supplied start or end key is kept only where it lies inside
%% the partition's range.
apply_all_docs_partition(#mrargs{} = Args, Partition) ->
    #mrargs{direction = Dir, start_key = StartKey, end_key = EndKey} = Args,

    {DefaultStart, DefaultEnd} = case Dir of
        fwd ->
            {
                couch_partition:start_key(Partition),
                couch_partition:end_key(Partition)
            };
        rev ->
            {
                couch_partition:end_key(Partition),
                couch_partition:start_key(Partition)
            }
    end,

    WantedStart = case StartKey of
        undefined -> DefaultStart;
        _ -> StartKey
    end,
    WantedEnd = case EndKey of
        undefined -> DefaultEnd;
        _ -> EndKey
    end,

    {ClampedStart, ClampedEnd} = case Dir of
        fwd ->
            {?HIGHEST(DefaultStart, WantedStart), ?LOWEST(DefaultEnd, WantedEnd)};
        rev ->
            {?LOWEST(DefaultStart, WantedStart), ?HIGHEST(DefaultEnd, WantedEnd)}
    end,

    Args#mrargs{
        start_key = ClampedStart,
        end_key = ClampedEnd
    }.


%% Throw a query_parse_error when the start and end keys are in the wrong
%% order for the scan direction, as judged by the comparison fun `Cmp'.
%% Nothing is checked when either key is undefined or both are the same.
check_range(#mrargs{start_key=undefined}, _Cmp) ->
    ok;
check_range(#mrargs{end_key=undefined}, _Cmp) ->
    ok;
check_range(#mrargs{start_key=K, end_key=K}, _Cmp) ->
    ok;
check_range(Args, Cmp) ->
    #mrargs{
        direction=Dir,
        start_key=StartKey,
        start_key_docid=StartDocId,
        end_key=EndKey,
        end_key_docid=EndDocId
    } = Args,
    StartsBeforeEnd = Cmp({StartKey, StartDocId}, {EndKey, EndDocId}),
    case {Dir, StartsBeforeEnd} of
        {fwd, false} ->
            throw({query_parse_error,
                <<"No rows can match your key range, reverse your ",
                    "start_key and end_key or set descending=true">>});
        {rev, true} ->
            throw({query_parse_error,
                <<"No rows can match your key range, reverse your ",
                    "start_key and end_key or set descending=false">>});
        _ ->
            ok
    end.


%% Return a two-argument comparison fun backed by the view's btree.
%% Reduce views arrive wrapped in a 3-tuple and are unwrapped first.
view_cmp({_Nth, _Lang, View}) ->
    view_cmp(View);
view_cmp(View) ->
    fun(Left, Right) ->
        couch_btree:less(View#mrview.btree, Left, Right)
    end.


%% Build the #mrheader{} that describes the current index state.
make_header(State) ->
    #mrst{
        update_seq=Seq,
        purge_seq=PurgeSeq,
        id_btree=IdBtree,
        views=Views
    } = State,
    ViewStates = [make_view_state(View) || View <- Views],
    #mrheader{
        seq=Seq,
        purge_seq=PurgeSeq,
        id_btree_state=get_btree_state(IdBtree),
        view_states=ViewStates
    }.


%% Path of the index file for signature `Sig' of database `DbName'.
index_file(DbName, Sig) ->
    HexSig = couch_index_util:hexsig(Sig),
    couch_index_util:index_file(mrview, DbName, HexSig ++ ".view").


%% Path of the compaction file for signature `Sig' of database `DbName'.
compaction_file(DbName, Sig) ->
    HexSig = couch_index_util:hexsig(Sig),
    couch_index_util:index_file(mrview, DbName, HexSig ++ ".compact.view").


%% Open `FName', creating it when it does not exist yet. Any other open
%% error is returned unchanged.
open_file(FName) ->
    case couch_file:open(FName, [nologifmissing]) of
        {error, enoent} -> couch_file:open(FName, [create]);
        {ok, Fd} -> {ok, Fd};
        Error -> Error
    end.


%% Delete both the index file and the compaction file of a signature.
delete_files(DbName, Sig) ->
    delete_index_file(DbName, Sig),
    delete_compaction_file(DbName, Sig).


%% Delete the index file of a signature, if present.
delete_index_file(DbName, Sig) ->
    IndexPath = index_file(DbName, Sig),
    delete_file(IndexPath).


%% Delete the compaction file of a signature, if present.
delete_compaction_file(DbName, Sig) ->
    CompactPath = compaction_file(DbName, Sig),
    delete_file(CompactPath).


%% Delete `FName' through couch_file:delete/2 when it exists; return `ok'
%% otherwise.
delete_file(FName) ->
    case filelib:is_file(FName) of
        true ->
            couch_file:delete(couch_index_util:root_dir(), FName);
        _ ->
            ok
    end.


%% Truncate the index file, write a header with the signature and a `nil'
%% state, and re-initialise the index state from scratch.
reset_index(Db, Fd, #mrst{sig=Sig}=State) ->
    ok = couch_file:truncate(Fd, 0),
    ok = couch_file:write_header(Fd, {Sig, nil}),
    Cleared = reset_state(State),
    init_state(Db, Fd, Cleared, nil).


%% Clear the file handle, query server, update sequence and all btrees
%% of an index state.
reset_state(State) ->
    ClearedViews = [View#mrview{btree=nil} || View <- State#mrst.views],
    State#mrst{
        fd=nil,
        qserver=nil,
        update_seq=0,
        id_btree=nil,
        views=ClearedViews
    }.


%% Build the list of btree fold option lists for an _all_docs query,
%% appending the arguments' own `extra' list to each.
all_docs_key_opts(#mrargs{extra = Extra} = Args) ->
    all_docs_key_opts(Args, Extra).

%% Build the list of btree fold option lists for an _all_docs query: one
%% option list for a range query, or one per key when `keys' is set.
%% `Extra' is appended to every option list.
all_docs_key_opts(#mrargs{keys=undefined}=Args, Extra) ->
    all_docs_key_opts(Args#mrargs{keys=[]}, Extra);
all_docs_key_opts(#mrargs{keys=[], direction=Dir}=Args, Extra) ->
    [[{dir, Dir}] ++ ad_skey_opts(Args) ++ ad_ekey_opts(Args) ++ Extra];
all_docs_key_opts(#mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
    % Each key becomes a range that starts and ends on that key.
    PerKey = fun(Key) ->
        [{dir, Dir}]
            ++ ad_skey_opts(Args#mrargs{start_key=Key})
            ++ ad_ekey_opts(Args#mrargs{end_key=Key})
            ++ Extra
    end,
    lists:map(PerKey, Keys).


%% Start-key option for _all_docs: the start key when it is a binary,
%% otherwise the start key doc id.
ad_skey_opts(#mrargs{start_key=StartKey}) when is_binary(StartKey) ->
    [{start_key, StartKey}];
ad_skey_opts(#mrargs{start_key_docid=StartDocId}) ->
    [{start_key, StartDocId}].


%% End-key option for _all_docs: the end key when it is a binary,
%% otherwise the end key doc id. The option is `end_key' for inclusive
%% ends and `end_key_gt' otherwise.
ad_ekey_opts(#mrargs{end_key=EndKey}=Args) when is_binary(EndKey) ->
    OptName = if
        Args#mrargs.inclusive_end -> end_key;
        true -> end_key_gt
    end,
    [{OptName, EndKey}];
ad_ekey_opts(#mrargs{end_key_docid=EndDocId}=Args) ->
    OptName = if
        Args#mrargs.inclusive_end -> end_key;
        true -> end_key_gt
    end,
    [{OptName, EndDocId}].


%% Build the list of btree fold option lists for a view query, with no
%% extra options.
key_opts(Args) ->
    key_opts(Args, []).

%% Build the list of btree fold option lists for a view query: one option
%% list for a range query, or one per key when `keys' is set. `Extra' is
%% appended to every option list.
key_opts(#mrargs{keys=undefined, direction=Dir}=Args, Extra) ->
    [[{dir, Dir}] ++ skey_opts(Args) ++ ekey_opts(Args) ++ Extra];
key_opts(#mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
    PerKey = fun(Key) ->
        [{dir, Dir}]
            ++ skey_opts(Args#mrargs{start_key=Key})
            ++ ekey_opts(Args#mrargs{end_key=Key})
            ++ Extra
    end,
    lists:map(PerKey, Keys).


%% Start-key option for a view query; empty when no start key is set.
skey_opts(#mrargs{start_key=undefined}) ->
    [];
skey_opts(#mrargs{start_key=StartKey, start_key_docid=StartDocId}) ->
    [{start_key, {StartKey, StartDocId}}].


%% End-key option for a view query; empty when no end key is set. For a
%% non-inclusive end the default doc id is flipped with
%% reverse_key_default/1.
ekey_opts(#mrargs{end_key=undefined}) ->
    [];
ekey_opts(#mrargs{end_key=EndKey, end_key_docid=EndDocId}=Args) ->
    case Args#mrargs.inclusive_end of
        true ->
            [{end_key, {EndKey, EndDocId}}];
        false ->
            [{end_key_gt, {EndKey, reverse_key_default(EndDocId)}}]
    end.


%% Swap the two default doc id markers; any other key is returned as is.
reverse_key_default(<<>>) -> <<255>>;
reverse_key_default(<<255>>) -> <<>>;
reverse_key_default(Other) -> Other.


%% External size stored in a btree's full reduction. A two-element
%% reduction carries no size and yields 0.
reduced_external_size(Tree) ->
    case couch_btree:full_reduce(Tree) of
        {ok, {_, _, ExternalSize}} -> ExternalSize;
        % return 0 for versions of the reduce function without Size
        {ok, {_, _}} -> 0
    end.


%% Return {ok, Total}: the sum of the external sizes of all views that
%% have an open btree.
calculate_external_size(Views) ->
    AddSize = fun
        (#mrview{btree=nil}, Total) ->
            Total;
        (#mrview{btree=Bt}, Total) ->
            Total + reduced_external_size(Bt)
    end,
    {ok, lists:foldl(AddSize, 0, Views)}.


%% Return {ok, Total}: the sum of couch_btree:size/1 over all views that
%% have an open btree.
calculate_active_size(Views) ->
    AddSize = fun
        (#mrview{btree=nil}, Total) ->
            Total;
        (#mrview{btree=Bt}, Total) ->
            Total + couch_btree:size(Bt)
    end,
    {ok, lists:foldl(AddSize, 0, Views)}.


%% Convert {{Key, Id}, Value} rows into [[Key, Id], Value] lists. The
%% converted rows follow the reversed contents of `Acc'.
detuple_kvs(KVs, Acc) ->
    ToLists = fun(KV) ->
        {{Key, Id}, Value} = KV,
        [[Key, Id], Value]
    end,
    lists:reverse(Acc, lists:map(ToLists, KVs)).


%% Expand {Key, {dups, Values}} rows into one {Key, Value} row per value;
%% other rows are passed through. The result is the reversed accumulator,
%% so the values of a single dups row come out in reverse order.
expand_dups(KVs, Acc0) ->
    Expand = fun
        ({Key, {dups, Vals}}, Acc) ->
            [{Key, Val} || Val <- Vals] ++ Acc;
        (KV, Acc) ->
            [KV | Acc]
    end,
    lists:reverse(lists:foldl(Expand, Acc0, KVs)).


%% Load the document for a #doc_info{} when include_docs is requested.
%% Returns [] when include_docs is false, otherwise a one-element list
%% holding {doc, Json | null}.
maybe_load_doc(_Db, _DI, #mrargs{include_docs=false}) ->
    [];
maybe_load_doc(Db, #doc_info{}=DocInfo, #mrargs{conflicts=true, doc_options=Opts}) ->
    Loaded = couch_index_util:load_doc(Db, DocInfo, [conflicts]),
    doc_row(Loaded, Opts);
maybe_load_doc(Db, #doc_info{}=DocInfo, #mrargs{doc_options=Opts}) ->
    Loaded = couch_index_util:load_doc(Db, DocInfo, []),
    doc_row(Loaded, Opts).


%% Load the document for a view row when include_docs is requested. The
%% document id and revision are taken from the row value when it is an
%% object that carries `_id' / `_rev'; see docid_rev/2.
maybe_load_doc(_Db, _Id, _Val, #mrargs{include_docs=false}) ->
    [];
maybe_load_doc(Db, Id, Val, #mrargs{conflicts=true, doc_options=Opts}) ->
    Loaded = couch_index_util:load_doc(Db, docid_rev(Id, Val), [conflicts]),
    doc_row(Loaded, Opts);
maybe_load_doc(Db, Id, Val, #mrargs{doc_options=Opts}) ->
    Loaded = couch_index_util:load_doc(Db, docid_rev(Id, Val), []),
    doc_row(Loaded, Opts).


%% Wrap a loaded document (or `null') into the {doc, ...} row member.
doc_row(null, _Opts) ->
    [{doc, null}];
doc_row(Doc, Opts) ->
    [{doc, couch_doc:to_json_obj(Doc, Opts)}].


%% Pick the {DocId, Rev} to load for a row. An object value may override
%% the id through `_id' and name a revision through `_rev'; any other
%% value yields {Id, nil}.
docid_rev(Id, {Props}) ->
    DocId = couch_util:get_value(<<"_id">>, Props, Id),
    Rev = case couch_util:get_value(<<"_rev">>, Props, nil) of
        nil -> nil;
        RawRev -> couch_doc:parse_rev(RawRev)
    end,
    {DocId, Rev};
docid_rev(Id, _) ->
    {Id, nil}.


%% 1-based position of `Key' in `List'.
%% Throws {error, missing_named_view} when it is absent.
index_of(Key, List) ->
    index_of(Key, List, 1).


index_of(_, [], _) ->
    throw({error, missing_named_view});
index_of(Key, [Key | _], Pos) ->
    Pos;
index_of(Key, [_ | Tail], Pos) ->
    index_of(Key, Tail, Pos + 1).


%% Throw a query_parse_error carrying `Mesg'.
mrverror(Mesg) ->
    throw({query_parse_error, Mesg}).


%% Updates 2.x view files to 3.x or later view files
%% transparently, the first time the 2.x view file is opened by
%% 3.x or later.
%%
%% Here's how it works:
%%
%% Before opening a view index,
%% If no matching index file is found in the new location:
%%  calculate the <= 2.x view signature
%%  if a file with that signature lives in the old location
%%    rename it to the new location with the new signature in the name.
%% Then proceed to open the view index as usual.

%% Check whether the index file for the current signature can be opened
%% for reading; when it cannot, try to move a legacy index file into
%% place with update_index_file/1.
maybe_update_index_file(State) ->
    #mrst{db_name = DbName, sig = CurrentSig} = State,
    NewIndexFile = index_file(DbName, CurrentSig),
    % open in read-only mode so we don't create
    % the file if it doesn't exist.
    case file:open(NewIndexFile, [read, raw]) of
        {ok, ReadFd} ->
            % the new index file exists, there is nothing to do here.
            file:close(ReadFd);
        _Error ->
            update_index_file(State)
    end.

%% Rename an index file stored under the <= 2.x signature to the path of
%% the current signature. Returns the legacy signature after a successful
%% rename and `ok' when there is no legacy file or it cannot be inspected.
update_index_file(State) ->
    LegacySig = sig_vsn_2x(State),
    DbName = State#mrst.db_name,
    LegacyName = couch_index_util:hexsig(LegacySig) ++ ".view",
    LegacyFile = couch_index_util:index_file("mrview", DbName, LegacyName),

    % If we have an old index, rename it to the new position.
    case file:read_file_info(LegacyFile) of
        {ok, _FileInfo} ->
            % Crash if the rename fails for any reason.
            % If the target exists, e.g. the next request will find the
            % new file and we are good. We might need to catch this
            % further up to avoid a full server crash.
            NewIndexFile = index_file(DbName, State#mrst.sig),
            couch_log:notice("Attempting to update legacy view index file"
                " from ~p to ~s", [LegacyFile, NewIndexFile]),
            ok = filelib:ensure_dir(NewIndexFile),
            ok = file:rename(LegacyFile, NewIndexFile),
            couch_log:notice("Successfully updated legacy view index file"
                " ~s", [LegacyFile]),
            LegacySig;
        {error, enoent} ->
            % Ignore missing index file
            ok;
        {error, Reason} ->
            couch_log:error("Failed to update legacy view index file"
                " ~s : ~s", [LegacyFile, file:format_error(Reason)]),
            ok
    end.

%% Compute the index signature the way <= 2.x did: the MD5 of the same
%% signature tuple, with every view expressed in the old, wider tuple
%% layout produced by old_view_format/3.
sig_vsn_2x(State) ->
    #mrst{
        lib = Lib,
        language = Language,
        design_opts = DesignOpts
    } = State,
    SeqIndexed = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
    KeySeqIndexed = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false),
    LegacyViews = [
        old_view_format(View, SeqIndexed, KeySeqIndexed)
     || View <- State#mrst.views
    ],
    SigInfo = {LegacyViews, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
    couch_hash:md5_hash(term_to_binary(SigInfo)).

%% Lay out a view as the 13-element `mrview' tuple used for the legacy
%% signature: two `nil' slots plus the SI / KSI flags sit between the
%% btree and the options.
old_view_format(View, SI, KSI) ->
    #mrview{
        id_num = IdNum,
        update_seq = UpdateSeq,
        purge_seq = PurgeSeq,
        map_names = MapNames,
        reduce_funs = ReduceFuns,
        def = Def,
        btree = Btree,
        options = Options
    } = View,
    {
        mrview,
        IdNum,
        UpdateSeq,
        PurgeSeq,
        MapNames,
        ReduceFuns,
        Def,
        Btree,
        nil,
        nil,
        SI,
        KSI,
        Options
    }.

%% Convert a header to the current #mrheader{} record. A 6-element tuple
%% is translated by picking its elements 2, 3, 4 and 6 and converting
%% each stored view state.
maybe_update_header(#mrheader{} = Header) ->
    Header;
maybe_update_header(Header) when tuple_size(Header) == 6 ->
    OldViewStates = element(6, Header),
    #mrheader{
        seq = element(2, Header),
        purge_seq = element(3, Header),
        id_btree_state = element(4, Header),
        view_states = [make_view_state(VS) || VS <- OldViewStates]
    }.

%% End of <= 2.x upgrade code.

%% Produce the {BtreeState, UpdateSeq, PurgeSeq} view state from a view
%% record, from a 5-element legacy view state, or from `nil'.
make_view_state(#mrview{btree = Btree, update_seq = UpdateSeq, purge_seq = PurgeSeq}) ->
    {get_btree_state(Btree), UpdateSeq, PurgeSeq};
make_view_state({BTState, _SeqBTState, _KSeqBTState, UpdateSeq, PurgeSeq}) ->
    {BTState, UpdateSeq, PurgeSeq};
make_view_state(nil) ->
    {nil, 0, 0}.


%% Accessors for the elements of a view state tuple.
get_key_btree_state(ViewState) ->
    element(1, ViewState).

get_update_seq(ViewState) ->
    element(2, ViewState).

get_purge_seq(ViewState) ->
    element(3, ViewState).

%% Accessors for the elements of a reduction tuple.
get_count(Reduction) ->
    element(1, Reduction).

get_user_reds(Reduction) ->
    element(2, Reduction).


% This is for backwards compatibility for seq btree reduces
%% External size of a reduction: the third element of a 3-tuple, 0 for
%% integers and 2-tuples.
get_external_size_reds(Reduction) when is_integer(Reduction) ->
    0;

get_external_size_reds(Reduction) when tuple_size(Reduction) == 2 ->
    0;

get_external_size_reds(Reduction) when tuple_size(Reduction) == 3 ->
    element(3, Reduction).


%% Build the btree reduce fun for a view.
%%
%% The returned fun produces {RowCount, UserReductions, ExternalSize}:
%%   - on `reduce' the rows are expanded and converted, the query server
%%     computes the user reductions, and the size comes from
%%     kv_external_size/2;
%%   - on `rereduce' the counts and sizes of the partial reductions are
%%     summed and the query server re-reduces the user reductions.
%%
%% The user reductions are collected by prepending and reversing once at
%% the end, instead of appending to the accumulator on every step (which
%% is quadratic in the number of partial reductions). Their order is
%% unchanged.
make_reduce_fun(Lang, ReduceFuns) ->
    FunSrcs = [FunSrc || {_, FunSrc} <- ReduceFuns],
    fun
        (reduce, KVs0) ->
            KVs = detuple_kvs(expand_dups(KVs0, []), []),
            {ok, Result} = couch_query_servers:reduce(Lang, FunSrcs, KVs),
            ExternalSize = kv_external_size(KVs, Result),
            {length(KVs), Result, ExternalSize};
        (rereduce, Reds) ->
            ExtractFun = fun(Red, {CountsAcc0, URedsAcc0, ExtAcc0}) ->
                CountsAcc = CountsAcc0 + get_count(Red),
                URedsAcc = [get_user_reds(Red) | URedsAcc0],
                ExtAcc = ExtAcc0 + get_external_size_reds(Red),
                {CountsAcc, URedsAcc, ExtAcc}
            end,
            {Counts, RevUReds, ExternalSize} = lists:foldl(ExtractFun,
                {0, [], 0}, Reds),
            UReds = lists:reverse(RevUReds),
            {ok, Result} = couch_query_servers:rereduce(Lang, FunSrcs, UReds),
            {Counts, Result, ExternalSize}
    end.


%% Choose the key comparison fun for a view's btree: `undefined' when the
%% view options set "collation" to "raw", otherwise
%% couch_ejson_compare:less_json_ids/2.
maybe_define_less_fun(#mrview{options = Options}) ->
    case couch_util:get_value(<<"collation">>, Options) of
        <<"raw">> -> undefined;
        _ -> fun couch_ejson_compare:less_json_ids/2
    end.


%% Reduce fun that only counts rows. A {dups, Values} row counts once per
%% value. Returns {Count, []}.
count_reduce(reduce, KVs) ->
    CountFun = fun
        ({_, {dups, Vals}}, Acc) -> Acc + length(Vals);
        (_, Acc) -> Acc + 1
    end,
    Count = lists:foldl(CountFun, 0, KVs),
    {Count, []};
count_reduce(rereduce, Reds) ->
    CountFun = fun(Red, Acc) ->
        Acc + get_count(Red)
    end,
    Count = lists:foldl(CountFun, 0, Reds),
    {Count, []}.


%% Build a reduce fun that evaluates only the NthRed user reduce function
%% of a view. The result list is padded with [] on both sides so the
%% computed reduction sits at position NthRed; the count slot is always 0.
make_user_reds_reduce_fun(Lang, ReduceFuns, NthRed) ->
    LPad = lists:duplicate(NthRed - 1, []),
    RPad = lists:duplicate(length(ReduceFuns) - NthRed, []),
    {_, FunSrc} = lists:nth(NthRed, ReduceFuns),
    fun
        (reduce, KVs0) ->
            KVs = detuple_kvs(expand_dups(KVs0, []), []),
            {ok, Result} = couch_query_servers:reduce(Lang, [FunSrc], KVs),
            {0, LPad ++ Result ++ RPad};
        (rereduce, Reds) ->
            ExtractFun = fun(Reds0) ->
                [lists:nth(NthRed, get_user_reds(Reds0))]
            end,
            UReds = lists:map(ExtractFun, Reds),
            {ok, Result} = couch_query_servers:rereduce(Lang, [FunSrc], UReds),
            {0, LPad ++ Result ++ RPad}
    end.


%% State of a btree as returned by couch_btree:get_state/1; `nil' stays
%% `nil'.
get_btree_state(nil) ->
    nil;
get_btree_state(#btree{} = Btree) ->
    couch_btree:get_state(Btree).


%% Source of the N-th reduce function of a reduce view, given the
%% {red, {N, Lang, View}, Ref} triple.
extract_view_reduce({red, {N, _Lang, #mrview{reduce_funs=RedFuns}}, _Ref}) ->
    {_Name, FunSrc} = lists:nth(N, RedFuns),
    FunSrc.


%% Read the `keys' member of a request body object. Returns `undefined'
%% when absent, the list when it is a list, and throws bad_request for
%% anything else.
get_view_keys({Props}) ->
    case couch_util:get_value(<<"keys">>, Props) of
        undefined ->
            undefined;
        KeyList when is_list(KeyList) ->
            KeyList;
        _ ->
            throw({bad_request, "`keys` member must be an array."})
    end.


%% Read the `queries' member of a request body object. Returns
%% `undefined' when absent, the list when it is a list, and throws
%% bad_request for anything else.
get_view_queries({Props}) ->
    case couch_util:get_value(<<"queries">>, Props) of
        undefined ->
            undefined;
        QueryList when is_list(QueryList) ->
            QueryList;
        _ ->
            throw({bad_request, "`queries` member must be an array."})
    end.


%% Sum of ?term_size over the key and value of every [[Key, Id], Value]
%% row, plus the ?term_size of the reduction.
kv_external_size(KVList, Reduction) ->
    AddRow = fun([[Key, _], Value], Total) ->
        ?term_size(Key) + ?term_size(Value) + Total
    end,
    lists:foldl(AddRow, ?term_size(Reduction), KVList).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl.rej b/src/couch_mrview/src/couch_mrview_util.erl.rej new file mode 100644 index 000000000..2bcf1262c --- /dev/null +++ b/src/couch_mrview/src/couch_mrview_util.erl.rej @@ -0,0 +1,16 @@ +*************** +*** 20,25 **** + -export([index_file/2, compaction_file/2, open_file/1]). + -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]). + -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]). + -export([get_view_changes_count/1]). + -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]). + -export([fold/4, fold_reduce/4]). +--- 20,26 ---- + -export([index_file/2, compaction_file/2, open_file/1]). + -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]). + -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]). ++ -export([get_access_row_count/2]). + -export([get_view_changes_count/1]). + -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]). + -export([fold/4, fold_reduce/4]). |