summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan Lehnardt <jan@apache.org>2020-06-12 18:49:05 +0200
committerJan Lehnardt <jan@apache.org>2020-07-10 19:08:52 +0200
commit3661c0fca3b35d86a7fde748ef7c6d119f36cd02 (patch)
treefc4098eea2d55333803056113e74319fc698678e
parent7ffcd412556e942c006eeb70cd58ea7c5d23436d (diff)
downloadcouchdb-3661c0fca3b35d86a7fde748ef7c6d119f36cd02.tar.gz
fix: all_docs on partitioned dbs, override partition requirement on faked access all docs query
-rw-r--r--src/couch/src/couch_db.erl1
-rw-r--r--src/couch_mrview/include/couch_mrview.hrl.orig110
-rw-r--r--src/couch_mrview/src/couch_mrview.erl8
-rw-r--r--src/couch_mrview/src/couch_mrview.erl.orig701
-rw-r--r--src/couch_mrview/src/couch_mrview_http.erl.orig640
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl.orig380
-rw-r--r--src/couch_mrview/src/couch_mrview_updater.erl.rej52
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl4
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl.orig1177
-rw-r--r--src/couch_mrview/src/couch_mrview_util.erl.rej16
10 files changed, 3082 insertions, 7 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index ecd456c37..b315f07c1 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -1792,7 +1792,6 @@ open_doc_revs_int(Db, IdRevs, Options) ->
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
case couch_db_engine:open_local_docs(Db, [Id]) of
[#doc{} = Doc] ->
- couch_log:info("~nopen_doc_int: Doc: ~p~n", [Doc]),
case Doc#doc.body of
{ Body } ->
Access = couch_util:get_value(<<"_access">>, Body),
diff --git a/src/couch_mrview/include/couch_mrview.hrl.orig b/src/couch_mrview/include/couch_mrview.hrl.orig
new file mode 100644
index 000000000..bb0ab0b46
--- /dev/null
+++ b/src/couch_mrview/include/couch_mrview.hrl.orig
@@ -0,0 +1,110 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-record(mrst, {
+ sig=nil,
+ fd=nil,
+ fd_monitor,
+ db_name,
+ idx_name,
+ language,
+ design_opts=[],
+ partitioned=false,
+ lib,
+ views,
+ id_btree=nil,
+ update_seq=0,
+ purge_seq=0,
+ first_build,
+ partial_resp_pid,
+ doc_acc,
+ doc_queue,
+ write_queue,
+ qserver=nil
+}).
+
+
+-record(mrview, {
+ id_num,
+ update_seq=0,
+ purge_seq=0,
+ map_names=[],
+ reduce_funs=[],
+ def,
+ btree=nil,
+ options=[]
+}).
+
+
+-record(mrheader, {
+ seq=0,
+ purge_seq=0,
+ id_btree_state=nil,
+ view_states=nil
+}).
+
+-define(MAX_VIEW_LIMIT, 16#10000000).
+
+-record(mrargs, {
+ view_type,
+ reduce,
+
+ preflight_fun,
+
+ start_key,
+ start_key_docid,
+ end_key,
+ end_key_docid,
+ keys,
+
+ direction = fwd,
+ limit = ?MAX_VIEW_LIMIT,
+ skip = 0,
+ group_level = 0,
+ group = undefined,
+ stable = false,
+ update = true,
+ multi_get = false,
+ inclusive_end = true,
+ include_docs = false,
+ doc_options = [],
+ update_seq=false,
+ conflicts,
+ callback,
+ sorted = true,
+ extra = []
+}).
+
+-record(vacc, {
+ db,
+ req,
+ resp,
+ prepend,
+ etag,
+ should_close = false,
+ buffer = [],
+ bufsize = 0,
+ threshold = 1490,
+ row_sent = false,
+ meta_sent = false
+}).
+
+-record(lacc, {
+ db,
+ req,
+ resp,
+ qserver,
+ lname,
+ etag,
+ code,
+ headers
+}).
diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl
index 98bceaeb2..0d41f2ef6 100644
--- a/src/couch_mrview/src/couch_mrview.erl
+++ b/src/couch_mrview/src/couch_mrview.erl
@@ -233,9 +233,9 @@ query_all_docs(Db, Args) ->
query_all_docs(Db, Args, Callback, Acc) when is_list(Args) ->
query_all_docs(Db, to_mrargs(Args), Callback, Acc);
query_all_docs(Db, Args0, Callback, Acc) ->
- case couch_db:is_admin(Db) of
- true -> query_all_docs_admin(Db, Args0, Callback, Acc);
- false -> query_all_docs_access(Db, Args0, Callback, Acc)
+ case couch_db:has_access_enabled(Db) and not couch_db:is_admin(Db) of
+ true -> query_all_docs_access(Db, Args0, Callback, Acc);
+ false -> query_all_docs_admin(Db, Args0, Callback, Acc)
end.
access_ddoc() ->
@@ -305,7 +305,7 @@ query_all_docs_access(Db, Args0, Callback0, Acc) ->
UserCtx = couch_db:get_user_ctx(Db),
UserName = UserCtx#user_ctx.name,
Args1 = prefix_startkey_endkey(UserName, Args0, Args0#mrargs.direction),
- Args = Args1#mrargs{reduce=false},
+ Args = Args1#mrargs{reduce=false, extra=Args1#mrargs.extra ++ [{all_docs_access, true}]},
Callback = fun
({row, Props}, Acc0) ->
diff --git a/src/couch_mrview/src/couch_mrview.erl.orig b/src/couch_mrview/src/couch_mrview.erl.orig
new file mode 100644
index 000000000..1cdc91809
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview.erl.orig
@@ -0,0 +1,701 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview).
+
+-export([validate/2]).
+-export([query_all_docs/2, query_all_docs/4]).
+-export([query_view/3, query_view/4, query_view/6, get_view_index_pid/4]).
+-export([get_info/2]).
+-export([trigger_update/2, trigger_update/3]).
+-export([get_view_info/3]).
+-export([refresh/2]).
+-export([compact/2, compact/3, cancel_compaction/2]).
+-export([cleanup/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-record(mracc, {
+ db,
+ meta_sent=false,
+ total_rows,
+ offset,
+ limit,
+ skip,
+ group_level,
+ doc_info,
+ callback,
+ user_acc,
+ last_go=ok,
+ reduce_fun,
+ finalizer,
+ update_seq,
+ args
+}).
+
+
+
+validate_ddoc_fields(DDoc) ->
+ MapFuncType = map_function_type(DDoc),
+ lists:foreach(fun(Path) ->
+ validate_ddoc_fields(DDoc, Path)
+ end, [
+ [{<<"filters">>, object}, {any, [object, string]}],
+ [{<<"language">>, string}],
+ [{<<"lists">>, object}, {any, [object, string]}],
+ [{<<"options">>, object}],
+ [{<<"options">>, object}, {<<"include_design">>, boolean}],
+ [{<<"options">>, object}, {<<"local_seq">>, boolean}],
+ [{<<"options">>, object}, {<<"partitioned">>, boolean}],
+ [{<<"rewrites">>, [string, array]}],
+ [{<<"shows">>, object}, {any, [object, string]}],
+ [{<<"updates">>, object}, {any, [object, string]}],
+ [{<<"validate_doc_update">>, string}],
+ [{<<"views">>, object}, {<<"lib">>, object}],
+ [{<<"views">>, object}, {any, object}, {<<"map">>, MapFuncType}],
+ [{<<"views">>, object}, {any, object}, {<<"reduce">>, string}]
+ ]),
+ require_map_function_for_views(DDoc),
+ ok.
+
+require_map_function_for_views({Props}) ->
+ case couch_util:get_value(<<"views">>, Props) of
+ undefined -> ok;
+ {Views} ->
+ lists:foreach(fun
+ ({<<"lib">>, _}) -> ok;
+ ({Key, {Value}}) ->
+ case couch_util:get_value(<<"map">>, Value) of
+ undefined -> throw({invalid_design_doc,
+ <<"View `", Key/binary, "` must contain map function">>});
+ _ -> ok
+ end
+ end, Views),
+ ok
+ end.
+
+validate_ddoc_fields(DDoc, Path) ->
+ case validate_ddoc_fields(DDoc, Path, []) of
+ ok -> ok;
+ {error, {FailedPath0, Type0}} ->
+ FailedPath = iolist_to_binary(join(FailedPath0, <<".">>)),
+ Type = format_type(Type0),
+ throw({invalid_design_doc,
+ <<"`", FailedPath/binary, "` field must have ",
+ Type/binary, " type">>})
+ end.
+
+validate_ddoc_fields(undefined, _, _) ->
+ ok;
+validate_ddoc_fields(_, [], _) ->
+ ok;
+validate_ddoc_fields({KVS}=Props, [{any, Type} | Rest], Acc) ->
+ lists:foldl(fun
+ ({Key, _}, ok) ->
+ validate_ddoc_fields(Props, [{Key, Type} | Rest], Acc);
+ ({_, _}, {error, _}=Error) ->
+ Error
+ end, ok, KVS);
+validate_ddoc_fields({KVS}=Props, [{Key, Type} | Rest], Acc) ->
+ case validate_ddoc_field(Props, {Key, Type}) of
+ ok ->
+ validate_ddoc_fields(couch_util:get_value(Key, KVS),
+ Rest,
+ [Key | Acc]);
+ error ->
+ {error, {[Key | Acc], Type}};
+ {error, Key1} ->
+ {error, {[Key1 | Acc], Type}}
+ end.
+
+validate_ddoc_field(undefined, Type) when is_atom(Type) ->
+ ok;
+validate_ddoc_field(_, any) ->
+ ok;
+validate_ddoc_field(Value, Types) when is_list(Types) ->
+ lists:foldl(fun
+ (_, ok) -> ok;
+ (Type, _) -> validate_ddoc_field(Value, Type)
+ end, error, Types);
+validate_ddoc_field(Value, string) when is_binary(Value) ->
+ ok;
+validate_ddoc_field(Value, array) when is_list(Value) ->
+ ok;
+validate_ddoc_field({Value}, object) when is_list(Value) ->
+ ok;
+validate_ddoc_field(Value, boolean) when is_boolean(Value) ->
+ ok;
+validate_ddoc_field({Props}, {any, Type}) ->
+ validate_ddoc_field1(Props, Type);
+validate_ddoc_field({Props}, {Key, Type}) ->
+ validate_ddoc_field(couch_util:get_value(Key, Props), Type);
+validate_ddoc_field(_, _) ->
+ error.
+
+validate_ddoc_field1([], _) ->
+ ok;
+validate_ddoc_field1([{Key, Value} | Rest], Type) ->
+ case validate_ddoc_field(Value, Type) of
+ ok ->
+ validate_ddoc_field1(Rest, Type);
+ error ->
+ {error, Key}
+ end.
+
+map_function_type({Props}) ->
+ case couch_util:get_value(<<"language">>, Props) of
+ <<"query">> -> object;
+ _ -> string
+ end.
+
+format_type(Type) when is_atom(Type) ->
+ ?l2b(atom_to_list(Type));
+format_type(Types) when is_list(Types) ->
+ iolist_to_binary(join(lists:map(fun atom_to_list/1, Types), <<" or ">>)).
+
+join(L, Sep) ->
+ join(L, Sep, []).
+join([H|[]], _, Acc) ->
+ [H | Acc];
+join([H|T], Sep, Acc) ->
+ join(T, Sep, [Sep, H | Acc]).
+
+
+validate(Db, DDoc) ->
+ ok = validate_ddoc_fields(DDoc#doc.body),
+ GetName = fun
+ (#mrview{map_names = [Name | _]}) -> Name;
+ (#mrview{reduce_funs = [{Name, _} | _]}) -> Name;
+ (_) -> null
+ end,
+ ValidateView = fun(Proc, #mrview{def=MapSrc, reduce_funs=Reds}=View) ->
+ couch_query_servers:try_compile(Proc, map, GetName(View), MapSrc),
+ lists:foreach(fun
+ ({_RedName, <<"_sum", _/binary>>}) ->
+ ok;
+ ({_RedName, <<"_count", _/binary>>}) ->
+ ok;
+ ({_RedName, <<"_stats", _/binary>>}) ->
+ ok;
+ ({_RedName, <<"_approx_count_distinct", _/binary>>}) ->
+ ok;
+ ({_RedName, <<"_", _/binary>> = Bad}) ->
+ Msg = ["`", Bad, "` is not a supported reduce function."],
+ throw({invalid_design_doc, Msg});
+ ({RedName, RedSrc}) ->
+ couch_query_servers:try_compile(Proc, reduce, RedName, RedSrc)
+ end, Reds)
+ end,
+ {ok, #mrst{
+ language = Lang,
+ views = Views,
+ partitioned = Partitioned
+ }} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc),
+
+ case {couch_db:is_partitioned(Db), Partitioned} of
+ {false, true} ->
+ throw({invalid_design_doc,
+ <<"partitioned option cannot be true in a "
+ "non-partitioned database.">>});
+ {_, _} ->
+ ok
+ end,
+
+ try Views =/= [] andalso couch_query_servers:get_os_process(Lang) of
+ false ->
+ ok;
+ Proc ->
+ try
+ lists:foreach(fun(V) -> ValidateView(Proc, V) end, Views)
+ after
+ couch_query_servers:ret_os_process(Proc)
+ end
+ catch {unknown_query_language, _Lang} ->
+ %% Allow users to save ddocs written in unknown languages
+ ok
+ end.
+
+
+query_all_docs(Db, Args) ->
+ query_all_docs(Db, Args, fun default_cb/2, []).
+
+
+query_all_docs(Db, Args, Callback, Acc) when is_list(Args) ->
+ query_all_docs(Db, to_mrargs(Args), Callback, Acc);
+query_all_docs(Db, Args0, Callback, Acc) ->
+ Sig = couch_util:with_db(Db, fun(WDb) ->
+ {ok, Info} = couch_db:get_db_info(WDb),
+ couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Info)))
+ end),
+ Args1 = Args0#mrargs{view_type=map},
+ Args2 = couch_mrview_util:validate_all_docs_args(Db, Args1),
+ {ok, Acc1} = case Args2#mrargs.preflight_fun of
+ PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc);
+ _ -> {ok, Acc}
+ end,
+ all_docs_fold(Db, Args2, Callback, Acc1).
+
+
+query_view(Db, DDoc, VName) ->
+ query_view(Db, DDoc, VName, #mrargs{}).
+
+
+query_view(Db, DDoc, VName, Args) when is_list(Args) ->
+ query_view(Db, DDoc, VName, to_mrargs(Args), fun default_cb/2, []);
+query_view(Db, DDoc, VName, Args) ->
+ query_view(Db, DDoc, VName, Args, fun default_cb/2, []).
+
+
+query_view(Db, DDoc, VName, Args, Callback, Acc) when is_list(Args) ->
+ query_view(Db, DDoc, VName, to_mrargs(Args), Callback, Acc);
+query_view(Db, DDoc, VName, Args0, Callback, Acc0) ->
+ case couch_mrview_util:get_view(Db, DDoc, VName, Args0) of
+ {ok, VInfo, Sig, Args} ->
+ {ok, Acc1} = case Args#mrargs.preflight_fun of
+ PFFun when is_function(PFFun, 2) -> PFFun(Sig, Acc0);
+ _ -> {ok, Acc0}
+ end,
+ query_view(Db, VInfo, Args, Callback, Acc1);
+ ddoc_updated ->
+ Callback(ok, ddoc_updated)
+ end.
+
+
+get_view_index_pid(Db, DDoc, ViewName, Args0) ->
+ couch_mrview_util:get_view_index_pid(Db, DDoc, ViewName, Args0).
+
+
+query_view(Db, {Type, View, Ref}, Args, Callback, Acc) ->
+ try
+ case Type of
+ map -> map_fold(Db, View, Args, Callback, Acc);
+ red -> red_fold(Db, View, Args, Callback, Acc)
+ end
+ after
+ erlang:demonitor(Ref, [flush])
+ end.
+
+
+get_info(Db, DDoc) ->
+ {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+ couch_index:get_info(Pid).
+
+
+trigger_update(Db, DDoc) ->
+ trigger_update(Db, DDoc, couch_db:get_update_seq(Db)).
+
+trigger_update(Db, DDoc, UpdateSeq) ->
+ {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+ couch_index:trigger_update(Pid, UpdateSeq).
+
+%% get information on a view
+get_view_info(Db, DDoc, VName) ->
+ {ok, {_, View, _}, _, _Args} = couch_mrview_util:get_view(Db, DDoc, VName,
+ #mrargs{}),
+
+ %% get the total number of rows
+ {ok, TotalRows} = couch_mrview_util:get_row_count(View),
+
+ {ok, [{update_seq, View#mrview.update_seq},
+ {purge_seq, View#mrview.purge_seq},
+ {total_rows, TotalRows}]}.
+
+
+%% @doc refresh a view index
+refresh(DbName, DDoc) when is_binary(DbName)->
+ UpdateSeq = couch_util:with_db(DbName, fun(WDb) ->
+ couch_db:get_update_seq(WDb)
+ end),
+
+ case couch_index_server:get_index(couch_mrview_index, DbName, DDoc) of
+ {ok, Pid} ->
+ case catch couch_index:get_state(Pid, UpdateSeq) of
+ {ok, _} -> ok;
+ Error -> {error, Error}
+ end;
+ Error ->
+ {error, Error}
+ end;
+
+refresh(Db, DDoc) ->
+ refresh(couch_db:name(Db), DDoc).
+
+compact(Db, DDoc) ->
+ compact(Db, DDoc, []).
+
+
+compact(Db, DDoc, Opts) ->
+ {ok, Pid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+ couch_index:compact(Pid, Opts).
+
+
+cancel_compaction(Db, DDoc) ->
+ {ok, IPid} = couch_index_server:get_index(couch_mrview_index, Db, DDoc),
+ {ok, CPid} = couch_index:get_compactor_pid(IPid),
+ ok = couch_index_compactor:cancel(CPid),
+
+ % Cleanup the compaction file if it exists
+ {ok, #mrst{sig=Sig, db_name=DbName}} = couch_index:get_state(IPid, 0),
+ couch_mrview_util:delete_compaction_file(DbName, Sig),
+ ok.
+
+
+cleanup(Db) ->
+ couch_mrview_cleanup:run(Db).
+
+
+all_docs_fold(Db, #mrargs{keys=undefined}=Args, Callback, UAcc) ->
+ ReduceFun = get_reduce_fun(Args),
+ Total = get_total_rows(Db, Args),
+ UpdateSeq = get_update_seq(Db, Args),
+ Acc = #mracc{
+ db=Db,
+ total_rows=Total,
+ limit=Args#mrargs.limit,
+ skip=Args#mrargs.skip,
+ callback=Callback,
+ user_acc=UAcc,
+ reduce_fun=ReduceFun,
+ update_seq=UpdateSeq,
+ args=Args
+ },
+ [Opts1] = couch_mrview_util:all_docs_key_opts(Args),
+ % TODO: This is a terrible hack for now. We'll probably have
+ % to rewrite _all_docs to not be part of mrview and not expect
+ % a btree. For now non-btree's will just have to pass 0 or
+ % some fake reductions to get an offset.
+ Opts2 = [include_reductions | Opts1],
+ FunName = case couch_util:get_value(namespace, Args#mrargs.extra) of
+ <<"_design">> -> fold_design_docs;
+ <<"_local">> -> fold_local_docs;
+ _ -> fold_docs
+ end,
+ {ok, Offset, FinalAcc} = couch_db:FunName(Db, fun map_fold/3, Acc, Opts2),
+ finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]);
+all_docs_fold(Db, #mrargs{direction=Dir, keys=Keys0}=Args, Callback, UAcc) ->
+ ReduceFun = get_reduce_fun(Args),
+ Total = get_total_rows(Db, Args),
+ UpdateSeq = get_update_seq(Db, Args),
+ Acc = #mracc{
+ db=Db,
+ total_rows=Total,
+ limit=Args#mrargs.limit,
+ skip=Args#mrargs.skip,
+ callback=Callback,
+ user_acc=UAcc,
+ reduce_fun=ReduceFun,
+ update_seq=UpdateSeq,
+ args=Args
+ },
+ % Backwards compatibility hack. The old _all_docs iterates keys
+ % in reverse if descending=true was passed. Here we'll just
+ % reverse the list instead.
+ Keys = if Dir =:= fwd -> Keys0; true -> lists:reverse(Keys0) end,
+
+ FoldFun = fun(Key, Acc0) ->
+ DocInfo = (catch couch_db:get_doc_info(Db, Key)),
+ {Doc, Acc1} = case DocInfo of
+ {ok, #doc_info{id=Id, revs=[RevInfo | _RestRevs]}=DI} ->
+ Rev = couch_doc:rev_to_str(RevInfo#rev_info.rev),
+ Props = [{rev, Rev}] ++ case RevInfo#rev_info.deleted of
+ true -> [{deleted, true}];
+ false -> []
+ end,
+ {{{Id, Id}, {Props}}, Acc0#mracc{doc_info=DI}};
+ not_found ->
+ {{{Key, error}, not_found}, Acc0}
+ end,
+ {_, Acc2} = map_fold(Doc, {[], [{0, 0, 0}]}, Acc1),
+ Acc2
+ end,
+ FinalAcc = lists:foldl(FoldFun, Acc, Keys),
+ finish_fold(FinalAcc, [{total, Total}]).
+
+
+map_fold(Db, View, Args, Callback, UAcc) ->
+ {ok, Total} = couch_mrview_util:get_row_count(View),
+ Acc = #mracc{
+ db=Db,
+ total_rows=Total,
+ limit=Args#mrargs.limit,
+ skip=Args#mrargs.skip,
+ callback=Callback,
+ user_acc=UAcc,
+ reduce_fun=fun couch_mrview_util:reduce_to_count/1,
+ update_seq=View#mrview.update_seq,
+ args=Args
+ },
+ OptList = couch_mrview_util:key_opts(Args),
+ {Reds, Acc2} = lists:foldl(fun(Opts, {_, Acc0}) ->
+ {ok, R, A} = couch_mrview_util:fold(View, fun map_fold/3, Acc0, Opts),
+ {R, A}
+ end, {nil, Acc}, OptList),
+ Offset = couch_mrview_util:reduce_to_count(Reds),
+ finish_fold(Acc2, [{total, Total}, {offset, Offset}]).
+
+
+map_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
+ % matches for _all_docs and translates #full_doc_info{} -> KV pair
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
+ Value = {[{rev, couch_doc:rev_to_str(Rev)}]},
+ map_fold({{Id, Id}, Value}, OffsetReds, Acc#mracc{doc_info=DI});
+ #doc_info{revs=[#rev_info{deleted=true}|_]} ->
+ {ok, Acc}
+ end;
+map_fold(_KV, _Offset, #mracc{skip=N}=Acc) when N > 0 ->
+ {ok, Acc#mracc{skip=N-1, last_go=ok}};
+map_fold(KV, OffsetReds, #mracc{offset=undefined}=Acc) ->
+ #mracc{
+ total_rows=Total,
+ callback=Callback,
+ user_acc=UAcc0,
+ reduce_fun=Reduce,
+ update_seq=UpdateSeq,
+ args=Args
+ } = Acc,
+ Offset = Reduce(OffsetReds),
+ Meta = make_meta(Args, UpdateSeq, [{total, Total}, {offset, Offset}]),
+ {Go, UAcc1} = Callback(Meta, UAcc0),
+ Acc1 = Acc#mracc{meta_sent=true, offset=Offset, user_acc=UAcc1, last_go=Go},
+ case Go of
+ ok -> map_fold(KV, OffsetReds, Acc1);
+ stop -> {stop, Acc1}
+ end;
+map_fold(_KV, _Offset, #mracc{limit=0}=Acc) ->
+ {stop, Acc};
+map_fold({{Key, Id}, Val}, _Offset, Acc) ->
+ #mracc{
+ db=Db,
+ limit=Limit,
+ doc_info=DI,
+ callback=Callback,
+ user_acc=UAcc0,
+ args=Args
+ } = Acc,
+ Doc = case DI of
+ #doc_info{} -> couch_mrview_util:maybe_load_doc(Db, DI, Args);
+ _ -> couch_mrview_util:maybe_load_doc(Db, Id, Val, Args)
+ end,
+ Row = [{id, Id}, {key, Key}, {value, Val}] ++ Doc,
+ {Go, UAcc1} = Callback({row, Row}, UAcc0),
+ {Go, Acc#mracc{
+ limit=Limit-1,
+ doc_info=undefined,
+ user_acc=UAcc1,
+ last_go=Go
+ }};
+map_fold(#doc{id = <<"_local/", _/binary>>} = Doc, _Offset, #mracc{} = Acc) ->
+ #mracc{
+ limit=Limit,
+ callback=Callback,
+ user_acc=UAcc0,
+ args=Args
+ } = Acc,
+ #doc{
+ id = DocId,
+ revs = {Pos, [RevId | _]}
+ } = Doc,
+ Rev = {Pos, RevId},
+ Row = [
+ {id, DocId},
+ {key, DocId},
+ {value, {[{rev, couch_doc:rev_to_str(Rev)}]}}
+ ] ++ if not Args#mrargs.include_docs -> []; true ->
+ [{doc, couch_doc:to_json_obj(Doc, Args#mrargs.doc_options)}]
+ end,
+ {Go, UAcc1} = Callback({row, Row}, UAcc0),
+ {Go, Acc#mracc{
+ limit=Limit-1,
+ reduce_fun=undefined,
+ doc_info=undefined,
+ user_acc=UAcc1,
+ last_go=Go
+ }}.
+
+red_fold(Db, {NthRed, _Lang, View}=RedView, Args, Callback, UAcc) ->
+ Finalizer = case couch_util:get_value(finalizer, Args#mrargs.extra) of
+ undefined ->
+ {_, FunSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
+ FunSrc;
+ CustomFun->
+ CustomFun
+ end,
+ Acc = #mracc{
+ db=Db,
+ total_rows=null,
+ limit=Args#mrargs.limit,
+ skip=Args#mrargs.skip,
+ group_level=Args#mrargs.group_level,
+ callback=Callback,
+ user_acc=UAcc,
+ update_seq=View#mrview.update_seq,
+ finalizer=Finalizer,
+ args=Args
+ },
+ Grouping = {key_group_level, Args#mrargs.group_level},
+ OptList = couch_mrview_util:key_opts(Args, [Grouping]),
+ Acc2 = lists:foldl(fun(Opts, Acc0) ->
+ {ok, Acc1} =
+ couch_mrview_util:fold_reduce(RedView, fun red_fold/3, Acc0, Opts),
+ Acc1
+ end, Acc, OptList),
+ finish_fold(Acc2, []).
+
+red_fold({p, _Partition, Key}, Red, Acc) ->
+ red_fold(Key, Red, Acc);
+red_fold(_Key, _Red, #mracc{skip=N}=Acc) when N > 0 ->
+ {ok, Acc#mracc{skip=N-1, last_go=ok}};
+red_fold(Key, Red, #mracc{meta_sent=false}=Acc) ->
+ #mracc{
+ args=Args,
+ callback=Callback,
+ user_acc=UAcc0,
+ update_seq=UpdateSeq
+ } = Acc,
+ Meta = make_meta(Args, UpdateSeq, []),
+ {Go, UAcc1} = Callback(Meta, UAcc0),
+ Acc1 = Acc#mracc{user_acc=UAcc1, meta_sent=true, last_go=Go},
+ case Go of
+ ok -> red_fold(Key, Red, Acc1);
+ _ -> {Go, Acc1}
+ end;
+red_fold(_Key, _Red, #mracc{limit=0} = Acc) ->
+ {stop, Acc};
+red_fold(_Key, Red, #mracc{group_level=0} = Acc) ->
+ #mracc{
+ finalizer=Finalizer,
+ limit=Limit,
+ callback=Callback,
+ user_acc=UAcc0
+ } = Acc,
+ Row = [{key, null}, {value, maybe_finalize(Red, Finalizer)}],
+ {Go, UAcc1} = Callback({row, Row}, UAcc0),
+ {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
+red_fold(Key, Red, #mracc{group_level=exact} = Acc) ->
+ #mracc{
+ finalizer=Finalizer,
+ limit=Limit,
+ callback=Callback,
+ user_acc=UAcc0
+ } = Acc,
+ Row = [{key, Key}, {value, maybe_finalize(Red, Finalizer)}],
+ {Go, UAcc1} = Callback({row, Row}, UAcc0),
+ {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
+red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0, is_list(K) ->
+ #mracc{
+ finalizer=Finalizer,
+ limit=Limit,
+ callback=Callback,
+ user_acc=UAcc0
+ } = Acc,
+ Row = [{key, lists:sublist(K, I)}, {value, maybe_finalize(Red, Finalizer)}],
+ {Go, UAcc1} = Callback({row, Row}, UAcc0),
+ {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}};
+red_fold(K, Red, #mracc{group_level=I} = Acc) when I > 0 ->
+ #mracc{
+ finalizer=Finalizer,
+ limit=Limit,
+ callback=Callback,
+ user_acc=UAcc0
+ } = Acc,
+ Row = [{key, K}, {value, maybe_finalize(Red, Finalizer)}],
+ {Go, UAcc1} = Callback({row, Row}, UAcc0),
+ {Go, Acc#mracc{user_acc=UAcc1, limit=Limit-1, last_go=Go}}.
+
+maybe_finalize(Red, null) ->
+ Red;
+maybe_finalize(Red, RedSrc) ->
+ {ok, Finalized} = couch_query_servers:finalize(RedSrc, Red),
+ Finalized.
+
+finish_fold(#mracc{last_go=ok, update_seq=UpdateSeq}=Acc, ExtraMeta) ->
+ #mracc{callback=Callback, user_acc=UAcc, args=Args}=Acc,
+    % Possibly send meta info
+ Meta = make_meta(Args, UpdateSeq, ExtraMeta),
+ {Go, UAcc1} = case Acc#mracc.meta_sent of
+ false -> Callback(Meta, UAcc);
+ _ -> {ok, Acc#mracc.user_acc}
+ end,
+ % Notify callback that the fold is complete.
+ {_, UAcc2} = case Go of
+ ok -> Callback(complete, UAcc1);
+ _ -> {ok, UAcc1}
+ end,
+ {ok, UAcc2};
+finish_fold(#mracc{user_acc=UAcc}, _ExtraMeta) ->
+ {ok, UAcc}.
+
+
+make_meta(Args, UpdateSeq, Base) ->
+ case Args#mrargs.update_seq of
+ true -> {meta, Base ++ [{update_seq, UpdateSeq}]};
+ _ -> {meta, Base}
+ end.
+
+
+get_reduce_fun(#mrargs{extra = Extra}) ->
+ case couch_util:get_value(namespace, Extra) of
+ <<"_local">> ->
+ fun(_) -> null end;
+ _ ->
+ fun couch_mrview_util:all_docs_reduce_to_count/1
+ end.
+
+
+get_total_rows(Db, #mrargs{extra = Extra}) ->
+ case couch_util:get_value(namespace, Extra) of
+ <<"_local">> ->
+ null;
+ <<"_design">> ->
+ {ok, N} = couch_db:get_design_doc_count(Db),
+ N;
+ _ ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ couch_util:get_value(doc_count, Info)
+ end.
+
+
+get_update_seq(Db, #mrargs{extra = Extra}) ->
+ case couch_util:get_value(namespace, Extra) of
+ <<"_local">> ->
+ null;
+ _ ->
+ couch_db:get_update_seq(Db)
+ end.
+
+
+default_cb(complete, Acc) ->
+ {ok, lists:reverse(Acc)};
+default_cb({final, Info}, []) ->
+ {ok, [Info]};
+default_cb({final, _}, Acc) ->
+ {ok, Acc};
+default_cb(ok, ddoc_updated) ->
+ {ok, ddoc_updated};
+default_cb(Row, Acc) ->
+ {ok, [Row | Acc]}.
+
+
+to_mrargs(KeyList) ->
+ lists:foldl(fun({Key, Value}, Acc) ->
+ Index = lookup_index(couch_util:to_existing_atom(Key)),
+ setelement(Index, Acc, Value)
+ end, #mrargs{}, KeyList).
+
+
+lookup_index(Key) ->
+ Index = lists:zip(
+ record_info(fields, mrargs), lists:seq(2, record_info(size, mrargs))
+ ),
+ couch_util:get_value(Key, Index).
diff --git a/src/couch_mrview/src/couch_mrview_http.erl.orig b/src/couch_mrview/src/couch_mrview_http.erl.orig
new file mode 100644
index 000000000..3cf8833d7
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_http.erl.orig
@@ -0,0 +1,640 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_http).
+
+-export([
+ handle_all_docs_req/2,
+ handle_local_docs_req/2,
+ handle_design_docs_req/2,
+ handle_reindex_req/3,
+ handle_view_req/3,
+ handle_temp_view_req/2,
+ handle_info_req/3,
+ handle_compact_req/3,
+ handle_cleanup_req/2
+]).
+
+-export([
+ parse_boolean/1,
+ parse_int/1,
+ parse_pos_int/1,
+ prepend_val/1,
+ parse_body_and_query/2,
+ parse_body_and_query/3,
+ parse_params/2,
+ parse_params/3,
+ parse_params/4,
+ view_cb/2,
+ row_to_json/1,
+ row_to_json/2,
+ check_view_etag/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+handle_all_docs_req(#httpd{method='GET'}=Req, Db) ->
+ all_docs_req(Req, Db, undefined);
+handle_all_docs_req(#httpd{method='POST'}=Req, Db) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)),
+ all_docs_req(Req, Db, Keys);
+handle_all_docs_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+handle_local_docs_req(#httpd{method='GET'}=Req, Db) ->
+ all_docs_req(Req, Db, undefined, <<"_local">>);
+handle_local_docs_req(#httpd{method='POST'}=Req, Db) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)),
+ all_docs_req(Req, Db, Keys, <<"_local">>);
+handle_local_docs_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+handle_design_docs_req(#httpd{method='GET'}=Req, Db) ->
+ all_docs_req(Req, Db, undefined, <<"_design">>);
+handle_design_docs_req(#httpd{method='POST'}=Req, Db) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ Keys = couch_mrview_util:get_view_keys(chttpd:json_body_obj(Req)),
+ all_docs_req(Req, Db, Keys, <<"_design">>);
+handle_design_docs_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+handle_reindex_req(#httpd{method='POST',
+ path_parts=[_, _, DName,<<"_reindex">>]}=Req,
+ Db, _DDoc) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ couch_mrview:trigger_update(Db, <<"_design/", DName/binary>>),
+ chttpd:send_json(Req, 201, {[{<<"ok">>, true}]});
+handle_reindex_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+handle_view_req(#httpd{method='GET',
+ path_parts=[_, _, DDocName, _, VName, <<"_info">>]}=Req,
+ Db, _DDoc) ->
+ DbName = couch_db:name(Db),
+ DDocId = <<"_design/", DDocName/binary >>,
+ {ok, Info} = couch_mrview:get_view_info(DbName, DDocId, VName),
+
+ FinalInfo = [{db_name, DbName},
+ {ddoc, DDocId},
+ {view, VName}] ++ Info,
+ chttpd:send_json(Req, 200, {FinalInfo});
+handle_view_req(#httpd{method='GET'}=Req, Db, DDoc) ->
+ [_, _, _, _, ViewName] = Req#httpd.path_parts,
+ couch_stats:increment_counter([couchdb, httpd, view_reads]),
+ design_doc_view(Req, Db, DDoc, ViewName, undefined);
+handle_view_req(#httpd{method='POST'}=Req, Db, DDoc) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ [_, _, _, _, ViewName] = Req#httpd.path_parts,
+ Props = chttpd:json_body_obj(Req),
+ Keys = couch_mrview_util:get_view_keys(Props),
+ Queries = couch_mrview_util:get_view_queries(Props),
+ case {Queries, Keys} of
+ {Queries, undefined} when is_list(Queries) ->
+ IncrBy = length(Queries),
+ couch_stats:increment_counter([couchdb, httpd, view_reads], IncrBy),
+ multi_query_view(Req, Db, DDoc, ViewName, Queries);
+ {undefined, Keys} when is_list(Keys) ->
+ couch_stats:increment_counter([couchdb, httpd, view_reads]),
+ design_doc_view(Req, Db, DDoc, ViewName, Keys);
+ {undefined, undefined} ->
+ throw({
+ bad_request,
+ "POST body must contain `keys` or `queries` field"
+ });
+ {_, _} ->
+ throw({bad_request, "`keys` and `queries` are mutually exclusive"})
+ end;
+handle_view_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "GET,POST,HEAD").
+
+
+handle_temp_view_req(#httpd{method='POST'}=Req, Db) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ {Body} = chttpd:json_body_obj(Req),
+ DDoc = couch_mrview_util:temp_view_to_ddoc({Body}),
+ Keys = couch_mrview_util:get_view_keys({Body}),
+ couch_stats:increment_counter([couchdb, httpd, temporary_view_reads]),
+ design_doc_view(Req, Db, DDoc, <<"temp">>, Keys);
+handle_temp_view_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+handle_info_req(#httpd{method='GET'}=Req, Db, DDoc) ->
+ [_, _, Name, _] = Req#httpd.path_parts,
+ {ok, Info} = couch_mrview:get_info(Db, DDoc),
+ chttpd:send_json(Req, 200, {[
+ {name, Name},
+ {view_index, {Info}}
+ ]});
+handle_info_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "GET").
+
+
+handle_compact_req(#httpd{method='POST'}=Req, Db, DDoc) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ ok = couch_mrview:compact(Db, DDoc),
+ chttpd:send_json(Req, 202, {[{ok, true}]});
+handle_compact_req(Req, _Db, _DDoc) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+handle_cleanup_req(#httpd{method='POST'}=Req, Db) ->
+ chttpd:validate_ctype(Req, "application/json"),
+ ok = couch_db:check_is_admin(Db),
+ ok = couch_mrview:cleanup(Db),
+ chttpd:send_json(Req, 202, {[{ok, true}]});
+handle_cleanup_req(Req, _Db) ->
+ chttpd:send_method_not_allowed(Req, "POST").
+
+
+all_docs_req(Req, Db, Keys) ->
+ all_docs_req(Req, Db, Keys, undefined).
+
+all_docs_req(Req, Db, Keys, NS) ->
+ case is_restricted(Db, NS) of
+ true ->
+ case (catch couch_db:check_is_admin(Db)) of
+ ok ->
+ do_all_docs_req(Req, Db, Keys, NS);
+ _ when NS == <<"_local">> ->
+ throw({forbidden, <<"Only admins can access _local_docs">>});
+ _ ->
+ case is_public_fields_configured(Db) of
+ true ->
+ do_all_docs_req(Req, Db, Keys, NS);
+ false ->
+ throw({forbidden, <<"Only admins can access _all_docs",
+ " of system databases.">>})
+ end
+ end;
+ false ->
+ do_all_docs_req(Req, Db, Keys, NS)
+ end.
+
+is_restricted(_Db, <<"_local">>) ->
+ true;
+is_restricted(Db, _) ->
+ couch_db:is_system_db(Db).
+
+is_public_fields_configured(Db) ->
+ DbName = ?b2l(couch_db:name(Db)),
+ case config:get("couch_httpd_auth", "authentication_db", "_users") of
+ DbName ->
+ UsersDbPublic = config:get("couch_httpd_auth", "users_db_public", "false"),
+ PublicFields = config:get("couch_httpd_auth", "public_fields"),
+ case {UsersDbPublic, PublicFields} of
+ {"true", PublicFields} when PublicFields =/= undefined ->
+ true;
+ {_, _} ->
+ false
+ end;
+ _ ->
+ false
+ end.
+
+do_all_docs_req(Req, Db, Keys, NS) ->
+ Args0 = couch_mrview_http:parse_body_and_query(Req, Keys),
+ Args1 = set_namespace(NS, Args0),
+ ETagFun = fun(Sig, Acc0) ->
+ check_view_etag(Sig, Acc0, Req)
+ end,
+ Args = Args1#mrargs{preflight_fun=ETagFun},
+ {ok, Resp} = couch_httpd:etag_maybe(Req, fun() ->
+ Max = chttpd:chunked_response_buffer_size(),
+ VAcc0 = #vacc{db=Db, req=Req, threshold=Max},
+ DbName = ?b2l(couch_db:name(Db)),
+ UsersDbName = config:get("couch_httpd_auth",
+ "authentication_db",
+ "_users"),
+ IsAdmin = is_admin(Db),
+ Callback = get_view_callback(DbName, UsersDbName, IsAdmin),
+ couch_mrview:query_all_docs(Db, Args, Callback, VAcc0)
+ end),
+ case is_record(Resp, vacc) of
+ true -> {ok, Resp#vacc.resp};
+ _ -> {ok, Resp}
+ end.
+
+set_namespace(NS, #mrargs{extra = Extra} = Args) ->
+ Args#mrargs{extra = [{namespace, NS} | Extra]}.
+
+is_admin(Db) ->
+ case catch couch_db:check_is_admin(Db) of
+ {unauthorized, _} ->
+ false;
+ ok ->
+ true
+ end.
+
+
+% admin users always get all fields
+get_view_callback(_, _, true) ->
+ fun view_cb/2;
+% if we are operating on the users db and we aren't
+% admin, filter the view
+get_view_callback(_DbName, _DbName, false) ->
+ fun filtered_view_cb/2;
+% non _users databases get all fields
+get_view_callback(_, _, _) ->
+ fun view_cb/2.
+
+
+design_doc_view(Req, Db, DDoc, ViewName, Keys) ->
+ Args0 = parse_params(Req, Keys),
+ ETagFun = fun(Sig, Acc0) ->
+ check_view_etag(Sig, Acc0, Req)
+ end,
+ Args = Args0#mrargs{preflight_fun=ETagFun},
+ {ok, Resp} = couch_httpd:etag_maybe(Req, fun() ->
+ Max = chttpd:chunked_response_buffer_size(),
+ VAcc0 = #vacc{db=Db, req=Req, threshold=Max},
+ couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0)
+ end),
+ case is_record(Resp, vacc) of
+ true -> {ok, Resp#vacc.resp};
+ _ -> {ok, Resp}
+ end.
+
+
+multi_query_view(Req, Db, DDoc, ViewName, Queries) ->
+ Args0 = parse_params(Req, undefined),
+ {ok, _, _, Args1} = couch_mrview_util:get_view(Db, DDoc, ViewName, Args0),
+ ArgQueries = lists:map(fun({Query}) ->
+ QueryArg = parse_params(Query, undefined, Args1),
+ couch_mrview_util:validate_args(Db, DDoc, QueryArg)
+ end, Queries),
+ {ok, Resp2} = couch_httpd:etag_maybe(Req, fun() ->
+ Max = chttpd:chunked_response_buffer_size(),
+ VAcc0 = #vacc{db=Db, req=Req, prepend="\r\n", threshold=Max},
+ %% TODO: proper calculation of etag
+ Etag = [$", couch_uuids:new(), $"],
+ Headers = [{"ETag", Etag}],
+ FirstChunk = "{\"results\":[",
+ {ok, Resp0} = chttpd:start_delayed_json_response(VAcc0#vacc.req, 200, Headers, FirstChunk),
+ VAcc1 = VAcc0#vacc{resp=Resp0},
+ VAcc2 = lists:foldl(fun(Args, Acc0) ->
+ {ok, Acc1} = couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, Acc0),
+ Acc1
+ end, VAcc1, ArgQueries),
+ {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, VAcc2#vacc{resp=Resp2}}
+ end),
+ case is_record(Resp2, vacc) of
+ true -> {ok, Resp2#vacc.resp};
+ _ -> {ok, Resp2}
+ end.
+
+filtered_view_cb({row, Row0}, Acc) ->
+ Row1 = lists:map(fun({doc, null}) ->
+ {doc, null};
+ ({doc, Body}) ->
+ Doc = couch_users_db:strip_non_public_fields(#doc{body=Body}),
+ {doc, Doc#doc.body};
+ (KV) ->
+ KV
+ end, Row0),
+ view_cb({row, Row1}, Acc);
+filtered_view_cb(Obj, Acc) ->
+ view_cb(Obj, Acc).
+
+
+%% these clauses start (and possibly end) the response
+view_cb({error, Reason}, #vacc{resp=undefined}=Acc) ->
+ {ok, Resp} = chttpd:send_error(Acc#vacc.req, Reason),
+ {ok, Acc#vacc{resp=Resp}};
+
+view_cb(complete, #vacc{resp=undefined}=Acc) ->
+ % Nothing in view
+ {ok, Resp} = chttpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
+ {ok, Acc#vacc{resp=Resp}};
+
+view_cb(Msg, #vacc{resp=undefined}=Acc) ->
+ %% Start response
+ Headers = [],
+ {ok, Resp} = chttpd:start_delayed_json_response(Acc#vacc.req, 200, Headers),
+ view_cb(Msg, Acc#vacc{resp=Resp, should_close=true});
+
+%% ---------------------------------------------------
+
+%% From here on down, the response has been started.
+
+view_cb({error, Reason}, #vacc{resp=Resp}=Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_error(Resp, Reason),
+ {ok, Acc#vacc{resp=Resp1}};
+
+view_cb(complete, #vacc{resp=Resp, buffer=Buf, threshold=Max}=Acc) ->
+ % Finish view output and possibly end the response
+ {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, "\r\n]}", Max),
+ case Acc#vacc.should_close of
+ true ->
+ {ok, Resp2} = chttpd:end_delayed_json_response(Resp1),
+ {ok, Acc#vacc{resp=Resp2}};
+ _ ->
+ {ok, Acc#vacc{resp=Resp1, meta_sent=false, row_sent=false,
+ prepend=",\r\n", buffer=[], bufsize=0}}
+ end;
+
+view_cb({meta, Meta}, #vacc{meta_sent=false, row_sent=false}=Acc) ->
+ % Sending metadata as we've not sent it or any row yet
+ Parts = case couch_util:get_value(total, Meta) of
+ undefined -> [];
+ Total -> [io_lib:format("\"total_rows\":~p", [Total])]
+ end ++ case couch_util:get_value(offset, Meta) of
+ undefined -> [];
+ Offset -> [io_lib:format("\"offset\":~p", [Offset])]
+ end ++ case couch_util:get_value(update_seq, Meta) of
+ undefined -> [];
+ null ->
+ ["\"update_seq\":null"];
+ UpdateSeq when is_integer(UpdateSeq) ->
+ [io_lib:format("\"update_seq\":~B", [UpdateSeq])];
+ UpdateSeq when is_binary(UpdateSeq) ->
+ [io_lib:format("\"update_seq\":\"~s\"", [UpdateSeq])]
+ end ++ ["\"rows\":["],
+ Chunk = [prepend_val(Acc), "{", string:join(Parts, ","), "\r\n"],
+ {ok, AccOut} = maybe_flush_response(Acc, Chunk, iolist_size(Chunk)),
+ {ok, AccOut#vacc{prepend="", meta_sent=true}};
+
+view_cb({meta, _Meta}, #vacc{}=Acc) ->
+ %% ignore metadata
+ {ok, Acc};
+
+view_cb({row, Row}, #vacc{meta_sent=false}=Acc) ->
+ %% sorted=false and row arrived before meta
+ % Adding another row
+ Chunk = [prepend_val(Acc), "{\"rows\":[\r\n", row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{meta_sent=true, row_sent=true}, Chunk, iolist_size(Chunk));
+
+view_cb({row, Row}, #vacc{meta_sent=true}=Acc) ->
+ % Adding another row
+ Chunk = [prepend_val(Acc), row_to_json(Row)],
+ maybe_flush_response(Acc#vacc{row_sent=true}, Chunk, iolist_size(Chunk)).
+
+
+maybe_flush_response(#vacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+ when Size > 0 andalso (Size + Len) > Max ->
+ #vacc{buffer = Buffer, resp = Resp} = Acc,
+ {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+ {ok, Acc#vacc{prepend = ",\r\n", buffer = Data, bufsize = Len, resp = R1}};
+maybe_flush_response(Acc0, Data, Len) ->
+ #vacc{buffer = Buf, bufsize = Size} = Acc0,
+ Acc = Acc0#vacc{
+ prepend = ",\r\n",
+ buffer = [Buf | Data],
+ bufsize = Size + Len
+ },
+ {ok, Acc}.
+
+prepend_val(#vacc{prepend=Prepend}) ->
+ case Prepend of
+ undefined ->
+ "";
+ _ ->
+ Prepend
+ end.
+
+
+row_to_json(Row) ->
+ Id = couch_util:get_value(id, Row),
+ row_to_json(Id, Row).
+
+
+row_to_json(error, Row) ->
+ % Special case for _all_docs request with KEYS to
+ % match prior behavior.
+ Key = couch_util:get_value(key, Row),
+ Val = couch_util:get_value(value, Row),
+ Reason = couch_util:get_value(reason, Row),
+ ReasonProp = if Reason == undefined -> []; true ->
+ [{reason, Reason}]
+ end,
+ Obj = {[{key, Key}, {error, Val}] ++ ReasonProp},
+ ?JSON_ENCODE(Obj);
+row_to_json(Id0, Row) ->
+ Id = case Id0 of
+ undefined -> [];
+ Id0 -> [{id, Id0}]
+ end,
+ Key = couch_util:get_value(key, Row, null),
+ Val = couch_util:get_value(value, Row),
+ Doc = case couch_util:get_value(doc, Row) of
+ undefined -> [];
+ Doc0 -> [{doc, Doc0}]
+ end,
+ Obj = {Id ++ [{key, Key}, {value, Val}] ++ Doc},
+ ?JSON_ENCODE(Obj).
+
+
+parse_params(#httpd{}=Req, Keys) ->
+ parse_params(chttpd:qs(Req), Keys);
+parse_params(Props, Keys) ->
+ Args = #mrargs{},
+ parse_params(Props, Keys, Args).
+
+
+parse_params(Props, Keys, Args) ->
+ parse_params(Props, Keys, Args, []).
+
+parse_params(Props, Keys, #mrargs{}=Args0, Options) ->
+ IsDecoded = lists:member(decoded, Options),
+ Args1 = case lists:member(keep_group_level, Options) of
+ true ->
+ Args0;
+ _ ->
+ % group_level set to undefined to detect if explicitly set by user
+ Args0#mrargs{keys=Keys, group=undefined, group_level=undefined}
+ end,
+ lists:foldl(fun({K, V}, Acc) ->
+ parse_param(K, V, Acc, IsDecoded)
+ end, Args1, Props).
+
+
+parse_body_and_query(#httpd{method='POST'} = Req, Keys) ->
+ Props = chttpd:json_body_obj(Req),
+ parse_body_and_query(Req, Props, Keys);
+
+parse_body_and_query(Req, Keys) ->
+ parse_params(chttpd:qs(Req), Keys, #mrargs{keys=Keys, group=undefined,
+ group_level=undefined}, [keep_group_level]).
+
+parse_body_and_query(Req, {Props}, Keys) ->
+ Args = #mrargs{keys=Keys, group=undefined, group_level=undefined},
+ BodyArgs = parse_params(Props, Keys, Args, [decoded]),
+ parse_params(chttpd:qs(Req), Keys, BodyArgs, [keep_group_level]).
+
+parse_param(Key, Val, Args, IsDecoded) when is_binary(Key) ->
+ parse_param(binary_to_list(Key), Val, Args, IsDecoded);
+parse_param(Key, Val, Args, IsDecoded) ->
+ case Key of
+ "" ->
+ Args;
+ "reduce" ->
+ Args#mrargs{reduce=parse_boolean(Val)};
+ "key" when IsDecoded ->
+ Args#mrargs{start_key=Val, end_key=Val};
+ "key" ->
+ JsonKey = ?JSON_DECODE(Val),
+ Args#mrargs{start_key=JsonKey, end_key=JsonKey};
+ "keys" when IsDecoded ->
+ Args#mrargs{keys=Val};
+ "keys" ->
+ Args#mrargs{keys=?JSON_DECODE(Val)};
+ "startkey" when IsDecoded ->
+ Args#mrargs{start_key=Val};
+ "start_key" when IsDecoded ->
+ Args#mrargs{start_key=Val};
+ "startkey" ->
+ Args#mrargs{start_key=?JSON_DECODE(Val)};
+ "start_key" ->
+ Args#mrargs{start_key=?JSON_DECODE(Val)};
+ "startkey_docid" ->
+ Args#mrargs{start_key_docid=couch_util:to_binary(Val)};
+ "start_key_doc_id" ->
+ Args#mrargs{start_key_docid=couch_util:to_binary(Val)};
+ "endkey" when IsDecoded ->
+ Args#mrargs{end_key=Val};
+ "end_key" when IsDecoded ->
+ Args#mrargs{end_key=Val};
+ "endkey" ->
+ Args#mrargs{end_key=?JSON_DECODE(Val)};
+ "end_key" ->
+ Args#mrargs{end_key=?JSON_DECODE(Val)};
+ "endkey_docid" ->
+ Args#mrargs{end_key_docid=couch_util:to_binary(Val)};
+ "end_key_doc_id" ->
+ Args#mrargs{end_key_docid=couch_util:to_binary(Val)};
+ "limit" ->
+ Args#mrargs{limit=parse_pos_int(Val)};
+ "stale" when Val == "ok" orelse Val == <<"ok">> ->
+ Args#mrargs{stable=true, update=false};
+ "stale" when Val == "update_after" orelse Val == <<"update_after">> ->
+ Args#mrargs{stable=true, update=lazy};
+ "stale" ->
+ throw({query_parse_error, <<"Invalid value for `stale`.">>});
+ "stable" when Val == "true" orelse Val == <<"true">> ->
+ Args#mrargs{stable=true};
+ "stable" when Val == "false" orelse Val == <<"false">> ->
+ Args#mrargs{stable=false};
+ "stable" ->
+ throw({query_parse_error, <<"Invalid value for `stable`.">>});
+ "update" when Val == "true" orelse Val == <<"true">> ->
+ Args#mrargs{update=true};
+ "update" when Val == "false" orelse Val == <<"false">> ->
+ Args#mrargs{update=false};
+ "update" when Val == "lazy" orelse Val == <<"lazy">> ->
+ Args#mrargs{update=lazy};
+ "update" ->
+ throw({query_parse_error, <<"Invalid value for `update`.">>});
+ "descending" ->
+ case parse_boolean(Val) of
+ true -> Args#mrargs{direction=rev};
+ _ -> Args#mrargs{direction=fwd}
+ end;
+ "skip" ->
+ Args#mrargs{skip=parse_pos_int(Val)};
+ "group" ->
+ Args#mrargs{group=parse_boolean(Val)};
+ "group_level" ->
+ Args#mrargs{group_level=parse_pos_int(Val)};
+ "inclusive_end" ->
+ Args#mrargs{inclusive_end=parse_boolean(Val)};
+ "include_docs" ->
+ Args#mrargs{include_docs=parse_boolean(Val)};
+ "attachments" ->
+ case parse_boolean(Val) of
+ true ->
+ Opts = Args#mrargs.doc_options,
+ Args#mrargs{doc_options=[attachments|Opts]};
+ false ->
+ Args
+ end;
+ "att_encoding_info" ->
+ case parse_boolean(Val) of
+ true ->
+ Opts = Args#mrargs.doc_options,
+ Args#mrargs{doc_options=[att_encoding_info|Opts]};
+ false ->
+ Args
+ end;
+ "update_seq" ->
+ Args#mrargs{update_seq=parse_boolean(Val)};
+ "conflicts" ->
+ Args#mrargs{conflicts=parse_boolean(Val)};
+ "callback" ->
+ Args#mrargs{callback=couch_util:to_binary(Val)};
+ "sorted" ->
+ Args#mrargs{sorted=parse_boolean(Val)};
+ "partition" ->
+ Partition = couch_util:to_binary(Val),
+ couch_partition:validate_partition(Partition),
+ couch_mrview_util:set_extra(Args, partition, Partition);
+ _ ->
+ BKey = couch_util:to_binary(Key),
+ BVal = couch_util:to_binary(Val),
+ Args#mrargs{extra=[{BKey, BVal} | Args#mrargs.extra]}
+ end.
+
+
+parse_boolean(true) ->
+ true;
+parse_boolean(false) ->
+ false;
+
+parse_boolean(Val) when is_binary(Val) ->
+ parse_boolean(?b2l(Val));
+
+parse_boolean(Val) ->
+ case string:to_lower(Val) of
+ "true" -> true;
+ "false" -> false;
+ _ ->
+ Msg = io_lib:format("Invalid boolean parameter: ~p", [Val]),
+ throw({query_parse_error, ?l2b(Msg)})
+ end.
+
+parse_int(Val) when is_integer(Val) ->
+ Val;
+parse_int(Val) ->
+ case (catch list_to_integer(Val)) of
+ IntVal when is_integer(IntVal) ->
+ IntVal;
+ _ ->
+ Msg = io_lib:format("Invalid value for integer: ~p", [Val]),
+ throw({query_parse_error, ?l2b(Msg)})
+ end.
+
+parse_pos_int(Val) ->
+ case parse_int(Val) of
+ IntVal when IntVal >= 0 ->
+ IntVal;
+ _ ->
+ Fmt = "Invalid value for positive integer: ~p",
+ Msg = io_lib:format(Fmt, [Val]),
+ throw({query_parse_error, ?l2b(Msg)})
+ end.
+
+
+check_view_etag(Sig, Acc0, Req) ->
+ ETag = chttpd:make_etag(Sig),
+ case chttpd:etag_match(Req, ETag) of
+ true -> throw({etag_match, ETag});
+ false -> {ok, Acc0#vacc{etag=ETag}}
+ end.
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl.orig b/src/couch_mrview/src/couch_mrview_updater.erl.orig
new file mode 100644
index 000000000..7d6823e6a
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_updater.erl.orig
@@ -0,0 +1,380 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_updater).
+
+-export([start_update/4, purge/4, process_doc/3, finish_update/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+-define(REM_VAL, removed).
+
+start_update(Partial, State, NumChanges, NumChangesDone) ->
+ MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000),
+ MaxItems = config:get_integer("view_updater", "queue_item_cap", 500),
+ QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}],
+ {ok, DocQueue} = couch_work_queue:new(QueueOpts),
+ {ok, WriteQueue} = couch_work_queue:new(QueueOpts),
+ InitState = State#mrst{
+ first_build=State#mrst.update_seq==0,
+ partial_resp_pid=Partial,
+ doc_acc=[],
+ doc_queue=DocQueue,
+ write_queue=WriteQueue
+ },
+
+ Self = self(),
+
+ MapFun = fun() ->
+ erlang:put(io_priority,
+ {view_update, State#mrst.db_name, State#mrst.idx_name}),
+ Progress = case NumChanges of
+ 0 -> 0;
+ _ -> (NumChangesDone * 100) div NumChanges
+ end,
+ couch_task_status:add_task([
+ {indexer_pid, ?l2b(pid_to_list(Partial))},
+ {type, indexer},
+ {database, State#mrst.db_name},
+ {design_document, State#mrst.idx_name},
+ {progress, Progress},
+ {changes_done, NumChangesDone},
+ {total_changes, NumChanges}
+ ]),
+ couch_task_status:set_update_frequency(500),
+ map_docs(Self, InitState)
+ end,
+ WriteFun = fun() ->
+ erlang:put(io_priority,
+ {view_update, State#mrst.db_name, State#mrst.idx_name}),
+ write_results(Self, InitState)
+ end,
+ spawn_link(MapFun),
+ spawn_link(WriteFun),
+
+ {ok, InitState}.
+
+
+purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
+ #mrst{
+ id_btree=IdBtree,
+ views=Views,
+ partitioned=Partitioned
+ } = State,
+
+ Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
+ {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
+
+ MakeDictFun = fun
+ ({ok, {DocId, ViewNumRowKeys}}, DictAcc) ->
+ FoldFun = fun
+ ({ViewNum, {Key, Seq, _Op}}, DictAcc2) ->
+ dict:append(ViewNum, {Key, Seq, DocId}, DictAcc2);
+ ({ViewNum, RowKey0}, DictAcc2) ->
+ RowKey = if not Partitioned -> RowKey0; true ->
+ [{RK, _}] = inject_partition([{RowKey0, DocId}]),
+ RK
+ end,
+ dict:append(ViewNum, {RowKey, DocId}, DictAcc2)
+ end,
+ lists:foldl(FoldFun, DictAcc, ViewNumRowKeys);
+ ({not_found, _}, DictAcc) ->
+ DictAcc
+ end,
+ KeysToRemove = lists:foldl(MakeDictFun, dict:new(), Lookups),
+
+ RemKeysFun = fun(#mrview{id_num=ViewId}=View) ->
+ ToRem = couch_util:dict_find(ViewId, KeysToRemove, []),
+ {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, [], ToRem),
+ NewPurgeSeq = case VBtree2 =/= View#mrview.btree of
+ true -> PurgeSeq;
+ _ -> View#mrview.purge_seq
+ end,
+ View#mrview{btree=VBtree2, purge_seq=NewPurgeSeq}
+ end,
+
+ Views2 = lists:map(RemKeysFun, Views),
+ {ok, State#mrst{
+ id_btree=IdBtree2,
+ views=Views2,
+ purge_seq=PurgeSeq
+ }}.
+
+
+process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 ->
+ couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc)),
+ process_doc(Doc, Seq, State#mrst{doc_acc=[]});
+process_doc(nil, Seq, #mrst{doc_acc=Acc}=State) ->
+ {ok, State#mrst{doc_acc=[{nil, Seq, nil} | Acc]}};
+% process_doc(#doc{id=Id, deleted=true}, Seq, #mrst{doc_acc=Acc}=State) ->
+% {ok, State#mrst{doc_acc=[{Id, Seq, deleted} | Acc]}};
+process_doc(#doc{id=Id}=Doc, Seq, #mrst{doc_acc=Acc}=State) ->
+ {ok, State#mrst{doc_acc=[{Id, Seq, Doc} | Acc]}}.
+
+
+finish_update(#mrst{doc_acc=Acc}=State) ->
+ if Acc /= [] ->
+ couch_work_queue:queue(State#mrst.doc_queue, Acc);
+ true -> ok
+ end,
+ couch_work_queue:close(State#mrst.doc_queue),
+ receive
+ {new_state, NewState} ->
+ {ok, NewState#mrst{
+ first_build=undefined,
+ partial_resp_pid=undefined,
+ doc_acc=undefined,
+ doc_queue=undefined,
+ write_queue=undefined,
+ qserver=nil
+ }}
+ end.
+
+make_deleted_body({Props}, Meta, Seq) ->
+ BodySp = couch_util:get_value(body_sp, Meta),
+ Result = [{<<"_seq">>, Seq}, {<<"_body_sp">>, BodySp}],
+ case couch_util:get_value(<<"_access">>, Props) of
+ undefined -> Result;
+ Access -> [{<<"_access">>, Access} | Result]
+ end.
+
+map_docs(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State0) ->
+ erlang:put(io_priority, {view_update, DbName, IdxName}),
+ case couch_work_queue:dequeue(State0#mrst.doc_queue) of
+ closed ->
+ couch_query_servers:stop_doc_map(State0#mrst.qserver),
+ couch_work_queue:close(State0#mrst.write_queue);
+ {ok, Dequeued} ->
+ % Run all the non deleted docs through the view engine and
+ % then pass the results on to the writer process.
+ State1 = case State0#mrst.qserver of
+ nil -> start_query_server(State0);
+ _ -> State0
+ end,
+ QServer = State1#mrst.qserver,
+ DocFun = fun
+ ({nil, Seq, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
+ ({Id, Seq, deleted}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
+ ({Id, Seq, Doc}, {SeqAcc, Results}) ->
+ couch_stats:increment_counter([couchdb, mrview, map_doc]),
+ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
+ {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
+ end,
+ FoldFun = fun(Docs, Acc) ->
+ update_task(length(Docs)),
+ lists:foldl(DocFun, Acc, Docs)
+ end,
+ Results = lists:foldl(FoldFun, {0, []}, Dequeued),
+ couch_work_queue:queue(State1#mrst.write_queue, Results),
+ map_docs(Parent, State1)
+ end.
+
+
+write_results(Parent, #mrst{} = State) ->
+ case accumulate_writes(State, State#mrst.write_queue, nil) of
+ stop ->
+ Parent ! {new_state, State};
+ {Go, {Seq, ViewKVs, DocIdKeys}} ->
+ NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys),
+ if Go == stop ->
+ Parent ! {new_state, NewState};
+ true ->
+ send_partial(NewState#mrst.partial_resp_pid, NewState),
+ write_results(Parent, NewState)
+ end
+ end.
+
+
+start_query_server(State) ->
+ #mrst{
+ language=Language,
+ lib=Lib,
+ views=Views
+ } = State,
+ Defs = [View#mrview.def || View <- Views],
+ {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
+ State#mrst{qserver=QServer}.
+
+
+accumulate_writes(State, W, Acc0) ->
+ {Seq, ViewKVs, DocIdKVs} = case Acc0 of
+ nil -> {0, [{V#mrview.id_num, []} || V <- State#mrst.views], []};
+ _ -> Acc0
+ end,
+ case couch_work_queue:dequeue(W) of
+ closed when Seq == 0 ->
+ stop;
+ closed ->
+ {stop, {Seq, ViewKVs, DocIdKVs}};
+ {ok, Info} ->
+ {_, _, NewIds} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs),
+ case accumulate_more(length(NewIds), Acc) of
+ true -> accumulate_writes(State, W, Acc);
+ false -> {ok, Acc}
+ end
+ end.
+
+
+accumulate_more(NumDocIds, Acc) ->
+ % check if we have enough items now
+ MinItems = config:get("view_updater", "min_writer_items", "100"),
+ MinSize = config:get("view_updater", "min_writer_size", "16777216"),
+ CurrMem = ?term_size(Acc),
+ NumDocIds < list_to_integer(MinItems)
+ andalso CurrMem < list_to_integer(MinSize).
+
+
+merge_results([], SeqAcc, ViewKVs, DocIdKeys) ->
+ {SeqAcc, ViewKVs, DocIdKeys};
+merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys) ->
+ Fun = fun(RawResults, {VKV, DIK}) ->
+ merge_results(RawResults, VKV, DIK)
+ end,
+ {ViewKVs1, DocIdKeys1} = lists:foldl(Fun, {ViewKVs, DocIdKeys}, Results),
+ merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1).
+
+
+merge_results({DocId, []}, ViewKVs, DocIdKeys) ->
+ {ViewKVs, [{DocId, []} | DocIdKeys]};
+merge_results({DocId, RawResults}, ViewKVs, DocIdKeys) ->
+ JsonResults = couch_query_servers:raw_to_ejson(RawResults),
+ Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults],
+ case lists:flatten(Results) of
+ [] ->
+ {ViewKVs, [{DocId, []} | DocIdKeys]};
+ _ ->
+ {ViewKVs1, ViewIdKeys} = insert_results(DocId, Results, ViewKVs, [], []),
+ {ViewKVs1, [ViewIdKeys | DocIdKeys]}
+ end.
+
+
+insert_results(DocId, [], [], ViewKVs, ViewIdKeys) ->
+ {lists:reverse(ViewKVs), {DocId, ViewIdKeys}};
+insert_results(DocId, [KVs | RKVs], [{Id, VKVs} | RVKVs], VKVAcc, VIdKeys) ->
+ CombineDupesFun = fun
+ ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys}) ->
+ {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys};
+ ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys}) ->
+ {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys};
+ ({Key, Value}, {Rest, IdKeys}) ->
+ {[{Key, Value} | Rest], [{Id, Key} | IdKeys]}
+ end,
+ InitAcc = {[], VIdKeys},
+ couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)),
+ {Duped, VIdKeys0} = lists:foldl(CombineDupesFun, InitAcc,
+ lists:sort(KVs)),
+ FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs,
+ insert_results(DocId, RKVs, RVKVs, [{Id, FinalKVs} | VKVAcc], VIdKeys0).
+
+
+write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys) ->
+ #mrst{
+ id_btree=IdBtree,
+ first_build=FirstBuild,
+ partitioned=Partitioned
+ } = State,
+
+ {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
+ ToRemByView = collapse_rem_keys(ToRemove, dict:new()),
+
+ UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, KVs0}) ->
+ ToRem0 = couch_util:dict_find(ViewId, ToRemByView, []),
+ {KVs, ToRem} = case Partitioned of
+ true ->
+ KVs1 = inject_partition(KVs0),
+ ToRem1 = inject_partition(ToRem0),
+ {KVs1, ToRem1};
+ false ->
+ {KVs0, ToRem0}
+ end,
+ {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
+ NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
+ true -> UpdateSeq;
+ _ -> View#mrview.update_seq
+ end,
+
+ View2 = View#mrview{btree=VBtree2, update_seq=NewUpdateSeq},
+ maybe_notify(State, View2, KVs, ToRem),
+ View2
+ end,
+
+ State#mrst{
+ views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
+ update_seq=UpdateSeq,
+ id_btree=IdBtree2
+ }.
+
+
+inject_partition(Rows) ->
+ lists:map(fun
+ ({{Key, DocId}, Value}) ->
+ % Adding a row to the view
+ {Partition, _} = couch_partition:extract(DocId),
+ {{{p, Partition, Key}, DocId}, Value};
+ ({Key, DocId}) ->
+ % Removing a row based on values in id_tree
+ {Partition, _} = couch_partition:extract(DocId),
+ {{p, Partition, Key}, DocId}
+ end, Rows).
+
+
+update_id_btree(Btree, DocIdKeys, true) ->
+ ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
+ couch_btree:query_modify(Btree, [], ToAdd, []);
+update_id_btree(Btree, DocIdKeys, _) ->
+ ToFind = [Id || {Id, _} <- DocIdKeys],
+ ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
+ ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []],
+ couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).
+
+
+collapse_rem_keys([], Acc) ->
+ Acc;
+collapse_rem_keys([{ok, {DocId, ViewIdKeys}} | Rest], Acc) ->
+ NewAcc = lists:foldl(fun({ViewId, Key}, Acc2) ->
+ dict:append(ViewId, {Key, DocId}, Acc2)
+ end, Acc, ViewIdKeys),
+ collapse_rem_keys(Rest, NewAcc);
+collapse_rem_keys([{not_found, _} | Rest], Acc) ->
+ collapse_rem_keys(Rest, Acc).
+
+
+send_partial(Pid, State) when is_pid(Pid) ->
+ gen_server:cast(Pid, {new_state, State});
+send_partial(_, _) ->
+ ok.
+
+
+update_task(NumChanges) ->
+ [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+ Changes2 = Changes + NumChanges,
+ Progress = case Total of
+ 0 ->
+ % updater restart after compaction finishes
+ 0;
+ _ ->
+ (Changes2 * 100) div Total
+ end,
+ couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).
+
+
+maybe_notify(State, View, KVs, ToRem) ->
+ Updated = fun() ->
+ [Key || {{Key, _}, _} <- KVs]
+ end,
+ Removed = fun() ->
+ [Key || {Key, _DocId} <- ToRem]
+ end,
+ couch_index_plugin:index_update(State, View, Updated, Removed).
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl.rej b/src/couch_mrview/src/couch_mrview_updater.erl.rej
new file mode 100644
index 000000000..81a2ce15f
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_updater.erl.rej
@@ -0,0 +1,52 @@
+***************
+*** 192,202 ****
+ DocFun = fun
+ ({nil, Seq, _, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
+- ({Id, Seq, Rev, deleted}, {SeqAcc, Results}) ->
+- {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]};
+ ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) ->
+ couch_stats:increment_counter([couchdb, mrview, map_doc]),
+- {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
+ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]}
+ end,
+
+--- 199,236 ----
+ DocFun = fun
+ ({nil, Seq, _, _}, {SeqAcc, Results}) ->
+ {erlang:max(Seq, SeqAcc), Results};
++ ({Id, Seq, Rev, #doc{deleted=true, body=Body, meta=Meta}}, {SeqAcc, Results}) ->
++ % _access needs deleted docs
++ case IdxName of
++ <<"_design/_access">> ->
++ % splice in seq
++ {Start, Rev1} = Rev,
++ Doc = #doc{
++ id = Id,
++ revs = {Start, [Rev1]},
++ body = {make_deleted_body(Body, Meta, Seq)}, %% todo: only keep _access and add _seq
++ deleted = true
++ },
++ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
++ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]};
++ _Else ->
++ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, []} | Results]}
++ end;
+ ({Id, Seq, Rev, Doc}, {SeqAcc, Results}) ->
+ couch_stats:increment_counter([couchdb, mrview, map_doc]),
++ % couch_log:info("~nIdxName: ~p, Doc: ~p~n~n", [IdxName, Doc]),
++ Doc0 = case IdxName of
++ <<"_design/_access">> ->
++ % splice in seq
++ {Props} = Doc#doc.body,
++ BodySp = couch_util:get_value(body_sp, Doc#doc.meta),
++ Doc#doc{
++ body = {Props++[{<<"_seq">>, Seq}, {<<"_body_sp">>, BodySp}]}
++ };
++ _Else ->
++ Doc
++ end,
++ {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc0),
+ {erlang:max(Seq, SeqAcc), [{Id, Seq, Rev, Res} | Results]}
+ end,
+
diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl
index be75dd5e5..2bf168073 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -409,11 +409,11 @@ validate_args(Db, DDoc, Args0) ->
validate_args(#mrst{} = State, Args0) ->
Args = validate_args(Args0),
-
ViewPartitioned = State#mrst.partitioned,
Partition = get_extra(Args, partition),
+ AllDocsAccess = get_extra(Args, all_docs_access, false),
- case {ViewPartitioned, Partition} of
+ case {ViewPartitioned and not AllDocsAccess, Partition} of
{true, undefined} ->
Msg1 = <<"`partition` parameter is mandatory "
"for queries to this view.">>,
diff --git a/src/couch_mrview/src/couch_mrview_util.erl.orig b/src/couch_mrview/src/couch_mrview_util.erl.orig
new file mode 100644
index 000000000..e971720c9
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_util.erl.orig
@@ -0,0 +1,1177 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_util).
+
+-export([get_view/4, get_view_index_pid/4]).
+-export([get_local_purge_doc_id/1, get_value_from_options/2]).
+-export([verify_view_filename/1, get_signature_from_filename/1]).
+-export([ddoc_to_mrst/2, init_state/4, reset_index/3]).
+-export([make_header/1]).
+-export([index_file/2, compaction_file/2, open_file/1]).
+-export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
+-export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
+-export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
+-export([fold/4, fold_reduce/4]).
+-export([temp_view_to_ddoc/1]).
+-export([calculate_external_size/1]).
+-export([calculate_active_size/1]).
+-export([validate_all_docs_args/2, validate_args/1, validate_args/3]).
+-export([maybe_load_doc/3, maybe_load_doc/4]).
+-export([maybe_update_index_file/1]).
+-export([extract_view/4, extract_view_reduce/1]).
+-export([get_view_keys/1, get_view_queries/1]).
+-export([set_view_type/3]).
+-export([set_extra/3, get_extra/2, get_extra/3]).
+
+-define(MOD, couch_mrview_index).
+-define(GET_VIEW_RETRY_COUNT, 1).
+-define(GET_VIEW_RETRY_DELAY, 50).
+-define(LOWEST_KEY, null).
+-define(HIGHEST_KEY, {<<255, 255, 255, 255>>}).
+-define(LOWEST(A, B), (if A < B -> A; true -> B end)).
+-define(HIGHEST(A, B), (if A > B -> A; true -> B end)).
+
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
+
+
+get_local_purge_doc_id(Sig) ->
+ ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig).
+
+
+get_value_from_options(Key, Options) ->
+ case couch_util:get_value(Key, Options) of
+ undefined ->
+ Reason = <<"'", Key/binary, "' must exists in options.">>,
+ throw({bad_request, Reason});
+ Value -> Value
+ end.
+
+
+verify_view_filename(FileName) ->
+ FilePathList = filename:split(FileName),
+ PureFN = lists:last(FilePathList),
+ case filename:extension(PureFN) of
+ ".view" ->
+ Sig = filename:basename(PureFN),
+ case [Ch || Ch <- Sig, not (((Ch >= $0) and (Ch =< $9))
+ orelse ((Ch >= $a) and (Ch =< $f))
+ orelse ((Ch >= $A) and (Ch =< $F)))] == [] of
+ true -> true;
+ false -> false
+ end;
+ _ -> false
+ end.
+
+get_signature_from_filename(FileName) ->
+ FilePathList = filename:split(FileName),
+ PureFN = lists:last(FilePathList),
+ filename:basename(PureFN, ".view").
+
+get_view(Db, DDoc, ViewName, Args0) ->
+ case get_view_index_state(Db, DDoc, ViewName, Args0) of
+ {ok, State, Args2} ->
+ Ref = erlang:monitor(process, State#mrst.fd),
+ #mrst{language=Lang, views=Views} = State,
+ {Type, View, Args3} = extract_view(Lang, Args2, ViewName, Views),
+ check_range(Args3, view_cmp(View)),
+ Sig = view_sig(Db, State, View, Args3),
+ {ok, {Type, View, Ref}, Sig, Args3};
+ ddoc_updated ->
+ ddoc_updated
+ end.
+
+
+get_view_index_pid(Db, DDoc, ViewName, Args0) ->
+ ArgCheck = fun(InitState) ->
+ Args1 = set_view_type(Args0, ViewName, InitState#mrst.views),
+ {ok, validate_args(InitState, Args1)}
+ end,
+ couch_index_server:get_index(?MOD, Db, DDoc, ArgCheck).
+
+
+get_view_index_state(Db, DDoc, ViewName, Args0) ->
+ get_view_index_state(Db, DDoc, ViewName, Args0, ?GET_VIEW_RETRY_COUNT).
+
+get_view_index_state(_, DDoc, _, _, RetryCount) when RetryCount < 0 ->
+ couch_log:warning("DDoc '~s' recreated too frequently", [DDoc#doc.id]),
+ throw({get_view_state, exceeded_retry_count});
+get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount) ->
+ try
+ {ok, Pid, Args} = get_view_index_pid(Db, DDoc, ViewName, Args0),
+ UpdateSeq = couch_util:with_db(Db, fun(WDb) ->
+ couch_db:get_update_seq(WDb)
+ end),
+ State = case Args#mrargs.update of
+ lazy ->
+ spawn(fun() ->
+ catch couch_index:get_state(Pid, UpdateSeq)
+ end),
+ couch_index:get_state(Pid, 0);
+ false ->
+ couch_index:get_state(Pid, 0);
+ _ ->
+ couch_index:get_state(Pid, UpdateSeq)
+ end,
+ case State of
+ {ok, State0} -> {ok, State0, Args};
+ ddoc_updated -> ddoc_updated;
+ Else -> throw(Else)
+ end
+ catch
+ exit:{Reason, _} when Reason == noproc; Reason == normal ->
+ timer:sleep(?GET_VIEW_RETRY_DELAY),
+ get_view_index_state(Db, DDoc, ViewName, Args0, RetryCount - 1);
+ error:{badmatch, Error} ->
+ throw(Error);
+ Error ->
+ throw(Error)
+ end.
+
+
+ddoc_to_mrst(DbName, #doc{id=Id, body={Fields}}) ->
+ MakeDict = fun({Name, {MRFuns}}, DictBySrcAcc) ->
+ case couch_util:get_value(<<"map">>, MRFuns) of
+ MapSrc when MapSrc /= undefined ->
+ RedSrc = couch_util:get_value(<<"reduce">>, MRFuns, null),
+ {ViewOpts} = couch_util:get_value(<<"options">>, MRFuns, {[]}),
+ View = case dict:find({MapSrc, ViewOpts}, DictBySrcAcc) of
+ {ok, View0} -> View0;
+ error -> #mrview{def=MapSrc, options=ViewOpts}
+ end,
+ {MapNames, RedSrcs} = case RedSrc of
+ null ->
+ MNames = [Name | View#mrview.map_names],
+ {MNames, View#mrview.reduce_funs};
+ _ ->
+ RedFuns = [{Name, RedSrc} | View#mrview.reduce_funs],
+ {View#mrview.map_names, RedFuns}
+ end,
+ View2 = View#mrview{map_names=MapNames, reduce_funs=RedSrcs},
+ dict:store({MapSrc, ViewOpts}, View2, DictBySrcAcc);
+ undefined ->
+ DictBySrcAcc
+ end;
+ ({Name, Else}, DictBySrcAcc) ->
+ couch_log:error("design_doc_to_view_group ~s views ~p",
+ [Name, Else]),
+ DictBySrcAcc
+ end,
+ {DesignOpts} = proplists:get_value(<<"options">>, Fields, {[]}),
+ Partitioned = proplists:get_value(<<"partitioned">>, DesignOpts, false),
+
+ {RawViews} = couch_util:get_value(<<"views">>, Fields, {[]}),
+ BySrc = lists:foldl(MakeDict, dict:new(), RawViews),
+
+ NumViews = fun({_, View}, N) ->
+ {View#mrview{id_num=N}, N+1}
+ end,
+ {Views, _} = lists:mapfoldl(NumViews, 0, lists:sort(dict:to_list(BySrc))),
+
+ Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
+ Lib = couch_util:get_value(<<"lib">>, RawViews, {[]}),
+
+ IdxState = #mrst{
+ db_name=DbName,
+ idx_name=Id,
+ lib=Lib,
+ views=Views,
+ language=Language,
+ design_opts=DesignOpts,
+ partitioned=Partitioned
+ },
+ SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
+ {ok, IdxState#mrst{sig=couch_hash:md5_hash(term_to_binary(SigInfo))}}.
+
+
+set_view_type(_Args, _ViewName, []) ->
+ throw({not_found, missing_named_view});
+set_view_type(Args, ViewName, [View | Rest]) ->
+ RedNames = [N || {N, _} <- View#mrview.reduce_funs],
+ case lists:member(ViewName, RedNames) of
+ true ->
+ case Args#mrargs.reduce of
+ false -> Args#mrargs{view_type=map};
+ _ -> Args#mrargs{view_type=red}
+ end;
+ false ->
+ case lists:member(ViewName, View#mrview.map_names) of
+ true -> Args#mrargs{view_type=map};
+ false -> set_view_type(Args, ViewName, Rest)
+ end
+ end.
+
+
+set_extra(#mrargs{} = Args, Key, Value) ->
+ Extra0 = Args#mrargs.extra,
+ Extra1 = lists:ukeysort(1, [{Key, Value} | Extra0]),
+ Args#mrargs{extra = Extra1}.
+
+
+get_extra(#mrargs{} = Args, Key) ->
+ couch_util:get_value(Key, Args#mrargs.extra).
+
+get_extra(#mrargs{} = Args, Key, Default) ->
+ couch_util:get_value(Key, Args#mrargs.extra, Default).
+
+
+extract_view(_Lang, _Args, _ViewName, []) ->
+ throw({not_found, missing_named_view});
+extract_view(Lang, #mrargs{view_type=map}=Args, Name, [View | Rest]) ->
+ Names = View#mrview.map_names ++ [N || {N, _} <- View#mrview.reduce_funs],
+ case lists:member(Name, Names) of
+ true -> {map, View, Args};
+ _ -> extract_view(Lang, Args, Name, Rest)
+ end;
+extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) ->
+ RedNames = [N || {N, _} <- View#mrview.reduce_funs],
+ case lists:member(Name, RedNames) of
+ true -> {red, {index_of(Name, RedNames), Lang, View}, Args};
+ false -> extract_view(Lang, Args, Name, Rest)
+ end.
+
+
+view_sig(Db, State, View, #mrargs{include_docs=true}=Args) ->
+ BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}),
+ UpdateSeq = couch_db:get_update_seq(Db),
+ PurgeSeq = couch_db:get_purge_seq(Db),
+ Term = view_sig_term(BaseSig, UpdateSeq, PurgeSeq),
+ couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Term)));
+view_sig(Db, State, {_Nth, _Lang, View}, Args) ->
+ view_sig(Db, State, View, Args);
+view_sig(_Db, State, View, Args0) ->
+ Sig = State#mrst.sig,
+ UpdateSeq = View#mrview.update_seq,
+ PurgeSeq = View#mrview.purge_seq,
+ Args = Args0#mrargs{
+ preflight_fun=undefined,
+ extra=[]
+ },
+ Term = view_sig_term(Sig, UpdateSeq, PurgeSeq, Args),
+ couch_index_util:hexsig(couch_hash:md5_hash(term_to_binary(Term))).
+
+view_sig_term(BaseSig, UpdateSeq, PurgeSeq) ->
+ {BaseSig, UpdateSeq, PurgeSeq}.
+
+view_sig_term(BaseSig, UpdateSeq, PurgeSeq, Args) ->
+ {BaseSig, UpdateSeq, PurgeSeq, Args}.
+
+
+init_state(Db, Fd, #mrst{views=Views}=State, nil) ->
+ PurgeSeq = couch_db:get_purge_seq(Db),
+ Header = #mrheader{
+ seq=0,
+ purge_seq=PurgeSeq,
+ id_btree_state=nil,
+ view_states=[make_view_state(#mrview{}) || _ <- Views]
+ },
+ init_state(Db, Fd, State, Header);
+init_state(Db, Fd, State, Header) ->
+ #mrst{
+ language=Lang,
+ views=Views
+ } = State,
+ #mrheader{
+ seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree_state=IdBtreeState,
+ view_states=ViewStates
+ } = maybe_update_header(Header),
+
+ IdBtOpts = [
+ {compression, couch_compress:get_compression_method()}
+ ],
+ {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts),
+
+ OpenViewFun = fun(St, View) -> open_view(Db, Fd, Lang, St, View) end,
+ Views2 = lists:zipwith(OpenViewFun, ViewStates, Views),
+
+ State#mrst{
+ fd=Fd,
+ fd_monitor=erlang:monitor(process, Fd),
+ update_seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree=IdBtree,
+ views=Views2
+ }.
+
+open_view(_Db, Fd, Lang, ViewState, View) ->
+ ReduceFun = make_reduce_fun(Lang, View#mrview.reduce_funs),
+ LessFun = maybe_define_less_fun(View),
+ Compression = couch_compress:get_compression_method(),
+ BTState = get_key_btree_state(ViewState),
+ ViewBtOpts = [
+ {less, LessFun},
+ {reduce, ReduceFun},
+ {compression, Compression}
+ ],
+ {ok, Btree} = couch_btree:open(BTState, Fd, ViewBtOpts),
+
+ View#mrview{btree=Btree,
+ update_seq=get_update_seq(ViewState),
+ purge_seq=get_purge_seq(ViewState)}.
+
+
+temp_view_to_ddoc({Props}) ->
+ Language = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ Options = couch_util:get_value(<<"options">>, Props, {[]}),
+ View0 = [{<<"map">>, couch_util:get_value(<<"map">>, Props)}],
+ View1 = View0 ++ case couch_util:get_value(<<"reduce">>, Props) of
+ RedSrc when is_binary(RedSrc) -> [{<<"reduce">>, RedSrc}];
+ _ -> []
+ end,
+ DDoc = {[
+ {<<"_id">>, couch_uuids:random()},
+ {<<"language">>, Language},
+ {<<"options">>, Options},
+ {<<"views">>, {[
+ {<<"temp">>, {View1}}
+ ]}}
+ ]},
+ couch_doc:from_json_obj(DDoc).
+
+
+get_row_count(#mrview{btree=Bt}) ->
+ Count = case couch_btree:full_reduce(Bt) of
+ {ok, {Count0, _Reds, _}} -> Count0;
+ {ok, {Count0, _Reds}} -> Count0
+ end,
+ {ok, Count}.
+
+
+all_docs_reduce_to_count(Reductions) ->
+ Reduce = fun couch_bt_engine:id_tree_reduce/2,
+ {Count, _, _} = couch_btree:final_reduce(Reduce, Reductions),
+ Count.
+
+reduce_to_count(nil) ->
+ 0;
+reduce_to_count(Reductions) ->
+ CountReduceFun = fun count_reduce/2,
+ FinalReduction = couch_btree:final_reduce(CountReduceFun, Reductions),
+ get_count(FinalReduction).
+
+
+fold(#mrview{btree=Bt}, Fun, Acc, Opts) ->
+ WrapperFun = fun(KV, Reds, Acc2) ->
+ fold_fun(Fun, expand_dups([KV], []), Reds, Acc2)
+ end,
+ {ok, _LastRed, _Acc} = couch_btree:fold(Bt, WrapperFun, Acc, Opts).
+
+fold_fun(_Fun, [], _, Acc) ->
+ {ok, Acc};
+fold_fun(Fun, [KV|Rest], {KVReds, Reds}, Acc) ->
+ case Fun(KV, {KVReds, Reds}, Acc) of
+ {ok, Acc2} ->
+ fold_fun(Fun, Rest, {[KV|KVReds], Reds}, Acc2);
+ {stop, Acc2} ->
+ {stop, Acc2}
+ end.
+
+
+fold_reduce({NthRed, Lang, View}, Fun, Acc, Options) ->
+ #mrview{
+ btree=Bt,
+ reduce_funs=RedFuns
+ } = View,
+
+ ReduceFun = make_user_reds_reduce_fun(Lang, RedFuns, NthRed),
+
+ WrapperFun = fun({GroupedKey, _}, PartialReds, Acc0) ->
+ FinalReduction = couch_btree:final_reduce(ReduceFun, PartialReds),
+ UserReductions = get_user_reds(FinalReduction),
+ Fun(GroupedKey, lists:nth(NthRed, UserReductions), Acc0)
+ end,
+
+ couch_btree:fold_reduce(Bt, WrapperFun, Acc, Options).
+
+
+validate_args(Db, DDoc, Args0) ->
+ {ok, State} = couch_mrview_index:init(Db, DDoc),
+ Args1 = apply_limit(State#mrst.partitioned, Args0),
+ validate_args(State, Args1).
+
+
+validate_args(#mrst{} = State, Args0) ->
+ Args = validate_args(Args0),
+
+ ViewPartitioned = State#mrst.partitioned,
+ Partition = get_extra(Args, partition),
+
+ case {ViewPartitioned, Partition} of
+ {true, undefined} ->
+ Msg1 = <<"`partition` parameter is mandatory "
+ "for queries to this view.">>,
+ mrverror(Msg1);
+ {true, _} ->
+ apply_partition(Args, Partition);
+ {false, undefined} ->
+ Args;
+ {false, Value} when is_binary(Value) ->
+ Msg2 = <<"`partition` parameter is not "
+ "supported in this design doc">>,
+ mrverror(Msg2)
+ end.
+
+
+apply_limit(ViewPartitioned, Args) ->
+ LimitType = case ViewPartitioned of
+ true -> "partition_query_limit";
+ false -> "query_limit"
+ end,
+
+ MaxLimit = config:get_integer("query_server_config",
+ LimitType, ?MAX_VIEW_LIMIT),
+
+ % Set the highest limit possible if a user has not
+ % specified a limit
+ Args1 = case Args#mrargs.limit == ?MAX_VIEW_LIMIT of
+ true -> Args#mrargs{limit = MaxLimit};
+ false -> Args
+ end,
+
+ if Args1#mrargs.limit =< MaxLimit -> Args1; true ->
+ Fmt = "Limit is too large, must not exceed ~p",
+ mrverror(io_lib:format(Fmt, [MaxLimit]))
+ end.
+
+
+validate_all_docs_args(Db, Args0) ->
+ Args = validate_args(Args0),
+
+ DbPartitioned = couch_db:is_partitioned(Db),
+ Partition = get_extra(Args, partition),
+
+ case {DbPartitioned, Partition} of
+ {false, <<_/binary>>} ->
+ mrverror(<<"`partition` parameter is not supported on this db">>);
+ {_, <<_/binary>>} ->
+ Args1 = apply_limit(true, Args),
+ apply_all_docs_partition(Args1, Partition);
+ _ ->
+ Args
+ end.
+
+
+validate_args(Args) ->
+ GroupLevel = determine_group_level(Args),
+ Reduce = Args#mrargs.reduce,
+ case Reduce == undefined orelse is_boolean(Reduce) of
+ true -> ok;
+ _ -> mrverror(<<"Invalid `reduce` value.">>)
+ end,
+
+ case {Args#mrargs.view_type, Reduce} of
+ {map, true} -> mrverror(<<"Reduce is invalid for map-only views.">>);
+ _ -> ok
+ end,
+
+ case {Args#mrargs.view_type, GroupLevel, Args#mrargs.keys} of
+ {red, exact, _} -> ok;
+ {red, _, KeyList} when is_list(KeyList) ->
+ Msg = <<"Multi-key fetchs for reduce views must use `group=true`">>,
+ mrverror(Msg);
+ _ -> ok
+ end,
+
+ case Args#mrargs.keys of
+ Keys when is_list(Keys) -> ok;
+ undefined -> ok;
+ _ -> mrverror(<<"`keys` must be an array of strings.">>)
+ end,
+
+ case {Args#mrargs.keys, Args#mrargs.start_key,
+ Args#mrargs.end_key} of
+ {undefined, _, _} -> ok;
+ {[], _, _} -> ok;
+ {[_|_], undefined, undefined} -> ok;
+ _ -> mrverror(<<"`keys` is incompatible with `key`"
+ ", `start_key` and `end_key`">>)
+ end,
+
+ case Args#mrargs.start_key_docid of
+ undefined -> ok;
+ SKDocId0 when is_binary(SKDocId0) -> ok;
+ _ -> mrverror(<<"`start_key_docid` must be a string.">>)
+ end,
+
+ case Args#mrargs.end_key_docid of
+ undefined -> ok;
+ EKDocId0 when is_binary(EKDocId0) -> ok;
+ _ -> mrverror(<<"`end_key_docid` must be a string.">>)
+ end,
+
+ case Args#mrargs.direction of
+ fwd -> ok;
+ rev -> ok;
+ _ -> mrverror(<<"Invalid direction.">>)
+ end,
+
+ case {Args#mrargs.limit >= 0, Args#mrargs.limit == undefined} of
+ {true, _} -> ok;
+ {_, true} -> ok;
+ _ -> mrverror(<<"`limit` must be a positive integer.">>)
+ end,
+
+ case Args#mrargs.skip < 0 of
+ true -> mrverror(<<"`skip` must be >= 0">>);
+ _ -> ok
+ end,
+
+ case {Args#mrargs.view_type, GroupLevel} of
+ {red, exact} -> ok;
+ {_, 0} -> ok;
+ {red, Int} when is_integer(Int), Int >= 0 -> ok;
+ {red, _} -> mrverror(<<"`group_level` must be >= 0">>);
+ {map, _} -> mrverror(<<"Invalid use of grouping on a map view.">>)
+ end,
+
+ case Args#mrargs.stable of
+ true -> ok;
+ false -> ok;
+ _ -> mrverror(<<"Invalid value for `stable`.">>)
+ end,
+
+ case Args#mrargs.update of
+ true -> ok;
+ false -> ok;
+ lazy -> ok;
+ _ -> mrverror(<<"Invalid value for `update`.">>)
+ end,
+
+ case is_boolean(Args#mrargs.inclusive_end) of
+ true -> ok;
+ _ -> mrverror(<<"Invalid value for `inclusive_end`.">>)
+ end,
+
+ case {Args#mrargs.view_type, Args#mrargs.include_docs} of
+ {red, true} -> mrverror(<<"`include_docs` is invalid for reduce">>);
+ {_, ID} when is_boolean(ID) -> ok;
+ _ -> mrverror(<<"Invalid value for `include_docs`">>)
+ end,
+
+ case {Args#mrargs.view_type, Args#mrargs.conflicts} of
+ {_, undefined} -> ok;
+ {map, V} when is_boolean(V) -> ok;
+ {red, undefined} -> ok;
+ {map, _} -> mrverror(<<"Invalid value for `conflicts`.">>);
+ {red, _} -> mrverror(<<"`conflicts` is invalid for reduce views.">>)
+ end,
+
+ SKDocId = case {Args#mrargs.direction, Args#mrargs.start_key_docid} of
+ {fwd, undefined} -> <<>>;
+ {rev, undefined} -> <<255>>;
+ {_, SKDocId1} -> SKDocId1
+ end,
+
+ EKDocId = case {Args#mrargs.direction, Args#mrargs.end_key_docid} of
+ {fwd, undefined} -> <<255>>;
+ {rev, undefined} -> <<>>;
+ {_, EKDocId1} -> EKDocId1
+ end,
+
+ case is_boolean(Args#mrargs.sorted) of
+ true -> ok;
+ _ -> mrverror(<<"Invalid value for `sorted`.">>)
+ end,
+
+ case get_extra(Args, partition) of
+ undefined -> ok;
+ Partition when is_binary(Partition), Partition /= <<>> -> ok;
+ _ -> mrverror(<<"Invalid value for `partition`.">>)
+ end,
+
+ Args#mrargs{
+ start_key_docid=SKDocId,
+ end_key_docid=EKDocId,
+ group_level=GroupLevel
+ }.
+
+
+determine_group_level(#mrargs{group=undefined, group_level=undefined}) ->
+ 0;
+determine_group_level(#mrargs{group=false, group_level=undefined}) ->
+ 0;
+determine_group_level(#mrargs{group=false, group_level=Level}) when Level > 0 ->
+ mrverror(<<"Can't specify group=false and group_level>0 at the same time">>);
+determine_group_level(#mrargs{group=true, group_level=undefined}) ->
+ exact;
+determine_group_level(#mrargs{group_level=GroupLevel}) ->
+ GroupLevel.
+
+apply_partition(#mrargs{keys=[{p, _, _} | _]} = Args, _Partition) ->
+ Args; % already applied
+
+apply_partition(#mrargs{keys=Keys} = Args, Partition) when Keys /= undefined ->
+ Args#mrargs{keys=[{p, Partition, K} || K <- Keys]};
+
+apply_partition(#mrargs{start_key={p, _, _}, end_key={p, _, _}} = Args, _Partition) ->
+ Args; % already applied.
+
+apply_partition(Args, Partition) ->
+ #mrargs{
+ direction = Dir,
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+
+ {DefSK, DefEK} = case Dir of
+ fwd -> {?LOWEST_KEY, ?HIGHEST_KEY};
+ rev -> {?HIGHEST_KEY, ?LOWEST_KEY}
+ end,
+
+ SK0 = if StartKey /= undefined -> StartKey; true -> DefSK end,
+ EK0 = if EndKey /= undefined -> EndKey; true -> DefEK end,
+
+ Args#mrargs{
+ start_key = {p, Partition, SK0},
+ end_key = {p, Partition, EK0}
+ }.
+
+%% all_docs is special as it's not really a view and is already
+%% effectively partitioned as the partition is a prefix of all keys.
+apply_all_docs_partition(#mrargs{} = Args, Partition) ->
+ #mrargs{
+ direction = Dir,
+ start_key = StartKey,
+ end_key = EndKey
+ } = Args,
+
+ {DefSK, DefEK} = case Dir of
+ fwd ->
+ {
+ couch_partition:start_key(Partition),
+ couch_partition:end_key(Partition)
+ };
+ rev ->
+ {
+ couch_partition:end_key(Partition),
+ couch_partition:start_key(Partition)
+ }
+ end,
+
+ SK0 = if StartKey == undefined -> DefSK; true -> StartKey end,
+ EK0 = if EndKey == undefined -> DefEK; true -> EndKey end,
+
+ {SK1, EK1} = case Dir of
+ fwd -> {?HIGHEST(DefSK, SK0), ?LOWEST(DefEK, EK0)};
+ rev -> {?LOWEST(DefSK, SK0), ?HIGHEST(DefEK, EK0)}
+ end,
+
+ Args#mrargs{
+ start_key = SK1,
+ end_key = EK1
+ }.
+
+
+check_range(#mrargs{start_key=undefined}, _Cmp) ->
+ ok;
+check_range(#mrargs{end_key=undefined}, _Cmp) ->
+ ok;
+check_range(#mrargs{start_key=K, end_key=K}, _Cmp) ->
+ ok;
+check_range(Args, Cmp) ->
+ #mrargs{
+ direction=Dir,
+ start_key=SK,
+ start_key_docid=SKD,
+ end_key=EK,
+ end_key_docid=EKD
+ } = Args,
+ case {Dir, Cmp({SK, SKD}, {EK, EKD})} of
+ {fwd, false} ->
+ throw({query_parse_error,
+ <<"No rows can match your key range, reverse your ",
+ "start_key and end_key or set descending=true">>});
+ {rev, true} ->
+ throw({query_parse_error,
+ <<"No rows can match your key range, reverse your ",
+ "start_key and end_key or set descending=false">>});
+ _ -> ok
+ end.
+
+
+view_cmp({_Nth, _Lang, View}) ->
+ view_cmp(View);
+view_cmp(View) ->
+ fun(A, B) -> couch_btree:less(View#mrview.btree, A, B) end.
+
+
+make_header(State) ->
+ #mrst{
+ update_seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree=IdBtree,
+ views=Views
+ } = State,
+
+ #mrheader{
+ seq=Seq,
+ purge_seq=PurgeSeq,
+ id_btree_state=get_btree_state(IdBtree),
+ view_states=[make_view_state(V) || V <- Views]
+ }.
+
+
+index_file(DbName, Sig) ->
+ FileName = couch_index_util:hexsig(Sig) ++ ".view",
+ couch_index_util:index_file(mrview, DbName, FileName).
+
+
+compaction_file(DbName, Sig) ->
+ FileName = couch_index_util:hexsig(Sig) ++ ".compact.view",
+ couch_index_util:index_file(mrview, DbName, FileName).
+
+
+open_file(FName) ->
+ case couch_file:open(FName, [nologifmissing]) of
+ {ok, Fd} -> {ok, Fd};
+ {error, enoent} -> couch_file:open(FName, [create]);
+ Error -> Error
+ end.
+
+
+delete_files(DbName, Sig) ->
+ delete_index_file(DbName, Sig),
+ delete_compaction_file(DbName, Sig).
+
+
+delete_index_file(DbName, Sig) ->
+ delete_file(index_file(DbName, Sig)).
+
+
+delete_compaction_file(DbName, Sig) ->
+ delete_file(compaction_file(DbName, Sig)).
+
+
+delete_file(FName) ->
+ case filelib:is_file(FName) of
+ true ->
+ RootDir = couch_index_util:root_dir(),
+ couch_file:delete(RootDir, FName);
+ _ ->
+ ok
+ end.
+
+
+reset_index(Db, Fd, #mrst{sig=Sig}=State) ->
+ ok = couch_file:truncate(Fd, 0),
+ ok = couch_file:write_header(Fd, {Sig, nil}),
+ init_state(Db, Fd, reset_state(State), nil).
+
+
+reset_state(State) ->
+ State#mrst{
+ fd=nil,
+ qserver=nil,
+ update_seq=0,
+ id_btree=nil,
+ views=[View#mrview{btree=nil} || View <- State#mrst.views]
+ }.
+
+
+all_docs_key_opts(#mrargs{extra = Extra} = Args) ->
+ all_docs_key_opts(Args, Extra).
+
+all_docs_key_opts(#mrargs{keys=undefined}=Args, Extra) ->
+ all_docs_key_opts(Args#mrargs{keys=[]}, Extra);
+all_docs_key_opts(#mrargs{keys=[], direction=Dir}=Args, Extra) ->
+ [[{dir, Dir}] ++ ad_skey_opts(Args) ++ ad_ekey_opts(Args) ++ Extra];
+all_docs_key_opts(#mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
+ lists:map(fun(K) ->
+ [{dir, Dir}]
+ ++ ad_skey_opts(Args#mrargs{start_key=K})
+ ++ ad_ekey_opts(Args#mrargs{end_key=K})
+ ++ Extra
+ end, Keys).
+
+
+ad_skey_opts(#mrargs{start_key=SKey}) when is_binary(SKey) ->
+ [{start_key, SKey}];
+ad_skey_opts(#mrargs{start_key_docid=SKeyDocId}) ->
+ [{start_key, SKeyDocId}].
+
+
+ad_ekey_opts(#mrargs{end_key=EKey}=Args) when is_binary(EKey) ->
+ Type = if Args#mrargs.inclusive_end -> end_key; true -> end_key_gt end,
+ [{Type, EKey}];
+ad_ekey_opts(#mrargs{end_key_docid=EKeyDocId}=Args) ->
+ Type = if Args#mrargs.inclusive_end -> end_key; true -> end_key_gt end,
+ [{Type, EKeyDocId}].
+
+
+key_opts(Args) ->
+ key_opts(Args, []).
+
+key_opts(#mrargs{keys=undefined, direction=Dir}=Args, Extra) ->
+ [[{dir, Dir}] ++ skey_opts(Args) ++ ekey_opts(Args) ++ Extra];
+key_opts(#mrargs{keys=Keys, direction=Dir}=Args, Extra) ->
+ lists:map(fun(K) ->
+ [{dir, Dir}]
+ ++ skey_opts(Args#mrargs{start_key=K})
+ ++ ekey_opts(Args#mrargs{end_key=K})
+ ++ Extra
+ end, Keys).
+
+
+skey_opts(#mrargs{start_key=undefined}) ->
+ [];
+skey_opts(#mrargs{start_key=SKey, start_key_docid=SKeyDocId}) ->
+ [{start_key, {SKey, SKeyDocId}}].
+
+
+ekey_opts(#mrargs{end_key=undefined}) ->
+ [];
+ekey_opts(#mrargs{end_key=EKey, end_key_docid=EKeyDocId}=Args) ->
+ case Args#mrargs.inclusive_end of
+ true -> [{end_key, {EKey, EKeyDocId}}];
+ false -> [{end_key_gt, {EKey, reverse_key_default(EKeyDocId)}}]
+ end.
+
+
+reverse_key_default(<<>>) -> <<255>>;
+reverse_key_default(<<255>>) -> <<>>;
+reverse_key_default(Key) -> Key.
+
+
+reduced_external_size(Tree) ->
+ case couch_btree:full_reduce(Tree) of
+ {ok, {_, _, Size}} -> Size;
+ % return 0 for versions of the reduce function without Size
+ {ok, {_, _}} -> 0
+ end.
+
+
+calculate_external_size(Views) ->
+ SumFun = fun
+ (#mrview{btree=nil}, Acc) ->
+ Acc;
+ (#mrview{btree=Bt}, Acc) ->
+ Acc + reduced_external_size(Bt)
+ end,
+ {ok, lists:foldl(SumFun, 0, Views)}.
+
+
+calculate_active_size(Views) ->
+ FoldFun = fun
+ (#mrview{btree=nil}, Acc) ->
+ Acc;
+ (#mrview{btree=Bt}, Acc) ->
+ Acc + couch_btree:size(Bt)
+ end,
+ {ok, lists:foldl(FoldFun, 0, Views)}.
+
+
+detuple_kvs([], Acc) ->
+ lists:reverse(Acc);
+detuple_kvs([KV | Rest], Acc) ->
+ {{Key,Id},Value} = KV,
+ NKV = [[Key, Id], Value],
+ detuple_kvs(Rest, [NKV | Acc]).
+
+
+expand_dups([], Acc) ->
+ lists:reverse(Acc);
+expand_dups([{Key, {dups, Vals}} | Rest], Acc) ->
+ Expanded = [{Key, Val} || Val <- Vals],
+ expand_dups(Rest, Expanded ++ Acc);
+expand_dups([KV | Rest], Acc) ->
+ expand_dups(Rest, [KV | Acc]).
+
+
+maybe_load_doc(_Db, _DI, #mrargs{include_docs=false}) ->
+ [];
+maybe_load_doc(Db, #doc_info{}=DI, #mrargs{conflicts=true, doc_options=Opts}) ->
+ doc_row(couch_index_util:load_doc(Db, DI, [conflicts]), Opts);
+maybe_load_doc(Db, #doc_info{}=DI, #mrargs{doc_options=Opts}) ->
+ doc_row(couch_index_util:load_doc(Db, DI, []), Opts).
+
+
+maybe_load_doc(_Db, _Id, _Val, #mrargs{include_docs=false}) ->
+ [];
+maybe_load_doc(Db, Id, Val, #mrargs{conflicts=true, doc_options=Opts}) ->
+ doc_row(couch_index_util:load_doc(Db, docid_rev(Id, Val), [conflicts]), Opts);
+maybe_load_doc(Db, Id, Val, #mrargs{doc_options=Opts}) ->
+ doc_row(couch_index_util:load_doc(Db, docid_rev(Id, Val), []), Opts).
+
+
+doc_row(null, _Opts) ->
+ [{doc, null}];
+doc_row(Doc, Opts) ->
+ [{doc, couch_doc:to_json_obj(Doc, Opts)}].
+
+
+docid_rev(Id, {Props}) ->
+ DocId = couch_util:get_value(<<"_id">>, Props, Id),
+ Rev = case couch_util:get_value(<<"_rev">>, Props, nil) of
+ nil -> nil;
+ Rev0 -> couch_doc:parse_rev(Rev0)
+ end,
+ {DocId, Rev};
+docid_rev(Id, _) ->
+ {Id, nil}.
+
+
+index_of(Key, List) ->
+ index_of(Key, List, 1).
+
+
+index_of(_, [], _) ->
+ throw({error, missing_named_view});
+index_of(Key, [Key | _], Idx) ->
+ Idx;
+index_of(Key, [_ | Rest], Idx) ->
+ index_of(Key, Rest, Idx+1).
+
+
+mrverror(Mesg) ->
+ throw({query_parse_error, Mesg}).
+
+
+%% Updates 2.x view files to 3.x or later view files
+%% transparently, the first time the 2.x view file is opened by
+%% 3.x or later.
+%%
+%% Here's how it works:
+%%
+%% Before opening a view index,
+%% If no matching index file is found in the new location:
+%% calculate the <= 2.x view signature
+%% if a file with that signature lives in the old location
+%% rename it to the new location with the new signature in the name.
+%% Then proceed to open the view index as usual.
+
+maybe_update_index_file(State) ->
+ DbName = State#mrst.db_name,
+ NewIndexFile = index_file(DbName, State#mrst.sig),
+ % open in read-only mode so we don't create
+ % the file if it doesn't exist.
+ case file:open(NewIndexFile, [read, raw]) of
+ {ok, Fd_Read} ->
+ % the new index file exists, there is nothing to do here.
+ file:close(Fd_Read);
+ _Error ->
+ update_index_file(State)
+ end.
+
+update_index_file(State) ->
+ Sig = sig_vsn_2x(State),
+ DbName = State#mrst.db_name,
+ FileName = couch_index_util:hexsig(Sig) ++ ".view",
+ IndexFile = couch_index_util:index_file("mrview", DbName, FileName),
+
+ % If we have an old index, rename it to the new position.
+ case file:read_file_info(IndexFile) of
+ {ok, _FileInfo} ->
+ % Crash if the rename fails for any reason.
+ % If the target exists, e.g. the next request will find the
+ % new file and we are good. We might need to catch this
+ % further up to avoid a full server crash.
+ NewIndexFile = index_file(DbName, State#mrst.sig),
+ couch_log:notice("Attempting to update legacy view index file"
+ " from ~p to ~s", [IndexFile, NewIndexFile]),
+ ok = filelib:ensure_dir(NewIndexFile),
+ ok = file:rename(IndexFile, NewIndexFile),
+ couch_log:notice("Successfully updated legacy view index file"
+ " ~s", [IndexFile]),
+ Sig;
+ {error, enoent} ->
+ % Ignore missing index file
+ ok;
+ {error, Reason} ->
+ couch_log:error("Failed to update legacy view index file"
+ " ~s : ~s", [IndexFile, file:format_error(Reason)]),
+ ok
+ end.
+
+sig_vsn_2x(State) ->
+ #mrst{
+ lib = Lib,
+ language = Language,
+ design_opts = DesignOpts
+ } = State,
+ SI = proplists:get_value(<<"seq_indexed">>, DesignOpts, false),
+ KSI = proplists:get_value(<<"keyseq_indexed">>, DesignOpts, false),
+ Views = [old_view_format(V, SI, KSI) || V <- State#mrst.views],
+ SigInfo = {Views, Language, DesignOpts, couch_index_util:sort_lib(Lib)},
+ couch_hash:md5_hash(term_to_binary(SigInfo)).
+
+% Re-create the raw 13-element tuple layout of the 2.x-era #mrview record
+% (including the seq_indexed/keyseq_indexed flags and two nil slots —
+% presumably the removed seq/keyseq btrees) so sig_vsn_2x/1 hashes the
+% same term the old code did.
+old_view_format(View, SeqIndexed, KeySeqIndexed) ->
+    {
+        mrview,
+        View#mrview.id_num,
+        View#mrview.update_seq,
+        View#mrview.purge_seq,
+        View#mrview.map_names,
+        View#mrview.reduce_funs,
+        View#mrview.def,
+        View#mrview.btree,
+        nil,
+        nil,
+        SeqIndexed,
+        KeySeqIndexed,
+        View#mrview.options
+    }.
+
+% Upgrade a view index header read from disk. Current headers are already
+% #mrheader records and pass through untouched; 6-tuple headers come from
+% older index files and are repacked field by field.
+% NOTE(review): element 5 of the legacy tuple is deliberately skipped —
+% presumably state for a btree that no longer exists; confirm against the
+% 2.x header layout.
+maybe_update_header(#mrheader{} = Header) ->
+    Header;
+maybe_update_header(Header) when tuple_size(Header) == 6 ->
+    #mrheader{
+        seq = element(2, Header),
+        purge_seq = element(3, Header),
+        id_btree_state = element(4, Header),
+        view_states = [make_view_state(S) || S <- element(6, Header)]
+    }.
+
+%% End of <= 2.x upgrade code.
+
+% Normalize per-view state to the current {BtreeState, UpdateSeq,
+% PurgeSeq} triple, accepting three input shapes.
+make_view_state(#mrview{} = View) ->
+    % Live view record: capture the current btree root (nil if unset).
+    BTState = get_btree_state(View#mrview.btree),
+    {
+        BTState,
+        View#mrview.update_seq,
+        View#mrview.purge_seq
+    };
+% Legacy 5-tuple state — the two extra btree states are dropped.
+make_view_state({BTState, _SeqBTState, _KSeqBTState, UpdateSeq, PurgeSeq}) ->
+    {BTState, UpdateSeq, PurgeSeq};
+% No state recorded yet.
+make_view_state(nil) ->
+    {nil, 0, 0}.
+
+
+% Positional accessors for the {BtreeState, UpdateSeq, PurgeSeq} view
+% state triple built by make_view_state/1.
+get_key_btree_state(ViewState) ->
+    element(1, ViewState).
+
+get_update_seq(ViewState) ->
+    element(2, ViewState).
+
+get_purge_seq(ViewState) ->
+    element(3, ViewState).
+
+% Accessors for reduction values; reductions are {Count, UserReds} or
+% {Count, UserReds, ExternalSize} (see make_reduce_fun/2, count_reduce/2).
+get_count(Reduction) ->
+    element(1, Reduction).
+
+get_user_reds(Reduction) ->
+    element(2, Reduction).
+
+
+% External-size component of a reduction, tolerating older reduction
+% shapes that did not carry one.
+% This is for backwards compatibility for seq btree reduces
+get_external_size_reds(Reduction) when is_integer(Reduction) ->
+    % Bare integer reduction (presumably just a count): no size tracked.
+    0;
+
+get_external_size_reds(Reduction) when tuple_size(Reduction) == 2 ->
+    % {Count, UserReds}: no size tracked.
+    0;
+
+get_external_size_reds(Reduction) when tuple_size(Reduction) == 3 ->
+    % {Count, UserReds, ExternalSize}.
+    element(3, Reduction).
+
+
+% Build the btree reduce fun for a view with user-defined reduce
+% functions. Produces {Count, UserRedResults, ExternalSize} reductions.
+make_reduce_fun(Lang, ReduceFuns) ->
+    FunSrcs = [FunSrc || {_, FunSrc} <- ReduceFuns],
+    fun
+        (reduce, KVs0) ->
+            % Expand {dups, ...} values and reshape the KVs for the query
+            % server (detuple_kvs/expand_dups are defined elsewhere in
+            % this module).
+            KVs = detuple_kvs(expand_dups(KVs0, []), []),
+            {ok, Result} = couch_query_servers:reduce(Lang, FunSrcs, KVs),
+            ExternalSize = kv_external_size(KVs, Result),
+            {length(KVs), Result, ExternalSize};
+        (rereduce, Reds) ->
+            % Merge intermediate reductions: sum counts and sizes while
+            % collecting each reduction's user results for the rereduce.
+            ExtractFun = fun(Red, {CountsAcc0, URedsAcc0, ExtAcc0}) ->
+                CountsAcc = CountsAcc0 + get_count(Red),
+                URedsAcc = lists:append(URedsAcc0, [get_user_reds(Red)]),
+                ExtAcc = ExtAcc0 + get_external_size_reds(Red),
+                {CountsAcc, URedsAcc, ExtAcc}
+            end,
+            {Counts, UReds, ExternalSize} = lists:foldl(ExtractFun,
+                {0, [], 0}, Reds),
+            {ok, Result} = couch_query_servers:rereduce(Lang, FunSrcs, UReds),
+            {Counts, Result, ExternalSize}
+    end.
+
+
+% Choose the key-comparison fun for a view btree: `raw` collation returns
+% undefined (no custom compare fun), anything else compares with JSON
+% collation over key/id pairs.
+maybe_define_less_fun(#mrview{options = Options}) ->
+    case couch_util:get_value(<<"collation">>, Options) of
+        <<"raw">> -> undefined;
+        _ -> fun couch_ejson_compare:less_json_ids/2
+    end.
+
+
+% Reduce fun used when only row counts are needed; reductions are
+% {Count, []} so the shared reduction accessors still apply.
+count_reduce(reduce, KVs) ->
+    % A {dups, Vals} value stands for length(Vals) collapsed rows.
+    Total = lists:sum([
+        case KV of
+            {_, {dups, Vals}} -> length(Vals);
+            _ -> 1
+        end
+    || KV <- KVs]),
+    {Total, []};
+count_reduce(rereduce, Reds) ->
+    % Sum the counts of the intermediate reductions.
+    {lists:sum([get_count(Red) || Red <- Reds]), []}.
+
+
+% Build a reduce fun that evaluates only the NthRed-th user reduce
+% function. The result is padded with [] on both sides so it occupies
+% the same slot as in a full reduction; the count slot is always 0.
+make_user_reds_reduce_fun(Lang, ReduceFuns, NthRed) ->
+    LPad = lists:duplicate(NthRed - 1, []),
+    RPad = lists:duplicate(length(ReduceFuns) - NthRed, []),
+    {_, FunSrc} = lists:nth(NthRed, ReduceFuns),
+    fun
+        (reduce, KVs0) ->
+            KVs = detuple_kvs(expand_dups(KVs0, []), []),
+            {ok, Result} = couch_query_servers:reduce(Lang, [FunSrc], KVs),
+            {0, LPad ++ Result ++ RPad};
+        (rereduce, Reds) ->
+            % Pick only this function's slot out of each intermediate
+            % reduction before rereducing.
+            ExtractFun = fun(Reds0) ->
+                [lists:nth(NthRed, get_user_reds(Reds0))]
+            end,
+            UReds = lists:map(ExtractFun, Reds),
+            {ok, Result} = couch_query_servers:rereduce(Lang, [FunSrc], UReds),
+            {0, LPad ++ Result ++ RPad}
+    end.
+
+
+% Current btree root suitable for persisting in a header; nil when the
+% btree has not been initialized.
+get_btree_state(nil) ->
+    nil;
+get_btree_state(#btree{} = Btree) ->
+    couch_btree:get_state(Btree).
+
+
+% Pull the source of the Nth reduce function out of a {red, ...} view
+% spec tuple.
+extract_view_reduce({red, {N, _Lang, #mrview{reduce_funs=Reds}}, _Ref}) ->
+    {_Name, FunSrc} = lists:nth(N, Reds),
+    FunSrc.
+
+
+% Extract the optional `keys` member of a request body. Returns the list
+% when present, undefined when absent; anything else is a bad request.
+get_view_keys({Props}) ->
+    case couch_util:get_value(<<"keys">>, Props) of
+        Keys when Keys =:= undefined; is_list(Keys) ->
+            Keys;
+        _ ->
+            throw({bad_request, "`keys` member must be an array."})
+    end.
+
+
+% Extract the optional `queries` member of a request body. Returns the
+% list when present, undefined when absent; anything else is a bad
+% request.
+get_view_queries({Props}) ->
+    case couch_util:get_value(<<"queries">>, Props) of
+        Queries when Queries =:= undefined; is_list(Queries) ->
+            Queries;
+        _ ->
+            throw({bad_request, "`queries` member must be an array."})
+    end.
+
+
+% Approximate external byte size of a KV list plus its reduction using
+% ?term_size on each key and value. Entries are [[Key, _], Value] — the
+% second element of the inner list (presumably the doc id) is ignored.
+kv_external_size(KVList, Reduction) ->
+    lists:foldl(fun([[Key, _], Value], Acc) ->
+        ?term_size(Key) + ?term_size(Value) + Acc
+    end, ?term_size(Reduction), KVList).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl.rej b/src/couch_mrview/src/couch_mrview_util.erl.rej
new file mode 100644
index 000000000..2bcf1262c
--- /dev/null
+++ b/src/couch_mrview/src/couch_mrview_util.erl.rej
@@ -0,0 +1,16 @@
+***************
+*** 20,25 ****
+ -export([index_file/2, compaction_file/2, open_file/1]).
+ -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
+ -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
+ -export([get_view_changes_count/1]).
+ -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
+ -export([fold/4, fold_reduce/4]).
+--- 20,26 ----
+ -export([index_file/2, compaction_file/2, open_file/1]).
+ -export([delete_files/2, delete_index_file/2, delete_compaction_file/2]).
+ -export([get_row_count/1, all_docs_reduce_to_count/1, reduce_to_count/1]).
++ -export([get_access_row_count/2]).
+ -export([get_view_changes_count/1]).
+ -export([all_docs_key_opts/1, all_docs_key_opts/2, key_opts/1, key_opts/2]).
+ -export([fold/4, fold_reduce/4]).