diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-08-20 12:43:14 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-09-25 09:42:45 -0500 |
commit | 41b72111b130fa0c5e04eb9b670f5d575eacb62e (patch) | |
tree | 18005f6cf095dc46244bf5040919fb27e7d84a46 | |
parent | 422416a1c620a7e403bf3bde8a43a4cb69c13086 (diff) | |
download | couchdb-41b72111b130fa0c5e04eb9b670f5d575eacb62e.tar.gz |
Initial creation of couch_js application
This commit is mostly a copy paste of the existing modules in the
`couch` application. For now I've left the build of the `couchjs`
executable in `couch/priv` to avoid having to do the work of moving the
build config over. I had contemplated just referencing the modules as
they current exist but decided this would prepare us a bit better for
when we eventually remove the old modules.
-rw-r--r-- | rebar.config.script | 1 | ||||
-rw-r--r-- | rel/reltool.config | 2 | ||||
-rw-r--r-- | src/couch_js/README.md | 6 | ||||
-rw-r--r-- | src/couch_js/src/couch_js.app.src | 27 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_app.erl | 31 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_io_logger.erl | 107 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_native_process.erl | 452 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_os_process.erl | 265 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_proc_manager.erl | 602 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_query_servers.erl | 683 | ||||
-rw-r--r-- | src/couch_js/src/couch_js_sup.erl | 45 |
11 files changed, 2221 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script index ba7b754ff..8ef1abcc8 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -79,6 +79,7 @@ SubDirs = [ "src/mem3", "src/couch_index", "src/couch_mrview", + "src/couch_js", "src/couch_replicator", "src/couch_plugins", "src/couch_pse_tests", diff --git a/rel/reltool.config b/rel/reltool.config index e2ae71c43..caeea381e 100644 --- a/rel/reltool.config +++ b/rel/reltool.config @@ -41,6 +41,7 @@ couch_replicator, couch_stats, couch_eval, + couch_js, couch_event, couch_peruser, couch_views, @@ -95,6 +96,7 @@ {app, couch, [{incl_cond, include}]}, {app, couch_epi, [{incl_cond, include}]}, {app, couch_eval, [{incl_cond, include}]}, + {app, couch_js, [{incl_cond, include}]}, {app, couch_jobs, [{incl_cond, include}]}, {app, couch_index, [{incl_cond, include}]}, {app, couch_log, [{incl_cond, include}]}, diff --git a/src/couch_js/README.md b/src/couch_js/README.md new file mode 100644 index 000000000..4084b7d8e --- /dev/null +++ b/src/couch_js/README.md @@ -0,0 +1,6 @@ +couch_js +=== + +This application is just an isolation of most of the code required for running couchjs. + +For the time being I'm not moving the implementation of couchjs due to the specifics of the build system configuration. Once we go to remove the `couch` application we'll have to revisit that approach.
\ No newline at end of file diff --git a/src/couch_js/src/couch_js.app.src b/src/couch_js/src/couch_js.app.src new file mode 100644 index 000000000..0db37b68c --- /dev/null +++ b/src/couch_js/src/couch_js.app.src @@ -0,0 +1,27 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +{application, couch_js, [ + {description, "An OTP application"}, + {vsn, git}, + {registered, [ + couch_js_proc_manager + ]}, + {mod, {couch_js_app, []}}, + {applications, [ + kernel, + stdlib, + config, + couch_log, + couch + ]} + ]}. diff --git a/src/couch_js/src/couch_js_app.erl b/src/couch_js/src/couch_js_app.erl new file mode 100644 index 000000000..b28f5852e --- /dev/null +++ b/src/couch_js/src/couch_js_app.erl @@ -0,0 +1,31 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(couch_js_app). + + +-behaviour(application). + + +-export([ + start/2, + stop/1 +]). + + +start(_StartType, _StartArgs) -> + couch_js_sup:start_link(). + + +stop(_State) -> + ok.
\ No newline at end of file diff --git a/src/couch_js/src/couch_js_io_logger.erl b/src/couch_js/src/couch_js_io_logger.erl new file mode 100644 index 000000000..5a1695c01 --- /dev/null +++ b/src/couch_js/src/couch_js_io_logger.erl @@ -0,0 +1,107 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_js_io_logger). + +-export([ + start/1, + log_output/1, + log_input/1, + stop_noerror/0, + stop_error/1 +]). + + +start(undefined) -> + ok; +start(Dir) -> + case filelib:is_dir(Dir) of + true -> + Name = log_name(), + Path = Dir ++ "/" ++ Name, + OPath = Path ++ ".out.log_", + IPath = Path ++ ".in.log_", + {ok, OFd} = file:open(OPath, [read, write, raw]), + {ok, IFd} = file:open(IPath, [read, write, raw]), + ok = file:delete(OPath), + ok = file:delete(IPath), + put(logger_path, Path), + put(logger_out_fd, OFd), + put(logger_in_fd, IFd), + ok; + false -> + ok + end. + + +stop_noerror() -> + case get(logger_path) of + undefined -> + ok; + _Path -> + close_logs() + end. + + +stop_error(Err) -> + case get(logger_path) of + undefined -> + ok; + Path -> + save_error_logs(Path, Err), + close_logs() + end. + + +log_output(Data) -> + log(get(logger_out_fd), Data). + + +log_input(Data) -> + log(get(logger_in_fd), Data). + + +unix_time() -> + {Mega, Sec, USec} = os:timestamp(), + UnixTs = (Mega * 1000000 + Sec) * 1000000 + USec, + integer_to_list(UnixTs). + + +log_name() -> + Ts = unix_time(), + Pid0 = erlang:pid_to_list(self()), + Pid1 = string:strip(Pid0, left, $<), + Pid2 = string:strip(Pid1, right, $>), + lists:flatten(io_lib:format("~s_~s", [Ts, Pid2])). + + +close_logs() -> + file:close(get(logger_out_fd)), + file:close(get(logger_in_fd)). + + +save_error_logs(Path, Err) -> + Otp = erlang:system_info(otp_release), + Msg = io_lib:format("Error: ~p~nNode: ~p~nOTP: ~p~n", [Err, node(), Otp]), + file:write_file(Path ++ ".meta", Msg), + IFd = get(logger_out_fd), + OFd = get(logger_in_fd), + file:position(IFd, 0), + file:position(OFd, 0), + file:copy(IFd, Path ++ ".out.log"), + file:copy(OFd, Path ++ ".in.log"). + + +log(undefined, _Data) -> + ok; +log(Fd, Data) -> + ok = file:write(Fd, [Data, io_lib:nl()]). diff --git a/src/couch_js/src/couch_js_native_process.erl b/src/couch_js/src/couch_js_native_process.erl new file mode 100644 index 000000000..d2c4c1ee0 --- /dev/null +++ b/src/couch_js/src/couch_js_native_process.erl @@ -0,0 +1,452 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); +% you may not use this file except in compliance with the License. +% +% You may obtain a copy of the License at +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, +% software distributed under the License is distributed on an +% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, +% either express or implied. +% +% See the License for the specific language governing permissions +% and limitations under the License. +% +% This file drew much inspiration from erlview, which was written by and +% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0 +% +% +% This module provides the smallest possible native view-server. +% With this module in-place, you can add the following to your couch INI files: +% [native_query_servers] +% erlang={couch_native_process, start_link, []} +% +% Which will then allow following example map function to be used: +% +% fun({Doc}) -> +% % Below, we emit a single record - the _id as key, null as value +% DocId = couch_util:get_value(<<"_id">>, Doc, null), +% Emit(DocId, null) +% end. +% +% which should be roughly the same as the javascript: +% emit(doc._id, null); +% +% This module exposes enough functions such that a native erlang server can +% act as a fully-fleged view server, but no 'helper' functions specifically +% for simplifying your erlang view code. It is expected other third-party +% extensions will evolve which offer useful layers on top of this view server +% to help simplify your view code. +-module(couch_js_native_process). +-behaviour(gen_server). +-vsn(1). + +-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3, + handle_info/2]). +-export([set_timeout/2, prompt/2]). + +-define(STATE, native_proc_state). +-record(evstate, { + ddocs, + funs = [], + query_config = [], + list_pid = nil, + timeout = 5000, + idle = 5000 +}). + +-include_lib("couch/include/couch_db.hrl"). + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +% this is a bit messy, see also couch_query_servers handle_info +% stop(_Pid) -> +% ok. + +set_timeout(Pid, TimeOut) -> + gen_server:call(Pid, {set_timeout, TimeOut}). + +prompt(Pid, Data) when is_list(Data) -> + gen_server:call(Pid, {prompt, Data}). + +% gen_server callbacks +init([]) -> + V = config:get("query_server_config", "os_process_idle_limit", "300"), + Idle = list_to_integer(V) * 1000, + {ok, #evstate{ddocs=dict:new(), idle=Idle}, Idle}. + +handle_call({set_timeout, TimeOut}, _From, State) -> + {reply, ok, State#evstate{timeout=TimeOut}, State#evstate.idle}; + +handle_call({prompt, Data}, _From, State) -> + couch_log:debug("Prompt native qs: ~s",[?JSON_ENCODE(Data)]), + {NewState, Resp} = try run(State, to_binary(Data)) of + {S, R} -> {S, R} + catch + throw:{error, Why} -> + {State, [<<"error">>, Why, Why]} + end, + + Idle = State#evstate.idle, + case Resp of + {error, Reason} -> + Msg = io_lib:format("couch native server error: ~p", [Reason]), + Error = [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], + {reply, Error, NewState, Idle}; + [<<"error">> | Rest] -> + % Msg = io_lib:format("couch native server error: ~p", [Rest]), + % TODO: markh? (jan) + {reply, [<<"error">> | Rest], NewState, Idle}; + [<<"fatal">> | Rest] -> + % Msg = io_lib:format("couch native server error: ~p", [Rest]), + % TODO: markh? (jan) + {stop, fatal, [<<"error">> | Rest], NewState}; + Resp -> + {reply, Resp, NewState, Idle} + end. + +handle_cast(garbage_collect, State) -> + erlang:garbage_collect(), + {noreply, State, State#evstate.idle}; +handle_cast(stop, State) -> + {stop, normal, State}; +handle_cast(_Msg, State) -> + {noreply, State, State#evstate.idle}. + +handle_info(timeout, State) -> + gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}), + erlang:garbage_collect(), + {noreply, State, State#evstate.idle}; +handle_info({'EXIT',_,normal}, State) -> + {noreply, State, State#evstate.idle}; +handle_info({'EXIT',_,Reason}, State) -> + {stop, Reason, State}. +terminate(_Reason, _State) -> ok. +code_change(_OldVersion, State, _Extra) -> {ok, State}. + +run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> + Pid ! {self(), list_row, Row}, + receive + {Pid, chunks, Data} -> + {State, [<<"chunks">>, Data]}; + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, [<<"end">>, Data]} + after State#evstate.timeout -> + throw({timeout, list_row}) + end; +run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> + Pid ! {self(), list_end}, + Resp = + receive + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + [<<"end">>, Data] + after State#evstate.timeout -> + throw({timeout, list_end}) + end, + process_flag(trap_exit, erlang:get(do_trap)), + {State#evstate{list_pid=nil}, Resp}; +run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) -> + {State, [<<"error">>, list_error, list_error]}; +run(#evstate{ddocs=DDocs}, [<<"reset">>]) -> + {#evstate{ddocs=DDocs}, true}; +run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>, QueryConfig]) -> + NewState = #evstate{ + ddocs = DDocs, + query_config = QueryConfig, + idle = Idle + }, + {NewState, true}; +run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) -> + FunInfo = makefun(State, BinFunc), + {State#evstate{funs=Funs ++ [FunInfo]}, true}; +run(State, [<<"map_doc">> , Doc]) -> + Resp = lists:map(fun({Sig, Fun}) -> + erlang:put(Sig, []), + Fun(Doc), + lists:reverse(erlang:get(Sig)) + end, State#evstate.funs), + {State, Resp}; +run(State, [<<"reduce">>, Funs, KVs]) -> + {Keys, Vals} = + lists:foldl(fun([K, V], {KAcc, VAcc}) -> + {[K | KAcc], [V | VAcc]} + end, {[], []}, KVs), + Keys2 = lists:reverse(Keys), + Vals2 = lists:reverse(Vals), + {State, catch reduce(State, Funs, Keys2, Vals2, false)}; +run(State, [<<"rereduce">>, Funs, Vals]) -> + {State, catch reduce(State, Funs, null, Vals, true)}; +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) -> + DDocs2 = store_ddoc(DDocs, DDocId, DDoc), + {State#evstate{ddocs=DDocs2}, true}; +run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) -> + DDoc = load_ddoc(DDocs, DDocId), + ddoc(State, DDoc, Rest); +run(_, Unknown) -> + couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]), + throw({error, unknown_command}). + +ddoc(State, {DDoc}, [FunPath, Args]) -> + % load fun from the FunPath + BFun = lists:foldl(fun + (Key, {Props}) when is_list(Props) -> + couch_util:get_value(Key, Props, nil); + (_Key, Fun) when is_binary(Fun) -> + Fun; + (_Key, nil) -> + throw({error, not_found}); + (_Key, _Fun) -> + throw({error, malformed_ddoc}) + end, {DDoc}, FunPath), + ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args). + +ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) -> + {State, (catch apply(Fun, Args))}; +ddoc(State, {_, Fun}, [<<"rewrites">>], Args) -> + {State, (catch apply(Fun, Args))}; +ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) -> + FilterFunWrapper = fun(Doc) -> + case catch Fun(Doc, Req) of + true -> true; + false -> false; + {'EXIT', Error} -> couch_log:error("~p", [Error]) + end + end, + Resp = lists:map(FilterFunWrapper, Docs), + {State, [true, Resp]}; +ddoc(State, {_, Fun}, [<<"views">>|_], [Docs]) -> + MapFunWrapper = fun(Doc) -> + case catch Fun(Doc) of + undefined -> true; + ok -> false; + false -> false; + [_|_] -> true; + {'EXIT', Error} -> couch_log:error("~p", [Error]) + end + end, + Resp = lists:map(MapFunWrapper, Docs), + {State, [true, Resp]}; +ddoc(State, {_, Fun}, [<<"shows">>|_], Args) -> + Resp = case (catch apply(Fun, Args)) of + FunResp when is_list(FunResp) -> + FunResp; + {FunResp} -> + [<<"resp">>, {FunResp}]; + FunResp -> + FunResp + end, + {State, Resp}; +ddoc(State, {_, Fun}, [<<"updates">>|_], Args) -> + Resp = case (catch apply(Fun, Args)) of + [JsonDoc, JsonResp] -> + [<<"up">>, JsonDoc, JsonResp] + end, + {State, Resp}; +ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) -> + Self = self(), + SpawnFun = fun() -> + LastChunk = (catch apply(Fun, Args)), + case start_list_resp(Self, Sig) of + started -> + receive + {Self, list_row, _Row} -> ignore; + {Self, list_end} -> ignore + after State#evstate.timeout -> + throw({timeout, list_cleanup_pid}) + end; + _ -> + ok + end, + LastChunks = + case erlang:get(Sig) of + undefined -> [LastChunk]; + OtherChunks -> [LastChunk | OtherChunks] + end, + Self ! {self(), list_end, lists:reverse(LastChunks)} + end, + erlang:put(do_trap, process_flag(trap_exit, true)), + Pid = spawn_link(SpawnFun), + Resp = + receive + {Pid, start, Chunks, JsonResp} -> + [<<"start">>, Chunks, JsonResp] + after State#evstate.timeout -> + throw({timeout, list_start}) + end, + {State#evstate{list_pid=Pid}, Resp}. + +store_ddoc(DDocs, DDocId, DDoc) -> + dict:store(DDocId, DDoc, DDocs). +load_ddoc(DDocs, DDocId) -> + try dict:fetch(DDocId, DDocs) of + {DDoc} -> {DDoc} + catch + _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))}) + end. + +bindings(State, Sig) -> + bindings(State, Sig, nil). +bindings(State, Sig, DDoc) -> + Self = self(), + + Log = fun(Msg) -> + couch_log:info(Msg, []) + end, + + Emit = fun(Id, Value) -> + Curr = erlang:get(Sig), + erlang:put(Sig, [[Id, Value] | Curr]) + end, + + Start = fun(Headers) -> + erlang:put(list_headers, Headers) + end, + + Send = fun(Chunk) -> + Curr = + case erlang:get(Sig) of + undefined -> []; + Else -> Else + end, + erlang:put(Sig, [Chunk | Curr]) + end, + + GetRow = fun() -> + case start_list_resp(Self, Sig) of + started -> + ok; + _ -> + Chunks = + case erlang:get(Sig) of + undefined -> []; + CurrChunks -> CurrChunks + end, + Self ! {self(), chunks, lists:reverse(Chunks)} + end, + erlang:put(Sig, []), + receive + {Self, list_row, Row} -> Row; + {Self, list_end} -> nil + after State#evstate.timeout -> + throw({timeout, list_pid_getrow}) + end + end, + + FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end, + + Bindings = [ + {'Log', Log}, + {'Emit', Emit}, + {'Start', Start}, + {'Send', Send}, + {'GetRow', GetRow}, + {'FoldRows', FoldRows} + ], + case DDoc of + {_Props} -> + Bindings ++ [{'DDoc', DDoc}]; + _Else -> Bindings + end. + +% thanks to erlview, via: +% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html +makefun(State, Source) -> + Sig = couch_hash:md5_hash(Source), + BindFuns = bindings(State, Sig), + {Sig, makefun(State, Source, BindFuns)}. +makefun(State, Source, {DDoc}) -> + Sig = couch_hash:md5_hash(lists:flatten([Source, term_to_binary(DDoc)])), + BindFuns = bindings(State, Sig, {DDoc}), + {Sig, makefun(State, Source, BindFuns)}; +makefun(_State, Source, BindFuns) when is_list(BindFuns) -> + FunStr = binary_to_list(Source), + {ok, Tokens, _} = erl_scan:string(FunStr), + Form = case (catch erl_parse:parse_exprs(Tokens)) of + {ok, [ParsedForm]} -> + ParsedForm; + {error, {LineNum, _Mod, [Mesg, Params]}}=Error -> + couch_log:error("Syntax error on line: ~p~n~s~p~n", + [LineNum, Mesg, Params]), + throw(Error) + end, + Bindings = lists:foldl(fun({Name, Fun}, Acc) -> + erl_eval:add_binding(Name, Fun, Acc) + end, erl_eval:new_bindings(), BindFuns), + {value, Fun, _} = erl_eval:expr(Form, Bindings), + Fun. + +reduce(State, BinFuns, Keys, Vals, ReReduce) -> + Funs = case is_list(BinFuns) of + true -> + lists:map(fun(BF) -> makefun(State, BF) end, BinFuns); + _ -> + [makefun(State, BinFuns)] + end, + Reds = lists:map(fun({_Sig, Fun}) -> + Fun(Keys, Vals, ReReduce) + end, Funs), + [true, Reds]. + +foldrows(GetRow, ProcRow, Acc) -> + case GetRow() of + nil -> + {ok, Acc}; + Row -> + case (catch ProcRow(Row, Acc)) of + {ok, Acc2} -> + foldrows(GetRow, ProcRow, Acc2); + {stop, Acc2} -> + {ok, Acc2} + end + end. + +start_list_resp(Self, Sig) -> + case erlang:get(list_started) of + undefined -> + Headers = + case erlang:get(list_headers) of + undefined -> {[{<<"headers">>, {[]}}]}; + CurrHdrs -> CurrHdrs + end, + Chunks = + case erlang:get(Sig) of + undefined -> []; + CurrChunks -> CurrChunks + end, + Self ! {self(), start, lists:reverse(Chunks), Headers}, + erlang:put(list_started, true), + erlang:put(Sig, []), + started; + _ -> + ok + end. + +to_binary({Data}) -> + Pred = fun({Key, Value}) -> + {to_binary(Key), to_binary(Value)} + end, + {lists:map(Pred, Data)}; +to_binary(Data) when is_list(Data) -> + [to_binary(D) || D <- Data]; +to_binary(null) -> + null; +to_binary(true) -> + true; +to_binary(false) -> + false; +to_binary(Data) when is_atom(Data) -> + list_to_binary(atom_to_list(Data)); +to_binary(Data) -> + Data. diff --git a/src/couch_js/src/couch_js_os_process.erl b/src/couch_js/src/couch_js_os_process.erl new file mode 100644 index 000000000..a453d1ab2 --- /dev/null +++ b/src/couch_js/src/couch_js_os_process.erl @@ -0,0 +1,265 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_js_os_process). +-behaviour(gen_server). +-vsn(1). + +-export([start_link/1, start_link/2, start_link/3, stop/1]). +-export([set_timeout/2, prompt/2, killer/1]). +-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]). +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]). + +-include_lib("couch/include/couch_db.hrl"). + +-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]). + +-record(os_proc, + {command, + port, + writer, + reader, + timeout=5000, + idle + }). + +start_link(Command) -> + start_link(Command, []). +start_link(Command, Options) -> + start_link(Command, Options, ?PORT_OPTIONS). +start_link(Command, Options, PortOptions) -> + gen_server:start_link(?MODULE, [Command, Options, PortOptions], []). + +stop(Pid) -> + gen_server:cast(Pid, stop). + +% Read/Write API +set_timeout(Pid, TimeOut) when is_integer(TimeOut) -> + ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity). + +% Used by couch_event_os_process.erl +send(Pid, Data) -> + gen_server:cast(Pid, {send, Data}). + +prompt(Pid, Data) -> + case ioq:call(Pid, {prompt, Data}, erlang:get(io_priority)) of + {ok, Result} -> + Result; + Error -> + couch_log:error("OS Process Error ~p :: ~p",[Pid,Error]), + throw(Error) + end. + +% Utility functions for reading and writing +% in custom functions +writeline(OsProc, Data) when is_record(OsProc, os_proc) -> + Res = port_command(OsProc#os_proc.port, [Data, $\n]), + couch_js_io_logger:log_output(Data), + Res. + +readline(#os_proc{} = OsProc) -> + Res = readline(OsProc, []), + couch_js_io_logger:log_input(Res), + Res. +readline(#os_proc{port = Port} = OsProc, Acc) -> + receive + {Port, {data, {noeol, Data}}} when is_binary(Acc) -> + readline(OsProc, <<Acc/binary,Data/binary>>); + {Port, {data, {noeol, Data}}} when is_binary(Data) -> + readline(OsProc, Data); + {Port, {data, {noeol, Data}}} -> + readline(OsProc, [Data|Acc]); + {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) -> + [<<Acc/binary,Data/binary>>]; + {Port, {data, {eol, Data}}} when is_binary(Data) -> + [Data]; + {Port, {data, {eol, Data}}} -> + lists:reverse(Acc, Data); + {Port, Err} -> + catch port_close(Port), + throw({os_process_error, Err}) + after OsProc#os_proc.timeout -> + catch port_close(Port), + throw({os_process_error, "OS process timed out."}) + end. + +% Standard JSON functions +writejson(OsProc, Data) when is_record(OsProc, os_proc) -> + JsonData = ?JSON_ENCODE(Data), + couch_log:debug("OS Process ~p Input :: ~s", + [OsProc#os_proc.port, JsonData]), + true = writeline(OsProc, JsonData). + +readjson(OsProc) when is_record(OsProc, os_proc) -> + Line = iolist_to_binary(readline(OsProc)), + couch_log:debug("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]), + try + % Don't actually parse the whole JSON. Just try to see if it's + % a command or a doc map/reduce/filter/show/list/update output. + % If it's a command then parse the whole JSON and execute the + % command, otherwise return the raw JSON line to the caller. + pick_command(Line) + catch + throw:abort -> + {json, Line}; + throw:{cmd, _Cmd} -> + case ?JSON_DECODE(Line) of + [<<"log">>, Msg] when is_binary(Msg) -> + % we got a message to log. Log it and continue + couch_log:info("OS Process ~p Log :: ~s", + [OsProc#os_proc.port, Msg]), + readjson(OsProc); + [<<"error">>, Id, Reason] -> + throw({error, {couch_util:to_existing_atom(Id),Reason}}); + [<<"fatal">>, Id, Reason] -> + couch_log:info("OS Process ~p Fatal Error :: ~s ~p", + [OsProc#os_proc.port, Id, Reason]), + throw({couch_util:to_existing_atom(Id),Reason}); + _Result -> + {json, Line} + end + end. + +pick_command(Line) -> + json_stream_parse:events(Line, fun pick_command0/1). + +pick_command0(array_start) -> + fun pick_command1/1; +pick_command0(_) -> + throw(abort). + +pick_command1(<<"log">> = Cmd) -> + throw({cmd, Cmd}); +pick_command1(<<"error">> = Cmd) -> + throw({cmd, Cmd}); +pick_command1(<<"fatal">> = Cmd) -> + throw({cmd, Cmd}); +pick_command1(_) -> + throw(abort). + + +% gen_server API +init([Command, Options, PortOptions]) -> + couch_js_io_logger:start(os:getenv("COUCHDB_IO_LOG_DIR")), + PrivDir = couch_util:priv_dir(), + Spawnkiller = "\"" ++ filename:join(PrivDir, "couchspawnkillable") ++ "\"", + V = config:get("query_server_config", "os_process_idle_limit", "300"), + IdleLimit = list_to_integer(V) * 1000, + BaseProc = #os_proc{ + command=Command, + port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions), + writer=fun ?MODULE:writejson/2, + reader=fun ?MODULE:readjson/1, + idle=IdleLimit + }, + KillCmd = iolist_to_binary(readline(BaseProc)), + Pid = self(), + couch_log:debug("OS Process Start :: ~p", [BaseProc#os_proc.port]), + spawn(fun() -> + % this ensure the real os process is killed when this process dies. + erlang:monitor(process, Pid), + killer(?b2l(KillCmd)) + end), + OsProc = + lists:foldl(fun(Opt, Proc) -> + case Opt of + {writer, Writer} when is_function(Writer) -> + Proc#os_proc{writer=Writer}; + {reader, Reader} when is_function(Reader) -> + Proc#os_proc{reader=Reader}; + {timeout, TimeOut} when is_integer(TimeOut) -> + Proc#os_proc{timeout=TimeOut} + end + end, BaseProc, Options), + {ok, OsProc, IdleLimit}. + +terminate(Reason, #os_proc{port=Port}) -> + catch port_close(Port), + case Reason of + normal -> + couch_js_io_logger:stop_noerror(); + Error -> + couch_js_io_logger:stop_error(Error) + end, + ok. + +handle_call({set_timeout, TimeOut}, _From, #os_proc{idle=Idle}=OsProc) -> + {reply, ok, OsProc#os_proc{timeout=TimeOut}, Idle}; +handle_call({prompt, Data}, _From, #os_proc{idle=Idle}=OsProc) -> + #os_proc{writer=Writer, reader=Reader} = OsProc, + try + Writer(OsProc, Data), + {reply, {ok, Reader(OsProc)}, OsProc, Idle} + catch + throw:{error, OsError} -> + {reply, OsError, OsProc, Idle}; + throw:{fatal, OsError} -> + {stop, normal, OsError, OsProc}; + throw:OtherError -> + {stop, normal, OtherError, OsProc} + after + garbage_collect() + end. + +handle_cast({send, Data}, #os_proc{writer=Writer, idle=Idle}=OsProc) -> + try + Writer(OsProc, Data), + {noreply, OsProc, Idle} + catch + throw:OsError -> + couch_log:error("Failed sending data: ~p -> ~p", [Data, OsError]), + {stop, normal, OsProc} + end; +handle_cast(garbage_collect, #os_proc{idle=Idle}=OsProc) -> + erlang:garbage_collect(), + {noreply, OsProc, Idle}; +handle_cast(stop, OsProc) -> + {stop, normal, OsProc}; +handle_cast(Msg, #os_proc{idle=Idle}=OsProc) -> + couch_log:debug("OS Proc: Unknown cast: ~p", [Msg]), + {noreply, OsProc, Idle}. + +handle_info(timeout, #os_proc{idle=Idle}=OsProc) -> + gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}), + erlang:garbage_collect(), + {noreply, OsProc, Idle}; +handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) -> + couch_log:info("OS Process terminated normally", []), + {stop, normal, OsProc}; +handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) -> + couch_log:error("OS Process died with status: ~p", [Status]), + {stop, {exit_status, Status}, OsProc}; +handle_info(Msg, #os_proc{idle=Idle}=OsProc) -> + couch_log:debug("OS Proc: Unknown info: ~p", [Msg]), + {noreply, OsProc, Idle}. + +code_change(_, {os_proc, Cmd, Port, W, R, Timeout} , _) -> + V = config:get("query_server_config","os_process_idle_limit","300"), + State = #os_proc{ + command = Cmd, + port = Port, + writer = W, + reader = R, + timeout = Timeout, + idle = list_to_integer(V) * 1000 + }, + {ok, State}; +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +killer(KillCmd) -> + receive _ -> + os:cmd(KillCmd) + after 1000 -> + ?MODULE:killer(KillCmd) + end. + diff --git a/src/couch_js/src/couch_js_proc_manager.erl b/src/couch_js/src/couch_js_proc_manager.erl new file mode 100644 index 000000000..096469612 --- /dev/null +++ b/src/couch_js/src/couch_js_proc_manager.erl @@ -0,0 +1,602 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_js_proc_manager). +-behaviour(gen_server). +-behaviour(config_listener). +-vsn(1). + +-export([ + start_link/0, + get_proc_count/0, + get_stale_proc_count/0, + new_proc/1, + reload/0, + terminate_stale_procs/0 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + +-export([ + handle_config_change/5, + handle_config_terminate/3 +]). + +-include_lib("couch/include/couch_db.hrl"). + +-define(PROCS, couch_js_proc_manager_procs). +-define(WAITERS, couch_js_proc_manager_waiters). +-define(OPENING, couch_js_proc_manager_opening). +-define(SERVERS, couch_js_proc_manager_servers). +-define(RELISTEN_DELAY, 5000). + +-record(state, { + config, + counts, + threshold_ts, + hard_limit, + soft_limit +}). + +-type docid() :: iodata(). +-type revision() :: {integer(), binary()}. + +-record(client, { + timestamp :: os:timestamp() | '_', + from :: undefined | {pid(), reference()} | '_', + lang :: binary() | '_', + ddoc :: #doc{} | '_', + ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_' +}). + +-record(proc_int, { + pid, + lang, + client, + ddoc_keys = [], + prompt_fun, + set_timeout_fun, + stop_fun, + t0 = os:timestamp() +}). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + + +get_proc_count() -> + gen_server:call(?MODULE, get_proc_count). + + +get_stale_proc_count() -> + gen_server:call(?MODULE, get_stale_proc_count). + + +reload() -> + gen_server:call(?MODULE, set_threshold_ts). + + +terminate_stale_procs() -> + gen_server:call(?MODULE, terminate_stale_procs). + + +init([]) -> + process_flag(trap_exit, true), + ok = config:listen_for_changes(?MODULE, undefined), + + TableOpts = [public, named_table, ordered_set], + ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]), + ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]), + ets:new(?OPENING, [public, named_table, set]), + ets:new(?SERVERS, [public, named_table, set]), + ets:insert(?SERVERS, get_servers_from_env("COUCHDB_QUERY_SERVER_")), + ets:insert(?SERVERS, get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_")), + ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]), + maybe_configure_erlang_native_servers(), + + {ok, #state{ + config = get_proc_config(), + counts = dict:new(), + threshold_ts = os:timestamp(), + hard_limit = get_hard_limit(), + soft_limit = get_soft_limit() + }}. + + +terminate(_Reason, _State) -> + ets:foldl(fun(#proc_int{pid=P}, _) -> + couch_util:shutdown_sync(P) + end, 0, ?PROCS), + ok. + + +handle_call(get_proc_count, _From, State) -> + NumProcs = ets:info(?PROCS, size), + NumOpening = ets:info(?OPENING, size), + {reply, NumProcs + NumOpening, State}; + +handle_call(get_stale_proc_count, _From, State) -> + #state{threshold_ts = T0} = State, + MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}], + {reply, ets:select_count(?PROCS, MatchSpec), State}; + +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) -> + LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>), + Lang = couch_util:to_binary(LangStr), + Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey}, + add_waiting_client(Client), + {noreply, flush_waiters(State, Lang)}; + +handle_call({get_proc, LangStr}, From, State) -> + Lang = couch_util:to_binary(LangStr), + Client = #client{from=From, lang=Lang}, + add_waiting_client(Client), + {noreply, flush_waiters(State, Lang)}; + +handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) -> + erlang:demonitor(Ref, [flush]), + NewState = case ets:lookup(?PROCS, Proc#proc.pid) of + [#proc_int{}=ProcInt] -> + return_proc(State, ProcInt); + [] -> + % Proc must've died and we already + % cleared it out of the table in + % the handle_info clause. + State + end, + {reply, true, NewState}; + +handle_call(set_threshold_ts, _From, State) -> + FoldFun = fun + (#proc_int{client = undefined} = Proc, StateAcc) -> + remove_proc(StateAcc, Proc); + (_, StateAcc) -> + StateAcc + end, + NewState = ets:foldl(FoldFun, State, ?PROCS), + {reply, ok, NewState#state{threshold_ts = os:timestamp()}}; + +handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) -> + FoldFun = fun + (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) -> + case Ts1 > Ts2 of + true -> + remove_proc(StateAcc, Proc); + false -> + StateAcc + end; + (_, StateAcc) -> + StateAcc + end, + NewState = ets:foldl(FoldFun, State, ?PROCS), + {reply, ok, NewState}; + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + + +handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) -> + NewState = case ets:lookup(?PROCS, Pid) of + [#proc_int{client=undefined, lang=Lang}=Proc] -> + case dict:find(Lang, Counts) of + {ok, Count} when Count >= State#state.soft_limit -> + couch_log:info("Closing idle OS Process: ~p", [Pid]), + remove_proc(State, Proc); + {ok, _} -> + State + end; + _ -> + State + end, + {noreply, NewState}; + +handle_cast(reload_config, State) -> + NewState = State#state{ + config = get_proc_config(), + hard_limit = get_hard_limit(), + soft_limit = get_soft_limit() + }, + maybe_configure_erlang_native_servers(), + {noreply, flush_waiters(NewState)}; + +handle_cast(_Msg, State) -> + {noreply, State}. + + +handle_info(shutdown, State) -> + {stop, shutdown, State}; + +handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) -> + ets:delete(?OPENING, Pid), + link(Proc0#proc_int.pid), + Proc = assign_proc(ClientPid, Proc0), + gen_server:reply(From, {ok, Proc, State#state.config}), + {noreply, State}; + +handle_info({'EXIT', Pid, spawn_error}, State) -> + [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid), + ets:delete(?OPENING, Pid), + NewState = State#state{ + counts = dict:update_counter(Lang, -1, State#state.counts) + }, + {noreply, flush_waiters(NewState, Lang)}; + +handle_info({'EXIT', Pid, Reason}, State) -> + couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]), + case ets:lookup(?PROCS, Pid) of + [#proc_int{} = Proc] -> + NewState = remove_proc(State, Proc), + {noreply, flush_waiters(NewState, Proc#proc_int.lang)}; + [] -> + {noreply, State} + end; + +handle_info({'DOWN', Ref, _, _, _Reason}, State0) -> + case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of + [#proc_int{} = Proc] -> + {noreply, return_proc(State0, Proc)}; + [] -> + {noreply, State0} + end; + + +handle_info(restart_config_listener, State) -> + ok = config:listen_for_changes(?MODULE, nil), + {noreply, State}; + +handle_info(_Msg, State) -> + {noreply, State}. + + +code_change(_OldVsn, #state{}=State, _Extra) -> + {ok, State}. + +handle_config_terminate(_, stop, _) -> + ok; +handle_config_terminate(_Server, _Reason, _State) -> + gen_server:cast(?MODULE, reload_config), + erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). + +handle_config_change("native_query_servers", _, _, _, _) -> + gen_server:cast(?MODULE, reload_config), + {ok, undefined}; +handle_config_change("query_server_config", _, _, _, _) -> + gen_server:cast(?MODULE, reload_config), + {ok, undefined}; +handle_config_change(_, _, _, _, _) -> + {ok, undefined}. + + +find_proc(#client{lang = Lang, ddoc_key = undefined}) -> + Pred = fun(_) -> + true + end, + find_proc(Lang, Pred); +find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) -> + Pred = fun(#proc_int{ddoc_keys = DDocKeys}) -> + lists:member(DDocKey, DDocKeys) + end, + case find_proc(Lang, Pred) of + not_found -> + case find_proc(Client#client{ddoc_key=undefined}) of + {ok, Proc} -> + teach_ddoc(DDoc, DDocKey, Proc); + Else -> + Else + end; + Else -> + Else + end. + +find_proc(Lang, Fun) -> + try iter_procs(Lang, Fun) + catch error:Reason -> + StackTrace = erlang:get_stacktrace(), + couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]), + {error, Reason} + end. + + +iter_procs(Lang, Fun) when is_binary(Lang) -> + Pattern = #proc_int{lang=Lang, client=undefined, _='_'}, + MSpec = [{Pattern, [], ['$_']}], + case ets:select_reverse(?PROCS, MSpec, 25) of + '$end_of_table' -> + not_found; + Continuation -> + iter_procs_int(Continuation, Fun) + end. + + +iter_procs_int({[], Continuation0}, Fun) -> + case ets:select_reverse(Continuation0) of + '$end_of_table' -> + not_found; + Continuation1 -> + iter_procs_int(Continuation1, Fun) + end; +iter_procs_int({[Proc | Rest], Continuation}, Fun) -> + case Fun(Proc) of + true -> + {ok, Proc}; + false -> + iter_procs_int({Rest, Continuation}, Fun) + end. + + +spawn_proc(State, Client) -> + Pid = spawn_link(?MODULE, new_proc, [Client]), + ets:insert(?OPENING, {Pid, Client}), + Counts = State#state.counts, + Lang = Client#client.lang, + State#state{ + counts = dict:update_counter(Lang, 1, Counts) + }. + + +new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) -> + #client{from=From, lang=Lang} = Client, + Resp = try + case new_proc_int(From, Lang) of + {ok, Proc} -> + {spawn_ok, Proc, From}; + Error -> + gen_server:reply(From, {error, Error}), + spawn_error + end + catch _:_ -> + spawn_error + end, + exit(Resp); + +new_proc(Client) -> + #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client, + Resp = try + case new_proc_int(From, Lang) of + {ok, NewProc} -> + {ok, Proc} = teach_ddoc(DDoc, DDocKey, NewProc), + {spawn_ok, Proc, From}; + Error -> + gen_server:reply(From, {error, Error}), + spawn_error + end + catch _:_ -> + spawn_error + end, + exit(Resp). + +split_string_if_longer(String, Pos) -> + case length(String) > Pos of + true -> lists:split(Pos, String); + false -> false + end. + +split_by_char(String, Char) -> + %% 17.5 doesn't have string:split + %% the function doesn't handle errors + %% it is designed to be used only in specific context + Pos = string:chr(String, Char), + {Key, [_Eq | Value]} = lists:split(Pos - 1, String), + {Key, Value}. + +get_servers_from_env(Spec) -> + SpecLen = length(Spec), + % loop over os:getenv(), match SPEC_ + lists:filtermap(fun(EnvStr) -> + case split_string_if_longer(EnvStr, SpecLen) of + {Spec, Rest} -> + {true, split_by_char(Rest, $=)}; + _ -> + false + end + end, os:getenv()). + +get_query_server(LangStr) -> + case ets:lookup(?SERVERS, string:to_upper(LangStr)) of + [{_, Command}] -> Command; + _ -> undefined + end. + +native_query_server_enabled() -> + % 1. [native_query_server] enable_erlang_query_server = true | false + % 2. if [native_query_server] erlang == {couch_native_process, start_link, []} -> pretend true as well + NativeEnabled = config:get_boolean("native_query_servers", "enable_erlang_query_server", false), + NativeLegacyConfig = config:get("native_query_servers", "erlang", ""), + NativeLegacyEnabled = NativeLegacyConfig =:= "{couch_native_process, start_link, []}", + NativeEnabled orelse NativeLegacyEnabled. + +maybe_configure_erlang_native_servers() -> + case native_query_server_enabled() of + true -> + ets:insert(?SERVERS, [ + {"ERLANG", {couch_js_native_process, start_link, []}}]); + _Else -> + ok + end. + +new_proc_int(From, Lang) when is_binary(Lang) -> + LangStr = binary_to_list(Lang), + case get_query_server(LangStr) of + undefined -> + gen_server:reply(From, {unknown_query_language, Lang}); + {M, F, A} -> + {ok, Pid} = apply(M, F, A), + make_proc(Pid, Lang, M); + Command -> + {ok, Pid} = couch_js_os_process:start_link(Command), + make_proc(Pid, Lang, couch_js_os_process) + end. + + +teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) -> + % send ddoc over the wire + % we only share the rev with the client we know to update code + % but it only keeps the latest copy, per each ddoc, around. + true = couch_js_query_servers:proc_prompt( + export_proc(Proc), + [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]), + % we should remove any other ddocs keys for this docid + % because the query server overwrites without the rev + Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId], + % add ddoc to the proc + {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}. + + +make_proc(Pid, Lang, Mod) when is_binary(Lang) -> + Proc = #proc_int{ + lang = Lang, + pid = Pid, + prompt_fun = {Mod, prompt}, + set_timeout_fun = {Mod, set_timeout}, + stop_fun = {Mod, stop} + }, + unlink(Pid), + {ok, Proc}. + + +assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) -> + Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)}, + ets:insert(?PROCS, Proc), + export_proc(Proc); +assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) -> + {Pid, _} = Client#client.from, + assign_proc(Pid, Proc). + + +return_proc(#state{} = State, #proc_int{} = ProcInt) -> + #proc_int{pid = Pid, lang = Lang} = ProcInt, + NewState = case is_process_alive(Pid) of true -> + case ProcInt#proc_int.t0 < State#state.threshold_ts of + true -> + remove_proc(State, ProcInt); + false -> + gen_server:cast(Pid, garbage_collect), + true = ets:update_element(?PROCS, Pid, [ + {#proc_int.client, undefined} + ]), + State + end; + false -> + remove_proc(State, ProcInt) + end, + flush_waiters(NewState, Lang). + + +remove_proc(State, #proc_int{}=Proc) -> + ets:delete(?PROCS, Proc#proc_int.pid), + case is_process_alive(Proc#proc_int.pid) of true -> + unlink(Proc#proc_int.pid), + gen_server:cast(Proc#proc_int.pid, stop); + false -> + ok + end, + Counts = State#state.counts, + Lang = Proc#proc_int.lang, + State#state{ + counts = dict:update_counter(Lang, -1, Counts) + }. + + +-spec export_proc(#proc_int{}) -> #proc{}. +export_proc(#proc_int{} = ProcInt) -> + ProcIntList = tuple_to_list(ProcInt), + ProcLen = record_info(size, proc), + [_ | Data] = lists:sublist(ProcIntList, ProcLen), + list_to_tuple([proc | Data]). + + +flush_waiters(State) -> + dict:fold(fun(Lang, Count, StateAcc) -> + case Count < State#state.hard_limit of + true -> + flush_waiters(StateAcc, Lang); + false -> + StateAcc + end + end, State, State#state.counts). + + +flush_waiters(State, Lang) -> + CanSpawn = can_spawn(State, Lang), + case get_waiting_client(Lang) of + #client{from = From} = Client -> + case find_proc(Client) of + {ok, ProcInt} -> + Proc = assign_proc(Client, ProcInt), + gen_server:reply(From, {ok, Proc, State#state.config}), + remove_waiting_client(Client), + flush_waiters(State, Lang); + {error, Error} -> + gen_server:reply(From, {error, Error}), + remove_waiting_client(Client), + flush_waiters(State, Lang); + not_found when CanSpawn -> + NewState = spawn_proc(State, Client), + remove_waiting_client(Client), + flush_waiters(NewState, Lang); + not_found -> + State + end; + undefined -> + State + end. + + +add_waiting_client(Client) -> + ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}). + +-spec get_waiting_client(Lang :: binary()) -> undefined | #client{}. +get_waiting_client(Lang) -> + case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of + '$end_of_table' -> + undefined; + {[#client{}=Client], _} -> + Client + end. + + +remove_waiting_client(#client{timestamp = Timestamp}) -> + ets:delete(?WAITERS, Timestamp). + + +can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) -> + case dict:find(Lang, Counts) of + {ok, Count} -> Count < HardLimit; + error -> true + end. + + +get_proc_config() -> + Limit = config:get("query_server_config", "reduce_limit", "true"), + Timeout = config:get("couchdb", "os_process_timeout", "5000"), + {[ + {<<"reduce_limit">>, list_to_atom(Limit)}, + {<<"timeout">>, list_to_integer(Timeout)} + ]}. + + +get_hard_limit() -> + LimStr = config:get("query_server_config", "os_process_limit", "100"), + list_to_integer(LimStr). + + +get_soft_limit() -> + LimStr = config:get("query_server_config", "os_process_soft_limit", "100"), + list_to_integer(LimStr). diff --git a/src/couch_js/src/couch_js_query_servers.erl b/src/couch_js/src/couch_js_query_servers.erl new file mode 100644 index 000000000..12dc864ea --- /dev/null +++ b/src/couch_js/src/couch_js_query_servers.erl @@ -0,0 +1,683 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_js_query_servers). + +-export([try_compile/4]). +-export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]). +-export([reduce/3, rereduce/3,validate_doc_update/5]). +-export([filter_docs/5]). +-export([filter_view/3]). +-export([finalize/2]). +-export([rewrite/3]). + +-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). + +% For 210-os-proc-pool.t +-export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]). + +-include_lib("couch/include/couch_db.hrl"). + +-define(SUMERROR, <<"The _sum function requires that map values be numbers, " + "arrays of numbers, or objects. Objects cannot be mixed with other " + "data structures. Objects can be arbitrarily nested, provided that the values " + "for all fields are themselves numbers, arrays of numbers, or objects.">>). + +-define(STATERROR, <<"The _stats function requires that map values be numbers " + "or arrays of numbers, not '~p'">>). + + +try_compile(Proc, FunctionType, FunctionName, FunctionSource) -> + try + proc_prompt(Proc, [<<"add_fun">>, FunctionSource]), + ok + catch + {compilation_error, E} -> + Fmt = "Compilation of the ~s function in the '~s' view failed: ~s", + Msg = io_lib:format(Fmt, [FunctionType, FunctionName, E]), + throw({compilation_error, Msg}); + {os_process_error, {exit_status, ExitStatus}} -> + Fmt = "Compilation of the ~s function in the '~s' view failed with exit status: ~p", + Msg = io_lib:format(Fmt, [FunctionType, FunctionName, ExitStatus]), + throw({compilation_error, Msg}) + end. + +start_doc_map(Lang, Functions, Lib) -> + Proc = get_os_process(Lang), + case Lib of + {[]} -> ok; + Lib -> + true = proc_prompt(Proc, [<<"add_lib">>, Lib]) + end, + lists:foreach(fun(FunctionSource) -> + true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource]) + end, Functions), + {ok, Proc}. + +map_doc_raw(Proc, Doc) -> + Json = couch_doc:to_json_obj(Doc, []), + {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}. + + +stop_doc_map(nil) -> + ok; +stop_doc_map(Proc) -> + ok = ret_os_process(Proc). + +group_reductions_results([]) -> + []; +group_reductions_results(List) -> + {Heads, Tails} = lists:foldl( + fun([H|T], {HAcc,TAcc}) -> + {[H|HAcc], [T|TAcc]} + end, {[], []}, List), + case Tails of + [[]|_] -> % no tails left + [Heads]; + _ -> + [Heads | group_reductions_results(Tails)] + end. + +finalize(<<"_approx_count_distinct",_/binary>>, Reduction) -> + true = hyper:is_hyper(Reduction), + {ok, round(hyper:card(Reduction))}; +finalize(<<"_stats",_/binary>>, Unpacked) -> + {ok, pack_stats(Unpacked)}; +finalize(_RedSrc, Reduction) -> + {ok, Reduction}. + +rereduce(_Lang, [], _ReducedValues) -> + {ok, []}; +rereduce(Lang, RedSrcs, ReducedValues) -> + Grouped = group_reductions_results(ReducedValues), + Results = lists:zipwith( + fun + (<<"_", _/binary>> = FunSrc, Values) -> + {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []), + Result; + (FunSrc, Values) -> + os_rereduce(Lang, [FunSrc], Values) + end, RedSrcs, Grouped), + {ok, Results}. + +reduce(_Lang, [], _KVs) -> + {ok, []}; +reduce(Lang, RedSrcs, KVs) -> + {OsRedSrcs, BuiltinReds} = lists:partition(fun + (<<"_", _/binary>>) -> false; + (_OsFun) -> true + end, RedSrcs), + {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs), + {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []), + recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []). + + +recombine_reduce_results([], [], [], Acc) -> + {ok, lists:reverse(Acc)}; +recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) -> + recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]); +recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) -> + recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]). + +os_reduce(_Lang, [], _KVs) -> + {ok, []}; +os_reduce(Lang, OsRedSrcs, KVs) -> + Proc = get_os_process(Lang), + OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of + [true, Reductions] -> Reductions + catch + throw:{reduce_overflow_error, Msg} -> + [{[{reduce_overflow_error, Msg}]} || _ <- OsRedSrcs] + after + ok = ret_os_process(Proc) + end, + {ok, OsResults}. + +os_rereduce(Lang, OsRedSrcs, KVs) -> + case get_overflow_error(KVs) of + undefined -> + Proc = get_os_process(Lang), + try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of + [true, [Reduction]] -> Reduction + catch + throw:{reduce_overflow_error, Msg} -> + {[{reduce_overflow_error, Msg}]} + after + ok = ret_os_process(Proc) + end; + Error -> + Error + end. + + +get_overflow_error([]) -> + undefined; +get_overflow_error([{[{reduce_overflow_error, _}]} = Error | _]) -> + Error; +get_overflow_error([_ | Rest]) -> + get_overflow_error(Rest). + + +builtin_reduce(_Re, [], _KVs, Acc) -> + {ok, lists:reverse(Acc)}; +builtin_reduce(Re, [<<"_sum",_/binary>>|BuiltinReds], KVs, Acc) -> + Sum = builtin_sum_rows(KVs, 0), + Red = check_sum_overflow(?term_size(KVs), ?term_size(Sum), Sum), + builtin_reduce(Re, BuiltinReds, KVs, [Red|Acc]); +builtin_reduce(reduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) -> + Count = length(KVs), + builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]); +builtin_reduce(rereduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) -> + Count = builtin_sum_rows(KVs, 0), + builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]); +builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) -> + Stats = builtin_stats(Re, KVs), + builtin_reduce(Re, BuiltinReds, KVs, [Stats|Acc]); +builtin_reduce(Re, [<<"_approx_count_distinct",_/binary>>|BuiltinReds], KVs, Acc) -> + Distinct = approx_count_distinct(Re, KVs), + builtin_reduce(Re, BuiltinReds, KVs, [Distinct|Acc]). + + +builtin_sum_rows([], Acc) -> + Acc; +builtin_sum_rows([[_Key, Value] | RestKVs], Acc) -> + try sum_values(Value, Acc) of + NewAcc -> + builtin_sum_rows(RestKVs, NewAcc) + catch + throw:{builtin_reduce_error, Obj} -> + Obj; + throw:{invalid_value, Reason, Cause} -> + {[{<<"error">>, <<"builtin_reduce_error">>}, + {<<"reason">>, Reason}, {<<"caused_by">>, Cause}]} + end. + + +sum_values(Value, Acc) when is_number(Value), is_number(Acc) -> + Acc + Value; +sum_values(Value, Acc) when is_list(Value), is_list(Acc) -> + sum_arrays(Acc, Value); +sum_values(Value, Acc) when is_number(Value), is_list(Acc) -> + sum_arrays(Acc, [Value]); +sum_values(Value, Acc) when is_list(Value), is_number(Acc) -> + sum_arrays([Acc], Value); +sum_values({Props}, Acc) -> + case lists:keyfind(<<"error">>, 1, Props) of + {<<"error">>, <<"builtin_reduce_error">>} -> + throw({builtin_reduce_error, {Props}}); + false -> + ok + end, + case Acc of + 0 -> + {Props}; + {AccProps} -> + {sum_objects(lists:sort(Props), lists:sort(AccProps))} + end; +sum_values(Else, _Acc) -> + throw_sum_error(Else). + +sum_objects([{K1, V1} | Rest1], [{K1, V2} | Rest2]) -> + [{K1, sum_values(V1, V2)} | sum_objects(Rest1, Rest2)]; +sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 < K2 -> + [{K1, V1} | sum_objects(Rest1, [{K2, V2} | Rest2])]; +sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 > K2 -> + [{K2, V2} | sum_objects([{K1, V1} | Rest1], Rest2)]; +sum_objects([], Rest) -> + Rest; +sum_objects(Rest, []) -> + Rest. + +sum_arrays([], []) -> + []; +sum_arrays([_|_]=Xs, []) -> + Xs; +sum_arrays([], [_|_]=Ys) -> + Ys; +sum_arrays([X|Xs], [Y|Ys]) when is_number(X), is_number(Y) -> + [X+Y | sum_arrays(Xs,Ys)]; +sum_arrays(Else, _) -> + throw_sum_error(Else). + +check_sum_overflow(InSize, OutSize, Sum) -> + Overflowed = OutSize > 4906 andalso OutSize * 2 > InSize, + case config:get("query_server_config", "reduce_limit", "true") of + "true" when Overflowed -> + Msg = log_sum_overflow(InSize, OutSize), + {[ + {<<"error">>, <<"builtin_reduce_error">>}, + {<<"reason">>, Msg} + ]}; + "log" when Overflowed -> + log_sum_overflow(InSize, OutSize), + Sum; + _ -> + Sum + end. + +log_sum_overflow(InSize, OutSize) -> + Fmt = "Reduce output must shrink more rapidly: " + "input size: ~b " + "output size: ~b", + Msg = iolist_to_binary(io_lib:format(Fmt, [InSize, OutSize])), + couch_log:error(Msg, []), + Msg. + +builtin_stats(_, []) -> + {0, 0, 0, 0, 0}; +builtin_stats(_, [[_,First]|Rest]) -> + lists:foldl(fun([_Key, Value], Acc) -> + stat_values(Value, Acc) + end, build_initial_accumulator(First), Rest). + +stat_values(Value, Acc) when is_list(Value), is_list(Acc) -> + lists:zipwith(fun stat_values/2, Value, Acc); +stat_values({PreRed}, Acc) when is_list(PreRed) -> + stat_values(unpack_stats({PreRed}), Acc); +stat_values(Value, Acc) when is_number(Value) -> + stat_values({Value, 1, Value, Value, Value*Value}, Acc); +stat_values(Value, Acc) when is_number(Acc) -> + stat_values(Value, {Acc, 1, Acc, Acc, Acc*Acc}); +stat_values(Value, Acc) when is_tuple(Value), is_tuple(Acc) -> + {Sum0, Cnt0, Min0, Max0, Sqr0} = Value, + {Sum1, Cnt1, Min1, Max1, Sqr1} = Acc, + { + Sum0 + Sum1, + Cnt0 + Cnt1, + erlang:min(Min0, Min1), + erlang:max(Max0, Max1), + Sqr0 + Sqr1 + }; +stat_values(Else, _Acc) -> + throw_stat_error(Else). + +build_initial_accumulator(L) when is_list(L) -> + [build_initial_accumulator(X) || X <- L]; +build_initial_accumulator(X) when is_number(X) -> + {X, 1, X, X, X*X}; +build_initial_accumulator({_, _, _, _, _} = AlreadyUnpacked) -> + AlreadyUnpacked; +build_initial_accumulator({Props}) -> + unpack_stats({Props}); +build_initial_accumulator(Else) -> + Msg = io_lib:format("non-numeric _stats input: ~w", [Else]), + throw({invalid_value, iolist_to_binary(Msg)}). + +unpack_stats({PreRed}) when is_list(PreRed) -> + { + get_number(<<"sum">>, PreRed), + get_number(<<"count">>, PreRed), + get_number(<<"min">>, PreRed), + get_number(<<"max">>, PreRed), + get_number(<<"sumsqr">>, PreRed) + }. + + +pack_stats({Sum, Cnt, Min, Max, Sqr}) -> + {[{<<"sum">>,Sum}, {<<"count">>,Cnt}, {<<"min">>,Min}, {<<"max">>,Max}, {<<"sumsqr">>,Sqr}]}; +pack_stats({Packed}) -> + % Legacy code path before we had the finalize operation + {Packed}; +pack_stats(Stats) when is_list(Stats) -> + lists:map(fun pack_stats/1, Stats). + +get_number(Key, Props) -> + case couch_util:get_value(Key, Props) of + X when is_number(X) -> + X; + undefined when is_binary(Key) -> + get_number(binary_to_atom(Key, latin1), Props); + undefined -> + Msg = io_lib:format("user _stats input missing required field ~s (~p)", + [Key, Props]), + throw({invalid_value, iolist_to_binary(Msg)}); + Else -> + Msg = io_lib:format("non-numeric _stats input received for ~s: ~w", + [Key, Else]), + throw({invalid_value, iolist_to_binary(Msg)}) + end. + +% TODO allow customization of precision in the ddoc. +approx_count_distinct(reduce, KVs) -> + lists:foldl(fun([[Key, _Id], _Value], Filter) -> + hyper:insert(term_to_binary(Key), Filter) + end, hyper:new(11), KVs); +approx_count_distinct(rereduce, Reds) -> + hyper:union([Filter || [_, Filter] <- Reds]). + +% use the function stored in ddoc.validate_doc_update to test an update. +-spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when + DDoc :: ddoc(), + EditDoc :: doc(), + DiskDoc :: doc() | nil, + Ctx :: user_ctx(), + SecObj :: sec_obj(). + +validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> + JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]), + JsonDiskDoc = json_doc(DiskDoc), + Resp = ddoc_prompt( + DDoc, + [<<"validate_doc_update">>], + [JsonEditDoc, JsonDiskDoc, Ctx, SecObj] + ), + if Resp == 1 -> ok; true -> + couch_stats:increment_counter([couchdb, query_server, vdu_rejects], 1) + end, + case Resp of + RespCode when RespCode =:= 1; RespCode =:= ok; RespCode =:= true -> + ok; + {[{<<"forbidden">>, Message}]} -> + throw({forbidden, Message}); + {[{<<"unauthorized">>, Message}]} -> + throw({unauthorized, Message}); + {[{_, Message}]} -> + throw({unknown_error, Message}); + Message when is_binary(Message) -> + throw({unknown_error, Message}) + end. + + +rewrite(Req, Db, DDoc) -> + Fields = [F || F <- chttpd_external:json_req_obj_fields(), + F =/= <<"info">>, F =/= <<"form">>, + F =/= <<"uuid">>, F =/= <<"id">>], + JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields), + case ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of + {[{<<"forbidden">>, Message}]} -> + throw({forbidden, Message}); + {[{<<"unauthorized">>, Message}]} -> + throw({unauthorized, Message}); + [<<"no_dispatch_rule">>] -> + undefined; + [<<"ok">>, {V}=Rewrite] when is_list(V) -> + ok = validate_rewrite_response(Rewrite), + Rewrite; + [<<"ok">>, _] -> + throw_rewrite_error(<<"bad rewrite">>); + V -> + couch_log:error("bad rewrite return ~p", [V]), + throw({unknown_error, V}) + end. + +validate_rewrite_response({Fields}) when is_list(Fields) -> + validate_rewrite_response_fields(Fields). + +validate_rewrite_response_fields([{Key, Value} | Rest]) -> + validate_rewrite_response_field(Key, Value), + validate_rewrite_response_fields(Rest); +validate_rewrite_response_fields([]) -> + ok. + +validate_rewrite_response_field(<<"method">>, Method) when is_binary(Method) -> + ok; +validate_rewrite_response_field(<<"method">>, _) -> + throw_rewrite_error(<<"bad method">>); +validate_rewrite_response_field(<<"path">>, Path) when is_binary(Path) -> + ok; +validate_rewrite_response_field(<<"path">>, _) -> + throw_rewrite_error(<<"bad path">>); +validate_rewrite_response_field(<<"body">>, Body) when is_binary(Body) -> + ok; +validate_rewrite_response_field(<<"body">>, _) -> + throw_rewrite_error(<<"bad body">>); +validate_rewrite_response_field(<<"headers">>, {Props}=Headers) when is_list(Props) -> + validate_object_fields(Headers); +validate_rewrite_response_field(<<"headers">>, _) -> + throw_rewrite_error(<<"bad headers">>); +validate_rewrite_response_field(<<"query">>, {Props}=Query) when is_list(Props) -> + validate_object_fields(Query); +validate_rewrite_response_field(<<"query">>, _) -> + throw_rewrite_error(<<"bad query">>); +validate_rewrite_response_field(<<"code">>, Code) when is_integer(Code) andalso Code >= 200 andalso Code < 600 -> + ok; +validate_rewrite_response_field(<<"code">>, _) -> + throw_rewrite_error(<<"bad code">>); +validate_rewrite_response_field(K, V) -> + couch_log:debug("unknown rewrite field ~p=~p", [K, V]), + ok. + +validate_object_fields({Props}) when is_list(Props) -> + lists:foreach(fun + ({Key, Value}) when is_binary(Key) andalso is_binary(Value) -> + ok; + ({Key, Value}) -> + Reason = io_lib:format( + "object key/value must be strings ~p=~p", [Key, Value]), + throw_rewrite_error(Reason); + (Value) -> + throw_rewrite_error(io_lib:format("bad value ~p", [Value])) + end, Props). + + +throw_rewrite_error(Reason) when is_list(Reason)-> + throw_rewrite_error(iolist_to_binary(Reason)); +throw_rewrite_error(Reason) when is_binary(Reason) -> + throw({rewrite_error, Reason}). + + +json_doc_options() -> + json_doc_options([]). + +json_doc_options(Options) -> + Limit = config:get_integer("query_server_config", "revs_limit", 20), + [{revs, Limit} | Options]. + +json_doc(Doc) -> + json_doc(Doc, json_doc_options()). + +json_doc(nil, _) -> + null; +json_doc(Doc, Options) -> + couch_doc:to_json_obj(Doc, Options). + +filter_view(DDoc, VName, Docs) -> + Options = json_doc_options(), + JsonDocs = [json_doc(Doc, Options) || Doc <- Docs], + [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]), + {ok, Passes}. + +filter_docs(Req, Db, DDoc, FName, Docs) -> + JsonReq = case Req of + {json_req, JsonObj} -> + JsonObj; + #httpd{} = HttpReq -> + couch_httpd_external:json_req_obj(HttpReq, Db) + end, + Options = json_doc_options(), + JsonDocs = [json_doc(Doc, Options) || Doc <- Docs], + [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName], + [JsonDocs, JsonReq]), + {ok, Passes}. + +ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) -> + proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]). + +ddoc_prompt(DDoc, FunPath, Args) -> + with_ddoc_proc(DDoc, fun({Proc, DDocId}) -> + proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]) + end). + +with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) -> + Rev = couch_doc:rev_to_str({Start, DiskRev}), + DDocKey = {DDocId, Rev}, + Proc = get_ddoc_process(DDoc, DDocKey), + try Fun({Proc, DDocId}) + after + ok = ret_os_process(Proc) + end. + +proc_prompt(Proc, Args) -> + case proc_prompt_raw(Proc, Args) of + {json, Json} -> + ?JSON_DECODE(Json); + EJson -> + EJson + end. + +proc_prompt_raw(#proc{prompt_fun = {Mod, Func}} = Proc, Args) -> + apply(Mod, Func, [Proc#proc.pid, Args]). + +raw_to_ejson({json, Json}) -> + ?JSON_DECODE(Json); +raw_to_ejson(EJson) -> + EJson. + +proc_stop(Proc) -> + {Mod, Func} = Proc#proc.stop_fun, + apply(Mod, Func, [Proc#proc.pid]). + +proc_set_timeout(Proc, Timeout) -> + {Mod, Func} = Proc#proc.set_timeout_fun, + apply(Mod, Func, [Proc#proc.pid, Timeout]). + +get_os_process_timeout() -> + list_to_integer(config:get("couchdb", "os_process_timeout", "5000")). + +get_ddoc_process(#doc{} = DDoc, DDocKey) -> + % remove this case statement + case gen_server:call(couch_js_proc_manager, {get_proc, DDoc, DDocKey}, get_os_process_timeout()) of + {ok, Proc, {QueryConfig}} -> + % process knows the ddoc + case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of + true -> + proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)), + Proc; + _ -> + catch proc_stop(Proc), + get_ddoc_process(DDoc, DDocKey) + end; + Error -> + throw(Error) + end. + +get_os_process(Lang) -> + case gen_server:call(couch_js_proc_manager, {get_proc, Lang}, get_os_process_timeout()) of + {ok, Proc, {QueryConfig}} -> + case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of + true -> + proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)), + Proc; + _ -> + catch proc_stop(Proc), + get_os_process(Lang) + end; + Error -> + throw(Error) + end. + +ret_os_process(Proc) -> + true = gen_server:call(couch_js_proc_manager, {ret_proc, Proc}, infinity), + catch unlink(Proc#proc.pid), + ok. + +throw_sum_error(Else) -> + throw({invalid_value, ?SUMERROR, Else}). + +throw_stat_error(Else) -> + throw({invalid_value, iolist_to_binary(io_lib:format(?STATERROR, [Else]))}). + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +builtin_sum_rows_negative_test() -> + A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}], + E = {[{<<"error">>, <<"builtin_reduce_error">>}]}, + ?assertEqual(E, builtin_sum_rows([["K", E]], [])), + % The below case is where the value is invalid, but no error because + % it's only one document. + ?assertEqual(A, builtin_sum_rows([["K", A]], [])), + {Result} = builtin_sum_rows([["K", A]], [1, 2, 3]), + ?assertEqual({<<"error">>, <<"builtin_reduce_error">>}, + lists:keyfind(<<"error">>, 1, Result)). + +sum_values_test() -> + ?assertEqual(3, sum_values(1, 2)), + ?assertEqual([2,4,6], sum_values(1, [1,4,6])), + ?assertEqual([3,5,7], sum_values([3,2,4], [0,3,3])), + X = {[{<<"a">>,1}, {<<"b">>,[1,2]}, {<<"c">>, {[{<<"d">>,3}]}}, + {<<"g">>,1}]}, + Y = {[{<<"a">>,2}, {<<"b">>,3}, {<<"c">>, {[{<<"e">>, 5}]}}, + {<<"f">>,1}, {<<"g">>,1}]}, + Z = {[{<<"a">>,3}, {<<"b">>,[4,2]}, {<<"c">>, {[{<<"d">>,3},{<<"e">>,5}]}}, + {<<"f">>,1}, {<<"g">>,2}]}, + ?assertEqual(Z, sum_values(X, Y)), + ?assertEqual(Z, sum_values(Y, X)). + +sum_values_negative_test() -> + % invalid value + A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}], + B = ["error 1", "error 2"], + C = [<<"error 3">>, <<"error 4">>], + KV = {[{<<"error">>, <<"builtin_reduce_error">>}, + {<<"reason">>, ?SUMERROR}, {<<"caused_by">>, <<"some cause">>}]}, + ?assertThrow({invalid_value, _, _}, sum_values(A, [1, 2, 3])), + ?assertThrow({invalid_value, _, _}, sum_values(A, 0)), + ?assertThrow({invalid_value, _, _}, sum_values(B, [1, 2])), + ?assertThrow({invalid_value, _, _}, sum_values(C, [0])), + ?assertThrow({builtin_reduce_error, KV}, sum_values(KV, [0])). + +stat_values_test() -> + ?assertEqual({1, 2, 0, 1, 1}, stat_values(1, 0)), + ?assertEqual({11, 2, 1, 10, 101}, stat_values(1, 10)), + ?assertEqual([{9, 2, 2, 7, 53}, + {14, 2, 3, 11, 130}, + {18, 2, 5, 13, 194} + ], stat_values([2,3,5], [7,11,13])). + +reduce_stats_test() -> + ?assertEqual([ + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + ], test_reduce(<<"_stats">>, [[[null, key], 2]])), + + ?assertEqual([[ + {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]}, + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + ]], test_reduce(<<"_stats">>, [[[null, key],[1,2]]])), + + ?assertEqual( + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + , element(2, finalize(<<"_stats">>, {2, 1, 2, 2, 4}))), + + ?assertEqual([ + {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]}, + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + ], element(2, finalize(<<"_stats">>, [ + {1, 1, 1, 1, 1}, + {2, 1, 2, 2, 4} + ]))), + + ?assertEqual([ + {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]}, + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + ], element(2, finalize(<<"_stats">>, [ + {1, 1, 1, 1, 1}, + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + ]))), + + ?assertEqual([ + {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]}, + {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]} + ], element(2, finalize(<<"_stats">>, [ + {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]}, + {2, 1, 2, 2, 4} + ]))), + ok. + +test_reduce(Reducer, KVs) -> + ?assertMatch({ok, _}, reduce(<<"javascript">>, [Reducer], KVs)), + {ok, Reduced} = reduce(<<"javascript">>, [Reducer], KVs), + {ok, Finalized} = finalize(Reducer, Reduced), + Finalized. + +-endif. diff --git a/src/couch_js/src/couch_js_sup.erl b/src/couch_js/src/couch_js_sup.erl new file mode 100644 index 000000000..e87546127 --- /dev/null +++ b/src/couch_js/src/couch_js_sup.erl @@ -0,0 +1,45 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + + +-module(couch_js_sup). +-behaviour(supervisor). + + +-export([ + start_link/0 +]). + +-export([ + init/1 +]). + + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + + +init([]) -> + Flags = #{ + strategy => one_for_one, + intensity => 50, + period => 3600 + }, + Children = [ + #{ + id => couch_js_proc_manager, + restart => permanent, + shutdown => brutal_kill, + start => {couch_js_proc_manager, start_link, []} + } + ], + {ok, {Flags, Children}}. |