summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-08-20 12:43:14 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-09-25 09:42:45 -0500
commit41b72111b130fa0c5e04eb9b670f5d575eacb62e (patch)
tree18005f6cf095dc46244bf5040919fb27e7d84a46
parent422416a1c620a7e403bf3bde8a43a4cb69c13086 (diff)
downloadcouchdb-41b72111b130fa0c5e04eb9b670f5d575eacb62e.tar.gz
Initial creation of couch_js application
This commit is mostly a copy paste of the existing modules in the `couch` application. For now I've left the build of the `couchjs` executable in `couch/priv` to avoid having to do the work of moving the build config over. I had contemplated just referencing the modules as they currently exist but decided this would prepare us a bit better for when we eventually remove the old modules.
-rw-r--r--rebar.config.script1
-rw-r--r--rel/reltool.config2
-rw-r--r--src/couch_js/README.md6
-rw-r--r--src/couch_js/src/couch_js.app.src27
-rw-r--r--src/couch_js/src/couch_js_app.erl31
-rw-r--r--src/couch_js/src/couch_js_io_logger.erl107
-rw-r--r--src/couch_js/src/couch_js_native_process.erl452
-rw-r--r--src/couch_js/src/couch_js_os_process.erl265
-rw-r--r--src/couch_js/src/couch_js_proc_manager.erl602
-rw-r--r--src/couch_js/src/couch_js_query_servers.erl683
-rw-r--r--src/couch_js/src/couch_js_sup.erl45
11 files changed, 2221 insertions, 0 deletions
diff --git a/rebar.config.script b/rebar.config.script
index ba7b754ff..8ef1abcc8 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -79,6 +79,7 @@ SubDirs = [
"src/mem3",
"src/couch_index",
"src/couch_mrview",
+ "src/couch_js",
"src/couch_replicator",
"src/couch_plugins",
"src/couch_pse_tests",
diff --git a/rel/reltool.config b/rel/reltool.config
index e2ae71c43..caeea381e 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -41,6 +41,7 @@
couch_replicator,
couch_stats,
couch_eval,
+ couch_js,
couch_event,
couch_peruser,
couch_views,
@@ -95,6 +96,7 @@
{app, couch, [{incl_cond, include}]},
{app, couch_epi, [{incl_cond, include}]},
{app, couch_eval, [{incl_cond, include}]},
+ {app, couch_js, [{incl_cond, include}]},
{app, couch_jobs, [{incl_cond, include}]},
{app, couch_index, [{incl_cond, include}]},
{app, couch_log, [{incl_cond, include}]},
diff --git a/src/couch_js/README.md b/src/couch_js/README.md
new file mode 100644
index 000000000..4084b7d8e
--- /dev/null
+++ b/src/couch_js/README.md
@@ -0,0 +1,6 @@
+couch_js
+===
+
+This application is just an isolation of most of the code required for running couchjs.
+
+For the time being I'm not moving the implementation of couchjs due to the specifics of the build system configuration. Once we go to remove the `couch` application we'll have to revisit that approach. \ No newline at end of file
diff --git a/src/couch_js/src/couch_js.app.src b/src/couch_js/src/couch_js.app.src
new file mode 100644
index 000000000..0db37b68c
--- /dev/null
+++ b/src/couch_js/src/couch_js.app.src
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Application resource file for couch_js.
+% couch_js_proc_manager is listed under `registered` because it registers
+% itself locally under its module name in start_link/0.
+% NOTE(review): couch_js_os_process:prompt/2 calls ioq:call/3, but ioq is
+% not listed under `applications` -- presumably it is started transitively
+% through couch; confirm.
+{application, couch_js, [
+ {description, "An OTP application"},
+ {vsn, git},
+ {registered, [
+ couch_js_proc_manager
+ ]},
+ {mod, {couch_js_app, []}},
+ {applications, [
+ kernel,
+ stdlib,
+ config,
+ couch_log,
+ couch
+ ]}
+ ]}.
diff --git a/src/couch_js/src/couch_js_app.erl b/src/couch_js/src/couch_js_app.erl
new file mode 100644
index 000000000..b28f5852e
--- /dev/null
+++ b/src/couch_js/src/couch_js_app.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js_app).
+
+
+-behaviour(application).
+
+
+-export([
+ start/2,
+ stop/1
+]).
+
+
+% application callback: starts the top-level supervisor and returns the
+% result of couch_js_sup:start_link/0 unchanged. Both arguments are ignored.
+start(_StartType, _StartArgs) ->
+ couch_js_sup:start_link().
+
+
+% application callback: there is nothing to clean up, so this always
+% returns ok.
+stop(_State) ->
+ ok. \ No newline at end of file
diff --git a/src/couch_js/src/couch_js_io_logger.erl b/src/couch_js/src/couch_js_io_logger.erl
new file mode 100644
index 000000000..5a1695c01
--- /dev/null
+++ b/src/couch_js/src/couch_js_io_logger.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_io_logger).
+
+-export([
+ start/1,
+ log_output/1,
+ log_input/1,
+ stop_noerror/0,
+ stop_error/1
+]).
+
+
+% Start capturing the I/O of the calling process below directory Dir.
+%
+% Nothing is set up (and ok is returned) when Dir is `undefined`, when it is
+% `false` -- the value os:getenv/1 returns for an unset variable, which is
+% what couch_js_os_process:init/1 passes in -- or when Dir is not an
+% existing directory.
+%
+% Otherwise two scratch files are opened and deleted right away; their
+% contents are only copied to permanent files by stop_error/1. The base
+% path and both descriptors are kept in the process dictionary for the
+% other functions of this module.
+start(undefined) ->
+    ok;
+start(false) ->
+    % Without this clause the atom reached filelib:is_dir/1 and was probed
+    % as a relative path named "false".
+    ok;
+start(Dir) ->
+    case filelib:is_dir(Dir) of
+        true ->
+            Name = log_name(),
+            Path = Dir ++ "/" ++ Name,
+            OPath = Path ++ ".out.log_",
+            IPath = Path ++ ".in.log_",
+            {ok, OFd} = file:open(OPath, [read, write, raw]),
+            {ok, IFd} = file:open(IPath, [read, write, raw]),
+            ok = file:delete(OPath),
+            ok = file:delete(IPath),
+            put(logger_path, Path),
+            put(logger_out_fd, OFd),
+            put(logger_in_fd, IFd),
+            ok;
+        false ->
+            ok
+    end.
+
+
+% Stop logging after a clean shutdown: the scratch descriptors are closed
+% without saving anything. A no-op when logging was never started.
+stop_noerror() ->
+    close_if_started(get(logger_path)).
+
+% Dispatch on whether start/1 recorded a base path for this process.
+close_if_started(undefined) ->
+    ok;
+close_if_started(_ActivePath) ->
+    close_logs().
+
+
+% Stop logging after a failure: the captured traffic and a small metadata
+% file describing Err are saved before the descriptors are closed.
+% A no-op when logging was never started.
+stop_error(Err) ->
+    save_if_started(get(logger_path), Err).
+
+% Dispatch on whether start/1 recorded a base path for this process.
+save_if_started(undefined, _Err) ->
+    ok;
+save_if_started(Path, Err) ->
+    save_error_logs(Path, Err),
+    close_logs().
+
+
+% Append Data to the "out" scratch log of the calling process, if any.
+log_output(Data) ->
+    OutFd = get(logger_out_fd),
+    log(OutFd, Data).
+
+
+% Append Data to the "in" scratch log of the calling process, if any.
+log_input(Data) ->
+    InFd = get(logger_in_fd),
+    log(InFd, Data).
+
+
+% Current os:timestamp/0 expressed in microseconds, as a decimal string.
+unix_time() ->
+    {MegaSecs, Secs, MicroSecs} = os:timestamp(),
+    WholeSeconds = MegaSecs * 1000000 + Secs,
+    integer_to_list(WholeSeconds * 1000000 + MicroSecs).
+
+
+% Base file name for this process' logs: "<microsecond timestamp>_<pid>",
+% where the pid is printed without its surrounding angle brackets.
+log_name() ->
+    Stamp = unix_time(),
+    RawPid = erlang:pid_to_list(self()),
+    BarePid = string:strip(string:strip(RawPid, left, $<), right, $>),
+    lists:flatten(io_lib:format("~s_~s", [Stamp, BarePid])).
+
+
+% Close both scratch descriptors stored in the process dictionary. The
+% result of closing the "in" descriptor is returned; the other is ignored.
+close_logs() ->
+    OutFd = get(logger_out_fd),
+    file:close(OutFd),
+    InFd = get(logger_in_fd),
+    file:close(InFd).
+
+
+% Persist the captured traffic next to a ".meta" file that records the
+% error, the node name and the OTP release. Both scratch descriptors are
+% rewound to the start before being copied to "<Path>.out.log" and
+% "<Path>.in.log". Results of the file operations are not checked.
+save_error_logs(Path, Err) ->
+    Release = erlang:system_info(otp_release),
+    Meta = io_lib:format("Error: ~p~nNode: ~p~nOTP: ~p~n", [Err, node(), Release]),
+    file:write_file(Path ++ ".meta", Meta),
+    OutFd = get(logger_out_fd),
+    InFd = get(logger_in_fd),
+    file:position(OutFd, 0),
+    file:position(InFd, 0),
+    file:copy(OutFd, Path ++ ".out.log"),
+    file:copy(InFd, Path ++ ".in.log").
+
+
+% Write Data followed by a newline to Fd. An `undefined` descriptor means
+% logging is disabled for this process, so the data is dropped.
+log(undefined, _Data) ->
+    ok;
+log(Fd, Data) ->
+    Line = [Data, io_lib:nl()],
+    ok = file:write(Fd, Line).
diff --git a/src/couch_js/src/couch_js_native_process.erl b/src/couch_js/src/couch_js_native_process.erl
new file mode 100644
index 000000000..d2c4c1ee0
--- /dev/null
+++ b/src/couch_js/src/couch_js_native_process.erl
@@ -0,0 +1,452 @@
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+%
+% You may obtain a copy of the License at
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing,
+% software distributed under the License is distributed on an
+% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+% either express or implied.
+%
+% See the License for the specific language governing permissions
+% and limitations under the License.
+%
+% This file drew much inspiration from erlview, which was written by and
+% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
+%
+%
+% This module provides the smallest possible native view-server.
+% With this module in-place, you can add the following to your couch INI files:
+% [native_query_servers]
+% erlang={couch_js_native_process, start_link, []}
+%
+% Which will then allow following example map function to be used:
+%
+% fun({Doc}) ->
+% % Below, we emit a single record - the _id as key, null as value
+% DocId = couch_util:get_value(<<"_id">>, Doc, null),
+% Emit(DocId, null)
+% end.
+%
+% which should be roughly the same as the javascript:
+% emit(doc._id, null);
+%
+% This module exposes enough functions such that a native erlang server can
+% act as a fully-fledged view server, but no 'helper' functions specifically
+% for simplifying your erlang view code. It is expected other third-party
+% extensions will evolve which offer useful layers on top of this view server
+% to help simplify your view code.
+-module(couch_js_native_process).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
+ handle_info/2]).
+-export([set_timeout/2, prompt/2]).
+
+-define(STATE, native_proc_state).
+% State of one native query server process.
+%   ddocs        - dict of design documents keyed by id (see store_ddoc/3)
+%   funs         - {Sig, Fun} pairs appended by the "add_fun" command
+%   query_config - value passed with the two-element "reset" command
+%   list_pid     - pid of the linked process running a list function, or nil
+%   timeout      - milliseconds to wait in the list protocol receives
+%   idle         - gen_server timeout (ms) returned from the callbacks
+-record(evstate, {
+ ddocs,
+ funs = [],
+ query_config = [],
+ list_pid = nil,
+ timeout = 5000,
+ idle = 5000
+}).
+
+-include_lib("couch/include/couch_db.hrl").
+
+% Start an unregistered native query server linked to the caller.
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+% this is a bit messy, see also couch_query_servers handle_info
+% stop(_Pid) ->
+% ok.
+
+% Synchronously store TimeOut in the server state; the server replies ok.
+set_timeout(Pid, TimeOut) ->
+ gen_server:call(Pid, {set_timeout, TimeOut}).
+
+% Send a query-server command (a list) to the server and return its reply.
+% Uses the default gen_server:call/2 timeout.
+prompt(Pid, Data) when is_list(Data) ->
+ gen_server:call(Pid, {prompt, Data}).
+
+% gen_server callbacks
+% gen_server callback: reads the idle limit (seconds, default "300") from
+% the "query_server_config" section and starts with an empty ddoc dict.
+% The limit, converted to milliseconds, doubles as the gen_server timeout.
+init([]) ->
+    LimitSecs = config:get("query_server_config", "os_process_idle_limit", "300"),
+    IdleMs = 1000 * list_to_integer(LimitSecs),
+    InitialState = #evstate{ddocs=dict:new(), idle=IdleMs},
+    {ok, InitialState, IdleMs}.
+
+% gen_server callback.
+%
+% {set_timeout, T}: stores T as the new protocol timeout and replies ok.
+%
+% {prompt, Data}: normalises Data with to_binary/1 and executes it through
+% run/2. A thrown {error, Why} becomes an [<<"error">>, Why, Why] reply and
+% leaves the state untouched. Every reply re-arms the idle timeout, except
+% a <<"fatal">> response, which is re-tagged as <<"error">> for the caller
+% and stops the server with reason `fatal`.
+handle_call({set_timeout, TimeOut}, _From, State) ->
+ {reply, ok, State#evstate{timeout=TimeOut}, State#evstate.idle};
+
+handle_call({prompt, Data}, _From, State) ->
+ couch_log:debug("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
+ {NewState, Resp} = try run(State, to_binary(Data)) of
+ {S, R} -> {S, R}
+ catch
+ throw:{error, Why} ->
+ {State, [<<"error">>, Why, Why]}
+ end,
+
+ % The idle value is taken from the state as it was before the command ran.
+ Idle = State#evstate.idle,
+ case Resp of
+ {error, Reason} ->
+ Msg = io_lib:format("couch native server error: ~p", [Reason]),
+ Error = [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)],
+ {reply, Error, NewState, Idle};
+ [<<"error">> | Rest] ->
+ % Msg = io_lib:format("couch native server error: ~p", [Rest]),
+ % TODO: markh? (jan)
+ {reply, [<<"error">> | Rest], NewState, Idle};
+ [<<"fatal">> | Rest] ->
+ % Msg = io_lib:format("couch native server error: ~p", [Rest]),
+ % TODO: markh? (jan)
+ {stop, fatal, [<<"error">> | Rest], NewState};
+ Resp ->
+ {reply, Resp, NewState, Idle}
+ end.
+
+% gen_server callback: `garbage_collect` forces a collection of this
+% process, `stop` shuts the server down normally, and any other cast is
+% ignored. The non-stopping clauses re-arm the idle timeout.
+handle_cast(garbage_collect, State) ->
+ erlang:garbage_collect(),
+ {noreply, State, State#evstate.idle};
+handle_cast(stop, State) ->
+ {stop, normal, State};
+handle_cast(_Msg, State) ->
+ {noreply, State, State#evstate.idle}.
+
+% gen_server callback.
+% timeout: the idle period elapsed, so couch_js_proc_manager is told this
+% process is idle and a garbage collection is forced.
+% 'EXIT' with reason normal is ignored; any other 'EXIT' stops the server
+% with that reason.
+% NOTE(review): there is no clause for other messages, so an unexpected
+% message fails with function_clause -- confirm this is intended.
+handle_info(timeout, State) ->
+ gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
+ erlang:garbage_collect(),
+ {noreply, State, State#evstate.idle};
+handle_info({'EXIT',_,normal}, State) ->
+ {noreply, State, State#evstate.idle};
+handle_info({'EXIT',_,Reason}, State) ->
+ {stop, Reason, State}.
+% gen_server callback: nothing to release.
+terminate(_Reason, _State) -> ok.
+% gen_server callback: the state is carried over unchanged.
+code_change(_OldVersion, State, _Extra) -> {ok, State}.
+
+% Execute one query-server command and return {NewState, Response}.
+%
+% While a list function is running (list_pid is a pid) only "list_row" and
+% "list_end" are accepted; anything else yields a list_error response.
+% Otherwise the supported commands are "reset", "add_fun", "map_doc",
+% "reduce", "rereduce" and the "ddoc" family.
+%
+% Throws {timeout, Stage} when the list process does not answer within the
+% configured timeout and {error, unknown_command} for unrecognised input.
+run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+    Pid ! {self(), list_row, Row},
+    receive
+        {Pid, chunks, Data} ->
+            {State, [<<"chunks">>, Data]};
+        {Pid, list_end, Data} ->
+            % The list process is linked; wait for its normal exit before
+            % restoring the trap_exit flag saved by ddoc/4.
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            process_flag(trap_exit, erlang:get(do_trap)),
+            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+    after State#evstate.timeout ->
+        throw({timeout, list_row})
+    end;
+run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+    Pid ! {self(), list_end},
+    Resp =
+        receive
+            {Pid, list_end, Data} ->
+                receive
+                    {'EXIT', Pid, normal} -> ok
+                after State#evstate.timeout ->
+                    throw({timeout, list_cleanup})
+                end,
+                [<<"end">>, Data]
+        after State#evstate.timeout ->
+            throw({timeout, list_end})
+        end,
+    process_flag(trap_exit, erlang:get(do_trap)),
+    {State#evstate{list_pid=nil}, Resp};
+run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
+    {State, [<<"error">>, list_error, list_error]};
+run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>]) ->
+    % Keep the configured idle limit, as the two-element reset below does;
+    % dropping it made the process fall back to the record default.
+    {#evstate{ddocs=DDocs, idle=Idle}, true};
+run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>, QueryConfig]) ->
+    NewState = #evstate{
+        ddocs = DDocs,
+        query_config = QueryConfig,
+        idle = Idle
+    },
+    {NewState, true};
+run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
+    FunInfo = makefun(State, BinFunc),
+    {State#evstate{funs=Funs ++ [FunInfo]}, true};
+run(State, [<<"map_doc">> , Doc]) ->
+    % Each map fun collects its emitted rows in the process dictionary
+    % under its own signature (see the Emit binding).
+    Resp = lists:map(fun({Sig, Fun}) ->
+        erlang:put(Sig, []),
+        Fun(Doc),
+        lists:reverse(erlang:get(Sig))
+    end, State#evstate.funs),
+    {State, Resp};
+run(State, [<<"reduce">>, Funs, KVs]) ->
+    {Keys, Vals} =
+        lists:foldl(fun([K, V], {KAcc, VAcc}) ->
+            {[K | KAcc], [V | VAcc]}
+        end, {[], []}, KVs),
+    Keys2 = lists:reverse(Keys),
+    Vals2 = lists:reverse(Vals),
+    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
+run(State, [<<"rereduce">>, Funs, Vals]) ->
+    {State, catch reduce(State, Funs, null, Vals, true)};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
+    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
+    {State#evstate{ddocs=DDocs2}, true};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
+    DDoc = load_ddoc(DDocs, DDocId),
+    ddoc(State, DDoc, Rest);
+run(_, Unknown) ->
+    couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]),
+    throw({error, unknown_command}).
+
+% Resolve FunPath inside the design document, compile the source found
+% there with makefun/3 and hand the result to ddoc/4 together with Args.
+%
+% The walk descends through nested {Props} objects; once a binary is
+% reached it is carried through the remaining path elements. Throws
+% {error, not_found} when the walk continues past a missing key and
+% {error, malformed_ddoc} when it meets any other kind of value.
+ddoc(State, {DDoc}, [FunPath, Args]) ->
+ % load fun from the FunPath
+ BFun = lists:foldl(fun
+ (Key, {Props}) when is_list(Props) ->
+ couch_util:get_value(Key, Props, nil);
+ (_Key, Fun) when is_binary(Fun) ->
+ Fun;
+ (_Key, nil) ->
+ throw({error, not_found});
+ (_Key, _Fun) ->
+ throw({error, malformed_ddoc})
+ end, {DDoc}, FunPath),
+ ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
+
+% Run a compiled design-document function. The first element of the path
+% selects the calling convention; the result is always {State, Response}.
+
+% validate_doc_update / rewrites: apply the fun and return whatever it
+% yields, including a caught exception term.
+ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
+ {State, (catch apply(Fun, Args))};
+ddoc(State, {_, Fun}, [<<"rewrites">>], Args) ->
+ {State, (catch apply(Fun, Args))};
+% filters: call the fun once per document with the request.
+% NOTE(review): when the fun crashes, the list element is the return value
+% of couch_log:error/2 rather than a boolean -- confirm that is intended.
+ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
+ FilterFunWrapper = fun(Doc) ->
+ case catch Fun(Doc, Req) of
+ true -> true;
+ false -> false;
+ {'EXIT', Error} -> couch_log:error("~p", [Error])
+ end
+ end,
+ Resp = lists:map(FilterFunWrapper, Docs),
+ {State, [true, Resp]};
+% views: a map fun used as a filter; `undefined` or a non-empty list counts
+% as a match, `ok` or `false` as no match.
+ddoc(State, {_, Fun}, [<<"views">>|_], [Docs]) ->
+ MapFunWrapper = fun(Doc) ->
+ case catch Fun(Doc) of
+ undefined -> true;
+ ok -> false;
+ false -> false;
+ [_|_] -> true;
+ {'EXIT', Error} -> couch_log:error("~p", [Error])
+ end
+ end,
+ Resp = lists:map(MapFunWrapper, Docs),
+ {State, [true, Resp]};
+% shows: a list result is returned as is, a {Props} object is wrapped as
+% [<<"resp">>, Object], anything else is passed through.
+ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
+ Resp = case (catch apply(Fun, Args)) of
+ FunResp when is_list(FunResp) ->
+ FunResp;
+ {FunResp} ->
+ [<<"resp">>, {FunResp}];
+ FunResp ->
+ FunResp
+ end,
+ {State, Resp};
+% updates: the fun must return [Doc, Resp]; any other result (including a
+% caught exception) fails the case expression.
+ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
+ Resp = case (catch apply(Fun, Args)) of
+ [JsonDoc, JsonResp] ->
+ [<<"up">>, JsonDoc, JsonResp]
+ end,
+ {State, Resp};
+% lists: the fun runs in a linked helper process that exchanges
+% start/chunks/list_end messages with this server (see run/2, bindings/3
+% and start_list_resp/2).
+ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
+ Self = self(),
+ SpawnFun = fun() ->
+ LastChunk = (catch apply(Fun, Args)),
+ % If the fun finished without the start message having been sent, send
+ % it now and wait for the server's next request before reporting the end.
+ case start_list_resp(Self, Sig) of
+ started ->
+ receive
+ {Self, list_row, _Row} -> ignore;
+ {Self, list_end} -> ignore
+ after State#evstate.timeout ->
+ throw({timeout, list_cleanup_pid})
+ end;
+ _ ->
+ ok
+ end,
+ LastChunks =
+ case erlang:get(Sig) of
+ undefined -> [LastChunk];
+ OtherChunks -> [LastChunk | OtherChunks]
+ end,
+ Self ! {self(), list_end, lists:reverse(LastChunks)}
+ end,
+ % Remember the previous trap_exit flag so run/2 can restore it once the
+ % list process has exited.
+ erlang:put(do_trap, process_flag(trap_exit, true)),
+ Pid = spawn_link(SpawnFun),
+ Resp =
+ receive
+ {Pid, start, Chunks, JsonResp} ->
+ [<<"start">>, Chunks, JsonResp]
+ after State#evstate.timeout ->
+ throw({timeout, list_start})
+ end,
+ {State#evstate{list_pid=Pid}, Resp}.
+
+% Add or replace the design document stored under DDocId in the dict.
+store_ddoc(DDocs, DDocId, DDoc) ->
+ dict:store(DDocId, DDoc, DDocs).
+% Fetch the design document stored under DDocId. When the lookup raises
+% (for example because the id was never stored), {error, Message} is
+% thrown with a binary message naming the id.
+load_ddoc(DDocs, DDocId) ->
+ try dict:fetch(DDocId, DDocs) of
+ {DDoc} -> {DDoc}
+ catch
+ _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
+ end.
+
+% Bindings for a function that does not belong to a design document.
+bindings(State, Sig) ->
+ bindings(State, Sig, nil).
+% Build the {Name, Fun} pairs that are made available as variables to a
+% user-supplied function: Log, Emit, Start, Send, GetRow and FoldRows, plus
+% DDoc when a design document object is given. Sig is the process
+% dictionary key under which the function accumulates its output.
+bindings(State, Sig, DDoc) ->
+ Self = self(),
+
+ % NOTE(review): Msg is passed as the format string itself, so a message
+ % containing "~" control sequences is interpreted by the formatter --
+ % consider couch_log:info("~s", [Msg]); confirm intended usage first.
+ Log = fun(Msg) ->
+ couch_log:info(Msg, [])
+ end,
+
+ % Prepends the row; run/2 reverses the list after the map fun returns.
+ Emit = fun(Id, Value) ->
+ Curr = erlang:get(Sig),
+ erlang:put(Sig, [[Id, Value] | Curr])
+ end,
+
+ % Stores the response headers that start_list_resp/2 sends later.
+ Start = fun(Headers) ->
+ erlang:put(list_headers, Headers)
+ end,
+
+ % Buffers one output chunk of a list function.
+ Send = fun(Chunk) ->
+ Curr =
+ case erlang:get(Sig) of
+ undefined -> [];
+ Else -> Else
+ end,
+ erlang:put(Sig, [Chunk | Curr])
+ end,
+
+ % Flushes buffered chunks to the server (as the start message on first
+ % use, as a chunks message afterwards) and waits for the next row;
+ % returns nil when the server signals the end of the rows.
+ GetRow = fun() ->
+ case start_list_resp(Self, Sig) of
+ started ->
+ ok;
+ _ ->
+ Chunks =
+ case erlang:get(Sig) of
+ undefined -> [];
+ CurrChunks -> CurrChunks
+ end,
+ Self ! {self(), chunks, lists:reverse(Chunks)}
+ end,
+ erlang:put(Sig, []),
+ receive
+ {Self, list_row, Row} -> Row;
+ {Self, list_end} -> nil
+ after State#evstate.timeout ->
+ throw({timeout, list_pid_getrow})
+ end
+ end,
+
+ FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
+
+ Bindings = [
+ {'Log', Log},
+ {'Emit', Emit},
+ {'Start', Start},
+ {'Send', Send},
+ {'GetRow', GetRow},
+ {'FoldRows', FoldRows}
+ ],
+ case DDoc of
+ {_Props} ->
+ Bindings ++ [{'DDoc', DDoc}];
+ _Else -> Bindings
+ end.
+
+% thanks to erlview, via:
+% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
+% Compile Source (a binary holding an Erlang fun expression) with the
+% standard bindings. Returns {Sig, Fun}, where Sig is the md5 hash of the
+% source.
+makefun(State, Source) ->
+ Sig = couch_hash:md5_hash(Source),
+ BindFuns = bindings(State, Sig),
+ {Sig, makefun(State, Source, BindFuns)}.
+% Two entry points share this arity:
+%
+% makefun(State, Source, {DDoc}) compiles a design-document function. The
+% signature hashes the source together with the serialised design
+% document, and DDoc is added to the bindings. Returns {Sig, Fun}.
+%
+% makefun(State, Source, BindFuns) with a list of {Name, Value} pairs does
+% the actual work: the source is scanned and parsed, must contain exactly
+% one expression, and is evaluated with the given bindings. Returns the
+% evaluated value. A parse error of the matched shape is logged and then
+% thrown.
+makefun(State, Source, {DDoc}) ->
+ Sig = couch_hash:md5_hash(lists:flatten([Source, term_to_binary(DDoc)])),
+ BindFuns = bindings(State, Sig, {DDoc}),
+ {Sig, makefun(State, Source, BindFuns)};
+makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
+ FunStr = binary_to_list(Source),
+ {ok, Tokens, _} = erl_scan:string(FunStr),
+ Form = case (catch erl_parse:parse_exprs(Tokens)) of
+ {ok, [ParsedForm]} ->
+ ParsedForm;
+ {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
+ couch_log:error("Syntax error on line: ~p~n~s~p~n",
+ [LineNum, Mesg, Params]),
+ throw(Error)
+ end,
+ Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
+ erl_eval:add_binding(Name, Fun, Acc)
+ end, erl_eval:new_bindings(), BindFuns),
+ {value, Fun, _} = erl_eval:expr(Form, Bindings),
+ Fun.
+
+% Compile one or several reduce sources and apply each of them to the same
+% Keys, Vals and ReReduce flag. BinFuns may be a single source or a list of
+% sources. Returns [true, Reductions] with one reduction per source.
+reduce(State, BinFuns, Keys, Vals, ReReduce) ->
+    Sources =
+        case is_list(BinFuns) of
+            true -> BinFuns;
+            _ -> [BinFuns]
+        end,
+    % All sources are compiled before the first one is applied.
+    Compiled = [makefun(State, Source) || Source <- Sources],
+    ApplyReduce = fun({_Sig, RedFun}) -> RedFun(Keys, Vals, ReReduce) end,
+    [true, lists:map(ApplyReduce, Compiled)].
+
+% Fold ProcRow over the rows produced by GetRow until GetRow returns nil
+% or ProcRow answers {stop, Acc}. Always returns {ok, FinalAcc}.
+foldrows(GetRow, ProcRow, Acc) ->
+    foldrows_step(GetRow(), GetRow, ProcRow, Acc).
+
+% Handle one fetched row: nil ends the fold, anything else is processed.
+foldrows_step(nil, _GetRow, _ProcRow, Acc) ->
+    {ok, Acc};
+foldrows_step(Row, GetRow, ProcRow, Acc) ->
+    case (catch ProcRow(Row, Acc)) of
+        {ok, NextAcc} ->
+            foldrows(GetRow, ProcRow, NextAcc);
+        {stop, FinalAcc} ->
+            {ok, FinalAcc}
+    end.
+
+% Send the "start" message of a list response to Self, once per process.
+%
+% On first use the stored headers (or an empty headers object) and any
+% chunks buffered under Sig are sent, the buffer is cleared, and `started`
+% is returned. Later calls do nothing and return ok.
+start_list_resp(Self, Sig) ->
+    case erlang:get(list_started) of
+        undefined ->
+            Headers = pdict_value(list_headers, {[{<<"headers">>, {[]}}]}),
+            Buffered = pdict_value(Sig, []),
+            Self ! {self(), start, lists:reverse(Buffered), Headers},
+            erlang:put(list_started, true),
+            erlang:put(Sig, []),
+            started;
+        _ ->
+            ok
+    end.
+
+% Read Key from the process dictionary, falling back to Default when unset.
+pdict_value(Key, Default) ->
+    case erlang:get(Key) of
+        undefined -> Default;
+        Value -> Value
+    end.
+
+% Recursively replace atoms inside a decoded JSON term with binaries.
+% Objects ({Props}) and lists are traversed; null, true and false stay
+% atoms; every other term is returned unchanged.
+to_binary({Props}) ->
+    ConvertPair = fun({Key, Value}) ->
+        {to_binary(Key), to_binary(Value)}
+    end,
+    {lists:map(ConvertPair, Props)};
+to_binary(Items) when is_list(Items) ->
+    lists:map(fun to_binary/1, Items);
+to_binary(Literal) when Literal =:= null; Literal =:= true; Literal =:= false ->
+    Literal;
+to_binary(Atom) when is_atom(Atom) ->
+    list_to_binary(atom_to_list(Atom));
+to_binary(Other) ->
+    Other.
diff --git a/src/couch_js/src/couch_js_os_process.erl b/src/couch_js/src/couch_js_os_process.erl
new file mode 100644
index 000000000..a453d1ab2
--- /dev/null
+++ b/src/couch_js/src/couch_js_os_process.erl
@@ -0,0 +1,265 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_os_process).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/1, start_link/2, start_link/3, stop/1]).
+-export([set_timeout/2, prompt/2, killer/1]).
+-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]).
+
+% State of one external query server process.
+%   command - command line passed to start_link
+%   port    - Erlang port connected to the external program
+%   writer  - fun(OsProc, Data) used to send a request (default writejson/2)
+%   reader  - fun(OsProc) used to read a response (default readjson/1)
+%   timeout - milliseconds to wait for a line from the port
+%   idle    - gen_server timeout (ms) returned from the callbacks
+-record(os_proc,
+ {command,
+ port,
+ writer,
+ reader,
+ timeout=5000,
+ idle
+ }).
+
+% Start an OS process server for Command with no extra options.
+start_link(Command) ->
+ start_link(Command, []).
+% Start an OS process server for Command using the default ?PORT_OPTIONS.
+% Options may contain {writer, Fun}, {reader, Fun} and {timeout, Ms}.
+start_link(Command, Options) ->
+ start_link(Command, Options, ?PORT_OPTIONS).
+% Start an unregistered OS process server linked to the caller, opening
+% the port with the given PortOptions.
+start_link(Command, Options, PortOptions) ->
+ gen_server:start_link(?MODULE, [Command, Options, PortOptions], []).
+
+% Ask the server to stop; asynchronous, the cast always returns ok.
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+% Read/Write API
+% Synchronously set the read timeout (integer milliseconds). Waits for the
+% reply without a time limit and fails unless the server answers ok.
+set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
+ ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity).
+
+% Used by couch_event_os_process.erl
+% Write Data to the external process without waiting for any response.
+send(Pid, Data) ->
+ gen_server:cast(Pid, {send, Data}).
+
+% Send Data to the external process and return its response. The call is
+% routed through ioq using the caller's io_priority from the process
+% dictionary. Any reply other than {ok, Result} is logged and thrown.
+prompt(Pid, Data) ->
+    Priority = erlang:get(io_priority),
+    Reply = ioq:call(Pid, {prompt, Data}, Priority),
+    case Reply of
+        {ok, Result} ->
+            Result;
+        Error ->
+            couch_log:error("OS Process Error ~p :: ~p",[Pid,Error]),
+            throw(Error)
+    end.
+
+% Utility functions for reading and writing
+% in custom functions
+% Send Data followed by a newline to the port, record it in the I/O log
+% and return the result of port_command/2.
+writeline(#os_proc{port = Port}, Data) ->
+    Sent = port_command(Port, [Data, $\n]),
+    couch_js_io_logger:log_output(Data),
+    Sent.
+
+% Read one complete line from the port, record it in the I/O log and
+% return it. See readline/2 for the shape of the result and the errors.
+readline(#os_proc{} = OsProc) ->
+ Res = readline(OsProc, []),
+ couch_js_io_logger:log_input(Res),
+ Res.
+% Collect port data until an end-of-line fragment arrives.
+%
+% Acc starts as [] (see readline/1). With binary port data the first
+% partial fragment replaces Acc and later fragments are appended to it;
+% the result is a one-element list holding the whole line. With
+% non-binary data the fragments are collected in reverse and the result
+% is the concatenated list.
+%
+% Any other message from the port, or no data within the configured
+% timeout, closes the port and throws {os_process_error, Reason}.
+readline(#os_proc{port = Port} = OsProc, Acc) ->
+ receive
+ {Port, {data, {noeol, Data}}} when is_binary(Acc) ->
+ readline(OsProc, <<Acc/binary,Data/binary>>);
+ {Port, {data, {noeol, Data}}} when is_binary(Data) ->
+ readline(OsProc, Data);
+ {Port, {data, {noeol, Data}}} ->
+ readline(OsProc, [Data|Acc]);
+ {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) ->
+ [<<Acc/binary,Data/binary>>];
+ {Port, {data, {eol, Data}}} when is_binary(Data) ->
+ [Data];
+ {Port, {data, {eol, Data}}} ->
+ lists:reverse(Acc, Data);
+ {Port, Err} ->
+ catch port_close(Port),
+ throw({os_process_error, Err})
+ after OsProc#os_proc.timeout ->
+ catch port_close(Port),
+ throw({os_process_error, "OS process timed out."})
+ end.
+
+% Standard JSON functions
+% Default writer: JSON-encode Data, log it at debug level and send it as
+% one line. Fails unless writeline/2 reports success.
+writejson(#os_proc{port = Port} = OsProc, Data) ->
+    Encoded = ?JSON_ENCODE(Data),
+    couch_log:debug("OS Process ~p Input :: ~s",
+        [Port, Encoded]),
+    true = writeline(OsProc, Encoded).
+
+% Default reader: read one line and return it as {json, Line} without
+% decoding it, unless it is a "log", "error" or "fatal" command.
+%
+% "log" lines with a binary message are logged and the next line is read
+% instead. "error" lines throw {error, {Id, Reason}} and "fatal" lines
+% throw {Id, Reason}; Id is converted with couch_util:to_existing_atom/1.
+% Command lines of any other shape are returned as {json, Line}.
+readjson(OsProc) when is_record(OsProc, os_proc) ->
+ Line = iolist_to_binary(readline(OsProc)),
+ couch_log:debug("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
+ try
+ % Don't actually parse the whole JSON. Just try to see if it's
+ % a command or a doc map/reduce/filter/show/list/update output.
+ % If it's a command then parse the whole JSON and execute the
+ % command, otherwise return the raw JSON line to the caller.
+ pick_command(Line)
+ catch
+ throw:abort ->
+ {json, Line};
+ throw:{cmd, _Cmd} ->
+ case ?JSON_DECODE(Line) of
+ [<<"log">>, Msg] when is_binary(Msg) ->
+ % we got a message to log. Log it and continue
+ couch_log:info("OS Process ~p Log :: ~s",
+ [OsProc#os_proc.port, Msg]),
+ readjson(OsProc);
+ [<<"error">>, Id, Reason] ->
+ throw({error, {couch_util:to_existing_atom(Id),Reason}});
+ [<<"fatal">>, Id, Reason] ->
+ couch_log:info("OS Process ~p Fatal Error :: ~s ~p",
+ [OsProc#os_proc.port, Id, Reason]),
+ throw({couch_util:to_existing_atom(Id),Reason});
+ _Result ->
+ {json, Line}
+ end
+ end.
+
+% Peek at the first JSON events of Line. The event handlers always throw:
+% {cmd, Name} when the line is an array starting with "log", "error" or
+% "fatal", and `abort` otherwise.
+pick_command(Line) ->
+ json_stream_parse:events(Line, fun pick_command0/1).
+
+% First event: only an array can be a command; for an array the next
+% event is handed to pick_command1/1, anything else aborts.
+pick_command0(array_start) ->
+ fun pick_command1/1;
+pick_command0(_) ->
+ throw(abort).
+
+% Second event: throws {cmd, Name} when the first array element is one of
+% the three command names and `abort` for anything else.
+pick_command1(Cmd) when Cmd =:= <<"log">>; Cmd =:= <<"error">>; Cmd =:= <<"fatal">> ->
+    throw({cmd, Cmd});
+pick_command1(_NotACommand) ->
+    throw(abort).
+
+
+% gen_server API
+% gen_server callback.
+%
+% Starts the optional I/O logger (directory taken from the
+% COUCHDB_IO_LOG_DIR environment variable), then opens a port running
+% Command through the "couchspawnkillable" wrapper from the priv
+% directory. The first line read from the port is used as the shell
+% command that kills the external program. Finally the {writer, Fun},
+% {reader, Fun} and {timeout, Ms} options are applied; any other option
+% fails the case expression.
+init([Command, Options, PortOptions]) ->
+ couch_js_io_logger:start(os:getenv("COUCHDB_IO_LOG_DIR")),
+ PrivDir = couch_util:priv_dir(),
+ Spawnkiller = "\"" ++ filename:join(PrivDir, "couchspawnkillable") ++ "\"",
+ V = config:get("query_server_config", "os_process_idle_limit", "300"),
+ IdleLimit = list_to_integer(V) * 1000,
+ BaseProc = #os_proc{
+ command=Command,
+ port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions),
+ writer=fun ?MODULE:writejson/2,
+ reader=fun ?MODULE:readjson/1,
+ idle=IdleLimit
+ },
+ KillCmd = iolist_to_binary(readline(BaseProc)),
+ Pid = self(),
+ couch_log:debug("OS Process Start :: ~p", [BaseProc#os_proc.port]),
+ % The helper is not linked: it has to outlive this process in order to
+ % run the kill command once the monitor fires.
+ spawn(fun() ->
+ % this ensure the real os process is killed when this process dies.
+ erlang:monitor(process, Pid),
+ killer(?b2l(KillCmd))
+ end),
+ OsProc =
+ lists:foldl(fun(Opt, Proc) ->
+ case Opt of
+ {writer, Writer} when is_function(Writer) ->
+ Proc#os_proc{writer=Writer};
+ {reader, Reader} when is_function(Reader) ->
+ Proc#os_proc{reader=Reader};
+ {timeout, TimeOut} when is_integer(TimeOut) ->
+ Proc#os_proc{timeout=TimeOut}
+ end
+ end, BaseProc, Options),
+ {ok, OsProc, IdleLimit}.
+
+% gen_server callback: closes the port (ignoring failures) and shuts the
+% I/O logger down. The captured traffic is saved only when the reason is
+% not `normal`.
+terminate(Reason, #os_proc{port=Port}) ->
+ catch port_close(Port),
+ case Reason of
+ normal ->
+ couch_js_io_logger:stop_noerror();
+ Error ->
+ couch_js_io_logger:stop_error(Error)
+ end,
+ ok.
+
+% gen_server callback.
+%
+% {set_timeout, T}: stores T as the new read timeout and replies ok.
+%
+% {prompt, Data}: writes Data with the configured writer and replies
+% {ok, Response} with whatever the reader returns. A thrown
+% {error, OsError} is sent back as the bare OsError and the server keeps
+% running. A thrown {fatal, OsError} or any other thrown term is sent back
+% (OsError, respectively the term itself) and the server stops with
+% reason normal. A garbage collection is forced after every prompt.
+handle_call({set_timeout, TimeOut}, _From, #os_proc{idle=Idle}=OsProc) ->
+ {reply, ok, OsProc#os_proc{timeout=TimeOut}, Idle};
+handle_call({prompt, Data}, _From, #os_proc{idle=Idle}=OsProc) ->
+ #os_proc{writer=Writer, reader=Reader} = OsProc,
+ try
+ Writer(OsProc, Data),
+ {reply, {ok, Reader(OsProc)}, OsProc, Idle}
+ catch
+ throw:{error, OsError} ->
+ {reply, OsError, OsProc, Idle};
+ throw:{fatal, OsError} ->
+ {stop, normal, OsError, OsProc};
+ throw:OtherError ->
+ {stop, normal, OtherError, OsProc}
+ after
+ garbage_collect()
+ end.
+
+% gen_server callback.
+% {send, Data}: writes Data with the configured writer; a thrown error is
+% logged and stops the server with reason normal.
+% garbage_collect: forces a collection of this process.
+% stop: stops the server with reason normal.
+% Any other cast is logged at debug level and ignored.
+handle_cast({send, Data}, #os_proc{writer=Writer, idle=Idle}=OsProc) ->
+ try
+ Writer(OsProc, Data),
+ {noreply, OsProc, Idle}
+ catch
+ throw:OsError ->
+ couch_log:error("Failed sending data: ~p -> ~p", [Data, OsError]),
+ {stop, normal, OsProc}
+ end;
+handle_cast(garbage_collect, #os_proc{idle=Idle}=OsProc) ->
+ erlang:garbage_collect(),
+ {noreply, OsProc, Idle};
+handle_cast(stop, OsProc) ->
+ {stop, normal, OsProc};
+handle_cast(Msg, #os_proc{idle=Idle}=OsProc) ->
+ couch_log:debug("OS Proc: Unknown cast: ~p", [Msg]),
+ {noreply, OsProc, Idle}.
+
+% gen_server callback.
+% timeout: the idle period elapsed, so couch_js_proc_manager is told this
+% process is idle and a garbage collection is forced.
+% exit_status 0 from our port stops the server normally; any other exit
+% status stops it with {exit_status, Status}.
+% Every other message is logged at debug level and ignored.
+handle_info(timeout, #os_proc{idle=Idle}=OsProc) ->
+ gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
+ erlang:garbage_collect(),
+ {noreply, OsProc, Idle};
+handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
+ couch_log:info("OS Process terminated normally", []),
+ {stop, normal, OsProc};
+handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
+ couch_log:error("OS Process died with status: ~p", [Status]),
+ {stop, {exit_status, Status}, OsProc};
+handle_info(Msg, #os_proc{idle=Idle}=OsProc) ->
+ couch_log:debug("OS Proc: Unknown info: ~p", [Msg]),
+ {noreply, OsProc, Idle}.
+
+% gen_server callback. The first clause converts a six-element os_proc
+% tuple, which has no `idle` field, into the current record, taking the
+% idle limit from the configuration. Any other state is kept as is.
+code_change(_, {os_proc, Cmd, Port, W, R, Timeout} , _) ->
+ V = config:get("query_server_config","os_process_idle_limit","300"),
+ State = #os_proc{
+ command = Cmd,
+ port = Port,
+ writer = W,
+ reader = R,
+ timeout = Timeout,
+ idle = list_to_integer(V) * 1000
+ },
+ {ok, State};
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% Loop run by the helper process spawned in init/1: waits for any message
+% (the helper monitors the server, so this is expected to be its 'DOWN'
+% notification) and then runs KillCmd through the shell.
+% The loop re-enters once a second through a fully qualified call --
+% presumably so that a newly loaded module version is picked up; confirm.
+killer(KillCmd) ->
+ receive _ ->
+ os:cmd(KillCmd)
+ after 1000 ->
+ ?MODULE:killer(KillCmd)
+ end.
+
diff --git a/src/couch_js/src/couch_js_proc_manager.erl b/src/couch_js/src/couch_js_proc_manager.erl
new file mode 100644
index 000000000..096469612
--- /dev/null
+++ b/src/couch_js/src/couch_js_proc_manager.erl
@@ -0,0 +1,602 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_proc_manager).
+-behaviour(gen_server).
+-behaviour(config_listener).
+-vsn(1).
+
+-export([
+ start_link/0,
+ get_proc_count/0,
+ get_stale_proc_count/0,
+ new_proc/1,
+ reload/0,
+ terminate_stale_procs/0
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-export([
+ handle_config_change/5,
+ handle_config_terminate/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PROCS, couch_js_proc_manager_procs).
+-define(WAITERS, couch_js_proc_manager_waiters).
+-define(OPENING, couch_js_proc_manager_opening).
+-define(SERVERS, couch_js_proc_manager_servers).
+-define(RELISTEN_DELAY, 5000).
+
+% gen_server state of the proc manager.
+-record(state, {
+ config, % result of get_proc_config/0; sent to clients together with a proc
+ counts, % dict mapping Lang to the number of procs counted for it
+ threshold_ts, % os:timestamp(); procs whose t0 is older count as stale
+ hard_limit, % from "os_process_limit"; can_spawn/2 compares counts against it
+ soft_limit % from "os_process_soft_limit"; idle procs are closed at or above it
+}).
+
+-type docid() :: iodata().
+-type revision() :: {integer(), binary()}.
+
+% A client waiting for a query server process. Records are stored in the
+% ?WAITERS table keyed by timestamp and are also used as match patterns,
+% hence the '_' alternative on every field.
+% Fields that callers leave unset default to undefined, so the types say
+% so explicitly; the timestamp type is erlang:timestamp(), which is what
+% os:timestamp/0 returns (there is no os:timestamp() type).
+-record(client, {
+    timestamp :: undefined | erlang:timestamp() | '_',
+    from :: undefined | {pid(), reference()} | '_',
+    lang :: undefined | binary() | '_',
+    ddoc :: undefined | #doc{} | '_',
+    ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_'
+}).
+
+% Internal bookkeeping record for one query server process, stored in the
+% ?PROCS table. export_proc/1 turns it into a #proc{} by keeping the first
+% record_info(size, proc) elements, so the leading fields must stay in the
+% same order as in #proc{}.
+-record(proc_int, {
+ pid, % key of the ?PROCS table
+ lang, % language binary this process serves
+ client, % undefined while idle, otherwise the monitor reference of the client
+ ddoc_keys = [], % {DDocId, Rev} keys already taught to this process
+ prompt_fun, % {Mod, prompt}
+ set_timeout_fun, % {Mod, set_timeout}
+ stop_fun, % {Mod, stop}
+ t0 = os:timestamp() % creation time, compared against #state.threshold_ts
+}).
+
+
+% Starts the proc manager and registers it locally under the module name.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+% Number of query server processes that exist or are currently being
+% opened.
+-spec get_proc_count() -> non_neg_integer().
+get_proc_count() ->
+    gen_server:call(?MODULE, get_proc_count).
+
+
+% Number of processes created before the current staleness threshold.
+-spec get_stale_proc_count() -> non_neg_integer().
+get_stale_proc_count() ->
+    gen_server:call(?MODULE, get_stale_proc_count).
+
+
+% Removes all idle processes and moves the staleness threshold to now, so
+% processes that are currently in use are removed when they are returned.
+-spec reload() -> ok.
+reload() ->
+    gen_server:call(?MODULE, set_threshold_ts).
+
+
+% Removes idle processes that were created before the staleness threshold.
+-spec terminate_stale_procs() -> ok.
+terminate_stale_procs() ->
+    gen_server:call(?MODULE, terminate_stale_procs).
+
+
+% gen_server init: traps exits, subscribes to config changes, creates the
+% four named ETS tables, registers the query servers found in the OS
+% environment plus the built-in "QUERY" and (if enabled) "ERLANG" servers,
+% and builds the initial state from the current configuration.
+init([]) ->
+    process_flag(trap_exit, true),
+    ok = config:listen_for_changes(?MODULE, undefined),
+
+    OrderedOpts = [public, named_table, ordered_set],
+    SetOpts = [public, named_table, set],
+    ets:new(?PROCS, OrderedOpts ++ [{keypos, #proc_int.pid}]),
+    ets:new(?WAITERS, OrderedOpts ++ [{keypos, #client.timestamp}]),
+    ets:new(?OPENING, SetOpts),
+    ets:new(?SERVERS, SetOpts),
+    lists:foreach(fun(EnvPrefix) ->
+        ets:insert(?SERVERS, get_servers_from_env(EnvPrefix))
+    end, ["COUCHDB_QUERY_SERVER_", "COUCHDB_NATIVE_QUERY_SERVER_"]),
+    ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]),
+    maybe_configure_erlang_native_servers(),
+
+    Config = get_proc_config(),
+    Counts = dict:new(),
+    Now = os:timestamp(),
+    HardLimit = get_hard_limit(),
+    SoftLimit = get_soft_limit(),
+    {ok, #state{
+        config = Config,
+        counts = Counts,
+        threshold_ts = Now,
+        hard_limit = HardLimit,
+        soft_limit = SoftLimit
+    }}.
+
+
+% Shuts down every process listed in the ?PROCS table, one at a time.
+terminate(_Reason, _State) ->
+    lists:foreach(fun(#proc_int{pid=Pid}) ->
+        couch_util:shutdown_sync(Pid)
+    end, ets:tab2list(?PROCS)),
+    ok.
+
+
+% Synchronous requests to the proc manager.
+%
+% get_proc_count: size of ?PROCS plus size of ?OPENING.
+handle_call(get_proc_count, _From, State) ->
+ NumProcs = ets:info(?PROCS, size),
+ NumOpening = ets:info(?OPENING, size),
+ {reply, NumProcs + NumOpening, State};
+
+% get_stale_proc_count: number of procs whose t0 is older than threshold_ts.
+% The timestamp is wrapped in an extra tuple because that is how a tuple
+% constant is written inside a match specification guard.
+handle_call(get_stale_proc_count, _From, State) ->
+ #state{threshold_ts = T0} = State,
+ MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}],
+ {reply, ets:select_count(?PROCS, MatchSpec), State};
+
+% {get_proc, DDoc, DDocKey}: queue the caller as a waiting client for the
+% design document's language (default <<"javascript">>) and try to serve
+% waiters. No reply is returned here; it is sent later with
+% gen_server:reply/2.
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
+ LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+ Lang = couch_util:to_binary(LangStr),
+ Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+ add_waiting_client(Client),
+ {noreply, flush_waiters(State, Lang)};
+
+% {get_proc, LangStr}: same as above for a caller that needs no design
+% document.
+handle_call({get_proc, LangStr}, From, State) ->
+ Lang = couch_util:to_binary(LangStr),
+ Client = #client{from=From, lang=Lang},
+ add_waiting_client(Client),
+ {noreply, flush_waiters(State, Lang)};
+
+% {ret_proc, Proc}: the client hands a proc back. Drop the client monitor
+% and, if the proc is still in the table, return it via return_proc/2.
+handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+ erlang:demonitor(Ref, [flush]),
+ NewState = case ets:lookup(?PROCS, Proc#proc.pid) of
+ [#proc_int{}=ProcInt] ->
+ return_proc(State, ProcInt);
+ [] ->
+ % Proc must've died and we already
+ % cleared it out of the table in
+ % the handle_info clause.
+ State
+ end,
+ {reply, true, NewState};
+
+% set_threshold_ts: remove every idle proc (client = undefined) and move
+% the threshold to now; return_proc/2 removes procs older than the
+% threshold when they come back.
+handle_call(set_threshold_ts, _From, State) ->
+ FoldFun = fun
+ (#proc_int{client = undefined} = Proc, StateAcc) ->
+ remove_proc(StateAcc, Proc);
+ (_, StateAcc) ->
+ StateAcc
+ end,
+ NewState = ets:foldl(FoldFun, State, ?PROCS),
+ {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
+
+% terminate_stale_procs: remove idle procs created before the threshold.
+handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) ->
+ FoldFun = fun
+ (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+ case Ts1 > Ts2 of
+ true ->
+ remove_proc(StateAcc, Proc);
+ false ->
+ StateAcc
+ end;
+ (_, StateAcc) ->
+ StateAcc
+ end,
+ NewState = ets:foldl(FoldFun, State, ?PROCS),
+ {reply, ok, NewState};
+
+% Any other request is answered with the atom ignored.
+handle_call(_Call, _From, State) ->
+ {reply, ignored, State}.
+
+
+% Asynchronous requests to the proc manager.
+%   {os_proc_idle, Pid} - a process reports that it has been idle; it is
+%                         closed only if nobody is using it and its
+%                         language is at or above the soft limit
+%   reload_config       - re-read configuration and limits, then try to
+%                         serve waiting clients for every language
+%   anything else       - ignored
+handle_cast({os_proc_idle, Pid}, #state{counts=Counts, soft_limit=SoftLimit}=State) ->
+    NewState = case ets:lookup(?PROCS, Pid) of
+        [#proc_int{client=undefined, lang=Lang}=IdleProc] ->
+            case dict:find(Lang, Counts) of
+                {ok, Count} when Count >= SoftLimit ->
+                    couch_log:info("Closing idle OS Process: ~p", [Pid]),
+                    remove_proc(State, IdleProc);
+                {ok, _} ->
+                    State
+            end;
+        _ ->
+            State
+    end,
+    {noreply, NewState};
+
+handle_cast(reload_config, State) ->
+    Config = get_proc_config(),
+    HardLimit = get_hard_limit(),
+    SoftLimit = get_soft_limit(),
+    maybe_configure_erlang_native_servers(),
+    Reloaded = State#state{
+        config = Config,
+        hard_limit = HardLimit,
+        soft_limit = SoftLimit
+    },
+    {noreply, flush_waiters(Reloaded)};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+
+% Raw messages received by the proc manager.
+%
+% shutdown: stop the server.
+handle_info(shutdown, State) ->
+ {stop, shutdown, State};
+
+% A spawner process (see spawn_proc/2 and new_proc/1) finished
+% successfully: forget the spawner, link to the new query server process,
+% assign it to the waiting client and send the delayed reply.
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
+ ets:delete(?OPENING, Pid),
+ link(Proc0#proc_int.pid),
+ Proc = assign_proc(ClientPid, Proc0),
+ gen_server:reply(From, {ok, Proc, State#state.config}),
+ {noreply, State};
+
+% A spawner process failed: undo the count added by spawn_proc/2 and give
+% other waiters of that language a chance.
+handle_info({'EXIT', Pid, spawn_error}, State) ->
+ [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
+ ets:delete(?OPENING, Pid),
+ NewState = State#state{
+ counts = dict:update_counter(Lang, -1, State#state.counts)
+ },
+ {noreply, flush_waiters(NewState, Lang)};
+
+% Any other linked process exited. If it is a known query server process,
+% remove it and try to serve waiters of its language.
+handle_info({'EXIT', Pid, Reason}, State) ->
+ couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]),
+ case ets:lookup(?PROCS, Pid) of
+ [#proc_int{} = Proc] ->
+ NewState = remove_proc(State, Proc),
+ {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
+ [] ->
+ {noreply, State}
+ end;
+
+% A monitor fired. If the reference belongs to the client of a proc, that
+% proc is returned via return_proc/2.
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+ case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
+ [#proc_int{} = Proc] ->
+ {noreply, return_proc(State0, Proc)};
+ [] ->
+ {noreply, State0}
+ end;
+
+
+% Sent by handle_config_terminate/3 after ?RELISTEN_DELAY: subscribe to
+% config changes again.
+handle_info(restart_config_listener, State) ->
+ ok = config:listen_for_changes(?MODULE, nil),
+ {noreply, State};
+
+% Everything else is ignored.
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+
+% Hot code upgrade: the state record is carried over unchanged.
+code_change(_OldVsn, #state{}=Current, _Extra) ->
+    {ok, Current}.
+
+% config_listener callback invoked when the listener terminates. For the
+% reason stop nothing is done. Otherwise the manager is asked to reload
+% its configuration and to re-subscribe after ?RELISTEN_DELAY ms.
+handle_config_terminate(_Server, stop, _State) ->
+    ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+    gen_server:cast(?MODULE, reload_config),
+    Manager = whereis(?MODULE),
+    erlang:send_after(?RELISTEN_DELAY, Manager, restart_config_listener).
+
+% config_listener callback: a change in either of the two query server
+% sections makes the manager reload its configuration; changes elsewhere
+% are ignored.
+handle_config_change(Section, _Key, _Value, _Persist, _State)
+        when Section =:= "native_query_servers";
+             Section =:= "query_server_config" ->
+    gen_server:cast(?MODULE, reload_config),
+    {ok, undefined};
+handle_config_change(_Section, _Key, _Value, _Persist, _State) ->
+    {ok, undefined}.
+
+
+% Looks for an idle process for the client.
+% Without a design document key any idle process of the language will do.
+% With a key, a process that already knows that key is preferred; if none
+% exists, any idle process of the language is taken and taught the design
+% document first.
+% Returns {ok, #proc_int{}}, not_found or {error, Reason}.
+find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
+    find_proc(Lang, fun(_) -> true end);
+find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
+    KnowsDDoc = fun(#proc_int{ddoc_keys = Known}) ->
+        lists:member(DDocKey, Known)
+    end,
+    case find_proc(Lang, KnowsDDoc) of
+        not_found ->
+            case find_proc(Client#client{ddoc_key=undefined}) of
+                {ok, AnyProc} ->
+                    teach_ddoc(DDoc, DDocKey, AnyProc);
+                Other ->
+                    Other
+            end;
+        Found ->
+            Found
+    end.
+
+% Runs iter_procs/2 and converts a runtime error into {error, Reason}
+% after logging it together with the stack trace.
+% NOTE(review): erlang:get_stacktrace/0 is deprecated from OTP 21 on;
+% kept here for compatibility with the releases this file targets.
+find_proc(Lang, Fun) ->
+    try
+        iter_procs(Lang, Fun)
+    catch
+        error:Reason ->
+            Stack = erlang:get_stacktrace(),
+            couch_log:error("~p ~p ~p", [?MODULE, Reason, Stack]),
+            {error, Reason}
+    end.
+
+
+% Walks the idle processes (client = undefined) of Lang in reverse table
+% order, 25 at a time, and returns {ok, Proc} for the first one accepted
+% by Fun, or not_found.
+iter_procs(Lang, Fun) when is_binary(Lang) ->
+    IdleOfLang = #proc_int{lang=Lang, client=undefined, _='_'},
+    case ets:select_reverse(?PROCS, [{IdleOfLang, [], ['$_']}], 25) of
+        '$end_of_table' ->
+            not_found;
+        FirstBatch ->
+            iter_procs_int(FirstBatch, Fun)
+    end.
+
+
+% Consumes one {Batch, Continuation} pair from ets:select_reverse.
+% Returns {ok, Proc} for the first element accepted by Fun; when the batch
+% is exhausted the next one is fetched, and not_found is returned at the
+% end of the table.
+iter_procs_int({[Candidate | Remaining], Cont}, Fun) ->
+    case Fun(Candidate) of
+        true ->
+            {ok, Candidate};
+        false ->
+            iter_procs_int({Remaining, Cont}, Fun)
+    end;
+iter_procs_int({[], Cont}, Fun) ->
+    case ets:select_reverse(Cont) of
+        '$end_of_table' ->
+            not_found;
+        NextBatch ->
+            iter_procs_int(NextBatch, Fun)
+    end.
+
+
+% Starts a linked helper process that runs new_proc/1 for the client,
+% records it in ?OPENING and counts one more process for the client's
+% language. The outcome arrives later as an 'EXIT' message.
+spawn_proc(#state{counts=Counts}=State, #client{lang=Lang}=Client) ->
+    Spawner = spawn_link(?MODULE, new_proc, [Client]),
+    ets:insert(?OPENING, {Spawner, Client}),
+    State#state{counts = dict:update_counter(Lang, 1, Counts)}.
+
+
+% Body of the helper process started by spawn_proc/2. It starts a query
+% server process for the client's language, teaches it the client's
+% design document when there is one, and reports the outcome to the
+% manager through its exit reason:
+%   {spawn_ok, #proc_int{}, From} - the process is ready for the client
+%   spawn_error                   - no process could be provided
+%
+% new_proc_int/2 returns something other than {ok, Proc} only after it
+% has already replied to the client (unknown query language), so no
+% second reply is sent from here; doing so would leave a stray message
+% in the client's mailbox.
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+    #client{from=From, lang=Lang} = Client,
+    Resp = try
+        case new_proc_int(From, Lang) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            _AlreadyReplied ->
+                spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp);
+
+new_proc(Client) ->
+    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
+    Resp = try
+        case new_proc_int(From, Lang) of
+            {ok, NewProc} ->
+                try teach_ddoc(DDoc, DDocKey, NewProc) of
+                    {ok, Proc} ->
+                        {spawn_ok, Proc, From}
+                catch _:_ ->
+                    % make_proc/3 has already unlinked the new process
+                    % from us and the manager does not know it yet, so
+                    % nobody else would ever stop it. Ask it to stop the
+                    % same way remove_proc/2 does.
+                    gen_server:cast(NewProc#proc_int.pid, stop),
+                    spawn_error
+                end;
+            _AlreadyReplied ->
+                spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp).
+
+% Splits String after its first Pos characters, but only when String is
+% strictly longer than Pos; otherwise returns false.
+split_string_if_longer(String, Pos) ->
+    if
+        length(String) > Pos ->
+            lists:split(Pos, String);
+        true ->
+            false
+    end.
+
+% Splits String at the first occurrence of Char and returns
+% {Before, After} without the separator.
+% string:split is avoided because Erlang 17.5 does not have it.
+% There is deliberately no error handling: the function is only meant for
+% strings known to contain Char and crashes otherwise.
+split_by_char(String, Char) ->
+    SepPos = string:chr(String, Char),
+    {Before, [_Sep | After]} = lists:split(SepPos - 1, String),
+    {Before, After}.
+
+% Scans the OS environment for variables whose name starts with Spec and
+% returns {NameWithoutPrefix, Value} for each of them.
+get_servers_from_env(Spec) ->
+    PrefixLen = length(Spec),
+    [
+        split_by_char(Rest, $=)
+        || EnvStr <- os:getenv(),
+           {Prefix, Rest} <- [split_string_if_longer(EnvStr, PrefixLen)],
+           Prefix =:= Spec
+    ].
+
+% Looks up the query server registered for a language name; the lookup
+% is done on the upper-cased name. Returns undefined when the language
+% is not registered.
+get_query_server(LangStr) ->
+    ServerKey = string:to_upper(LangStr),
+    case ets:lookup(?SERVERS, ServerKey) of
+        [{_Key, Server}] ->
+            Server;
+        _ ->
+            undefined
+    end.
+
+% The native Erlang query server counts as enabled when either
+%   1. [native_query_servers] enable_erlang_query_server is true, or
+%   2. [native_query_servers] erlang holds the legacy value
+%      "{couch_native_process, start_link, []}".
+native_query_server_enabled() ->
+    Enabled = config:get_boolean(
+        "native_query_servers", "enable_erlang_query_server", false),
+    Legacy = config:get("native_query_servers", "erlang", ""),
+    LegacyEnabled = (Legacy =:= "{couch_native_process, start_link, []}"),
+    Enabled orelse LegacyEnabled.
+
+% Registers the native Erlang query server under "ERLANG" when it is
+% enabled; does nothing otherwise.
+maybe_configure_erlang_native_servers() ->
+    case native_query_server_enabled() of
+        true ->
+            Native = {"ERLANG", {couch_js_native_process, start_link, []}},
+            ets:insert(?SERVERS, [Native]);
+        _Disabled ->
+            ok
+    end.
+
+% Starts a query server process for Lang and wraps it in a #proc_int{}.
+% A registered {M, F, A} is started via apply/3; any other registered
+% value is treated as a command for couch_js_os_process.
+% Returns {ok, #proc_int{}} on success. For an unregistered language the
+% client is answered directly with {unknown_query_language, Lang} and the
+% result of gen_server:reply/2 is returned, i.e. not an {ok, _} tuple.
+new_proc_int(From, Lang) when is_binary(Lang) ->
+    case get_query_server(binary_to_list(Lang)) of
+        undefined ->
+            gen_server:reply(From, {unknown_query_language, Lang});
+        {Mod, Fun, Args} ->
+            {ok, NativePid} = apply(Mod, Fun, Args),
+            make_proc(NativePid, Lang, Mod);
+        Command ->
+            {ok, OsPid} = couch_js_os_process:start_link(Command),
+            make_proc(OsPid, Lang, couch_js_os_process)
+    end.
+
+
+% Sends the design document to the query server process and records its
+% key in the #proc_int{}.
+% The revision is only tracked on our side so that we know when the code
+% has to be sent again; the query server keeps just one copy per design
+% document id and overwrites it. For that reason every older key with the
+% same id is dropped before the new key is added.
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Known}=Proc) ->
+    Exported = export_proc(Proc),
+    Command = [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])],
+    true = couch_js_query_servers:proc_prompt(Exported, Command),
+    OtherDDocs = [{Id, Rev} || {Id, Rev} <- Known, Id /= DDocId],
+    {ok, Proc#proc_int{ddoc_keys=[DDocKey | OtherDDocs]}}.
+
+
+% Builds the #proc_int{} for a freshly started query server process and
+% removes the link between that process and the caller.
+% The prompt, set_timeout and stop callbacks all point into Mod.
+make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
+    unlink(Pid),
+    {ok, #proc_int{
+        lang = Lang,
+        pid = Pid,
+        prompt_fun = {Mod, prompt},
+        set_timeout_fun = {Mod, set_timeout},
+        stop_fun = {Mod, stop}
+    }}.
+
+
+% Hands an idle process to a client: the client is monitored, the monitor
+% reference is stored as the owner in ?PROCS, and the exported #proc{} is
+% returned. Accepts either the client pid or a #client{} record.
+assign_proc(ClientPid, #proc_int{client=undefined}=Idle) when is_pid(ClientPid) ->
+    MonRef = erlang:monitor(process, ClientPid),
+    Owned = Idle#proc_int{client = MonRef},
+    ets:insert(?PROCS, Owned),
+    export_proc(Owned);
+assign_proc(#client{}=Client, #proc_int{client=undefined}=Idle) ->
+    {ClientPid, _Tag} = Client#client.from,
+    assign_proc(ClientPid, Idle).
+
+
+% Takes a process back from its client. A process that is still alive and
+% not older than the staleness threshold is asked to garbage collect and
+% marked idle again; a dead or stale one is removed. Afterwards waiting
+% clients of the same language are served.
+return_proc(#state{threshold_ts = Threshold} = State, #proc_int{} = ProcInt) ->
+    #proc_int{pid = Pid, lang = Lang, t0 = Created} = ProcInt,
+    Reusable = is_process_alive(Pid) andalso not (Created < Threshold),
+    NewState = case Reusable of
+        true ->
+            gen_server:cast(Pid, garbage_collect),
+            true = ets:update_element(?PROCS, Pid, [
+                {#proc_int.client, undefined}
+            ]),
+            State;
+        false ->
+            remove_proc(State, ProcInt)
+    end,
+    flush_waiters(NewState, Lang).
+
+
+% Forgets a process: deletes it from ?PROCS, asks it to stop if it is
+% still alive (after unlinking from it), and counts one process less for
+% its language.
+remove_proc(State, #proc_int{pid=Pid, lang=Lang}) ->
+    ets:delete(?PROCS, Pid),
+    case is_process_alive(Pid) of
+        true ->
+            unlink(Pid),
+            gen_server:cast(Pid, stop);
+        false ->
+            ok
+    end,
+    State#state{
+        counts = dict:update_counter(Lang, -1, State#state.counts)
+    }.
+
+
+% Converts the internal record into the public #proc{} record by keeping
+% as many leading elements as #proc{} has and replacing the record tag.
+-spec export_proc(#proc_int{}) -> #proc{}.
+export_proc(#proc_int{} = ProcInt) ->
+    PublicSize = record_info(size, proc),
+    [_InternalTag | Fields] = lists:sublist(tuple_to_list(ProcInt), PublicSize),
+    list_to_tuple([proc | Fields]).
+
+
+% Tries to serve waiting clients of every language whose process count is
+% below the hard limit.
+flush_waiters(State) ->
+    HardLimit = State#state.hard_limit,
+    dict:fold(fun
+        (Lang, Count, StateAcc) when Count < HardLimit ->
+            flush_waiters(StateAcc, Lang);
+        (_Lang, _Count, StateAcc) ->
+            StateAcc
+    end, State, State#state.counts).
+
+
+% Serves waiting clients of Lang, oldest first, until there is no waiter
+% left or no process can be provided:
+%   - an idle process is found    -> assign it and reply to the client
+%   - looking for one failed      -> reply {error, Error} to the client
+%   - none found, spawning allowed -> start a new one; the client is
+%                                     answered once it is ready
+%   - none found, limit reached   -> leave the client waiting
+flush_waiters(State, Lang) ->
+    CanSpawn = can_spawn(State, Lang),
+    case get_waiting_client(Lang) of
+        undefined ->
+            State;
+        #client{from = From} = Waiter ->
+            case find_proc(Waiter) of
+                {ok, ProcInt} ->
+                    Proc = assign_proc(Waiter, ProcInt),
+                    gen_server:reply(From, {ok, Proc, State#state.config}),
+                    remove_waiting_client(Waiter),
+                    flush_waiters(State, Lang);
+                {error, Error} ->
+                    gen_server:reply(From, {error, Error}),
+                    remove_waiting_client(Waiter),
+                    flush_waiters(State, Lang);
+                not_found when CanSpawn ->
+                    Spawning = spawn_proc(State, Waiter),
+                    remove_waiting_client(Waiter),
+                    flush_waiters(Spawning, Lang);
+                not_found ->
+                    State
+            end
+    end.
+
+
+% Queues a client in ?WAITERS, keyed by the current os:timestamp().
+% os:timestamp/0 is not guaranteed to return unique values, and a plain
+% ets:insert/2 would silently replace an earlier waiter that happens to
+% have the same timestamp, leaving that client without an answer.
+% ets:insert_new/2 detects the collision and a new timestamp is taken
+% until the key is free. Returns true, like ets:insert/2 did.
+add_waiting_client(Client) ->
+    Waiter = Client#client{timestamp=os:timestamp()},
+    case ets:insert_new(?WAITERS, Waiter) of
+        true ->
+            true;
+        false ->
+            add_waiting_client(Client)
+    end.
+
+% Returns the first waiting client of Lang in table order, or undefined
+% when nobody is waiting for that language.
+-spec get_waiting_client(Lang :: binary()) -> undefined | #client{}.
+get_waiting_client(Lang) ->
+    AnyOfLang = #client{lang=Lang, _='_'},
+    case ets:match_object(?WAITERS, AnyOfLang, 1) of
+        {[#client{}=Oldest], _Continuation} ->
+            Oldest;
+        '$end_of_table' ->
+            undefined
+    end.
+
+
+% Removes a client from ?WAITERS using its timestamp, the table key.
+remove_waiting_client(#client{timestamp = Key}) ->
+    ets:delete(?WAITERS, Key).
+
+
+% True when another process may be started for Lang: either nothing has
+% been counted for the language yet or its count is below the hard limit.
+can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
+    case dict:find(Lang, Counts) of
+        error ->
+            true;
+        {ok, Count} ->
+            Count < HardLimit
+    end.
+
+
+% Builds the configuration object that accompanies every process handed
+% to a client: the reduce limit as an atom and the OS process timeout as
+% an integer.
+get_proc_config() ->
+    LimitStr = config:get("query_server_config", "reduce_limit", "true"),
+    TimeoutStr = config:get("couchdb", "os_process_timeout", "5000"),
+    ReduceLimit = {<<"reduce_limit">>, list_to_atom(LimitStr)},
+    Timeout = {<<"timeout">>, list_to_integer(TimeoutStr)},
+    {[ReduceLimit, Timeout]}.
+
+
+% Reads "os_process_limit" (default 100) as an integer.
+get_hard_limit() ->
+    list_to_integer(
+        config:get("query_server_config", "os_process_limit", "100")).
+
+
+% Reads "os_process_soft_limit" (default 100) as an integer.
+get_soft_limit() ->
+    list_to_integer(
+        config:get("query_server_config", "os_process_soft_limit", "100")).
diff --git a/src/couch_js/src/couch_js_query_servers.erl b/src/couch_js/src/couch_js_query_servers.erl
new file mode 100644
index 000000000..12dc864ea
--- /dev/null
+++ b/src/couch_js/src/couch_js_query_servers.erl
@@ -0,0 +1,683 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_query_servers).
+
+-export([try_compile/4]).
+-export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([reduce/3, rereduce/3,validate_doc_update/5]).
+-export([filter_docs/5]).
+-export([filter_view/3]).
+-export([finalize/2]).
+-export([rewrite/3]).
+
+-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+
+% For 210-os-proc-pool.t
+-export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(SUMERROR, <<"The _sum function requires that map values be numbers, "
+ "arrays of numbers, or objects. Objects cannot be mixed with other "
+ "data structures. Objects can be arbitrarily nested, provided that the values "
+ "for all fields are themselves numbers, arrays of numbers, or objects.">>).
+
+-define(STATERROR, <<"The _stats function requires that map values be numbers "
+ "or arrays of numbers, not '~p'">>).
+
+
+% Asks the query server to compile FunctionSource. Returns ok on success.
+% A compilation error, or an OS process that exits while compiling, is
+% rethrown as {compilation_error, Message} where Message names the
+% function type and the view.
+try_compile(Proc, FunctionType, FunctionName, FunctionSource) ->
+    try proc_prompt(Proc, [<<"add_fun">>, FunctionSource]) of
+        _Compiled ->
+            ok
+    catch
+        throw:{compilation_error, Reason} ->
+            ErrFmt = "Compilation of the ~s function in the '~s' view failed: ~s",
+            ErrMsg = io_lib:format(ErrFmt, [FunctionType, FunctionName, Reason]),
+            throw({compilation_error, ErrMsg});
+        throw:{os_process_error, {exit_status, ExitStatus}} ->
+            ExitFmt = "Compilation of the ~s function in the '~s' view failed with exit status: ~p",
+            ExitMsg = io_lib:format(ExitFmt, [FunctionType, FunctionName, ExitStatus]),
+            throw({compilation_error, ExitMsg})
+    end.
+
+% Obtains a query server process for Lang, loads the library object
+% (unless it is the empty object) and every map function into it, and
+% returns {ok, Proc}.
+start_doc_map(Lang, Functions, Lib) ->
+    Proc = get_os_process(Lang),
+    case Lib =:= {[]} of
+        true ->
+            ok;
+        false ->
+            true = proc_prompt(Proc, [<<"add_lib">>, Lib])
+    end,
+    AddFun = fun(Source) ->
+        true = proc_prompt(Proc, [<<"add_fun">>, Source])
+    end,
+    lists:foreach(AddFun, Functions),
+    {ok, Proc}.
+
+% Runs the loaded map functions on Doc and returns the undecoded answer
+% of the query server as {ok, Raw}.
+map_doc_raw(Proc, Doc) ->
+    DocJson = couch_doc:to_json_obj(Doc, []),
+    Raw = proc_prompt_raw(Proc, [<<"map_doc">>, DocJson]),
+    {ok, Raw}.
+
+
+% Returns the process obtained by start_doc_map/3; nil means there is
+% nothing to return.
+stop_doc_map(nil) ->
+    ok;
+stop_doc_map(MapProc) ->
+    ok = ret_os_process(MapProc).
+
+% Transposes a list of equally long rows into a list of columns.
+% Every pass collects the heads of all rows into one column; because the
+% rows are accumulated by prepending, the row order is reversed on each
+% pass, so consecutive columns list the rows in alternating order.
+group_reductions_results([]) ->
+    [];
+group_reductions_results(Rows) ->
+    Split = lists:map(fun([Head | Tail]) -> {Head, Tail} end, Rows),
+    {Hs, Ts} = lists:unzip(Split),
+    Column = lists:reverse(Hs),
+    case lists:reverse(Ts) of
+        [[] | _] ->
+            % every row is used up
+            [Column];
+        Remaining ->
+            [Column | group_reductions_results(Remaining)]
+    end.
+
+% Turns an internal reduction into the value shown to the user:
+%   _approx_count_distinct - the rounded cardinality of the hyper filter
+%   _stats                 - the packed statistics object
+%   anything else          - the reduction itself
+finalize(<<"_approx_count_distinct",_/binary>>, Filter) ->
+    true = hyper:is_hyper(Filter),
+    Cardinality = hyper:card(Filter),
+    {ok, round(Cardinality)};
+finalize(<<"_stats",_/binary>>, Stats) ->
+    {ok, pack_stats(Stats)};
+finalize(_RedSrc, Reduction) ->
+    {ok, Reduction}.
+
+% Re-reduces previously reduced values, one reduce function at a time.
+% The values are first regrouped per function; functions whose source
+% starts with "_" are evaluated by the built-in implementation, all
+% others by the query server for Lang. Returns {ok, Results}.
+rereduce(_Lang, [], _ReducedValues) ->
+    {ok, []};
+rereduce(Lang, RedSrcs, ReducedValues) ->
+    RereduceOne = fun
+        (<<"_", _/binary>> = Builtin, Values) ->
+            Rows = [[[], Value] || Value <- Values],
+            {ok, [Result]} = builtin_reduce(rereduce, [Builtin], Rows, []),
+            Result;
+        (Source, Values) ->
+            os_rereduce(Lang, [Source], Values)
+    end,
+    PerFunction = group_reductions_results(ReducedValues),
+    {ok, lists:zipwith(RereduceOne, RedSrcs, PerFunction)}.
+
+% Reduces KVs with every function in RedSrcs. Functions whose source
+% starts with "_" are built-ins; the rest run in the query server for
+% Lang. The two result lists are merged back into the order of RedSrcs.
+reduce(_Lang, [], _KVs) ->
+    {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+    IsBuiltin = fun
+        (<<"_", _/binary>>) -> true;
+        (_Source) -> false
+    end,
+    {BuiltinReds, OsRedSrcs} = lists:partition(IsBuiltin, RedSrcs),
+    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
+    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
+
+
+% Merges query server results and built-in results into one list that
+% follows the order of the reduce sources: a source starting with "_"
+% takes the next built-in result, any other source the next query server
+% result. Returns {ok, Results}.
+recombine_reduce_results([<<"_", _/binary>> | Srcs], OsRs, [Builtin | BuiltinRs], Acc) ->
+    recombine_reduce_results(Srcs, OsRs, BuiltinRs, [Builtin | Acc]);
+recombine_reduce_results([_OsFun | Srcs], [Os | OsRs], BuiltinRs, Acc) ->
+    recombine_reduce_results(Srcs, OsRs, BuiltinRs, [Os | Acc]);
+recombine_reduce_results([], [], [], Acc) ->
+    {ok, lists:reverse(Acc)}.
+
+% Runs the given reduce functions over KVs in a query server process for
+% Lang and returns {ok, Results}. A reduce overflow reported by the
+% server becomes an error object for every function. The process is
+% always handed back.
+os_reduce(_Lang, [], _KVs) ->
+    {ok, []};
+os_reduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    Results =
+        try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
+            [true, Reductions] ->
+                Reductions
+        catch
+            throw:{reduce_overflow_error, Msg} ->
+                Overflow = {[{reduce_overflow_error, Msg}]},
+                [Overflow || _ <- OsRedSrcs]
+        after
+            ok = ret_os_process(Proc)
+        end,
+    {ok, Results}.
+
+% Re-reduces KVs with a single reduce function in a query server process
+% for Lang. If the input already contains an overflow error object, that
+% object is returned without contacting the server. An overflow reported
+% by the server is returned as an error object. The process is always
+% handed back.
+os_rereduce(Lang, OsRedSrcs, KVs) ->
+    case get_overflow_error(KVs) of
+        undefined ->
+            Proc = get_os_process(Lang),
+            try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
+                [true, [Result]] ->
+                    Result
+            catch
+                throw:{reduce_overflow_error, Msg} ->
+                    {[{reduce_overflow_error, Msg}]}
+            after
+                ok = ret_os_process(Proc)
+            end;
+        PreviousOverflow ->
+            PreviousOverflow
+    end.
+
+
+% Returns the first reduce overflow error object in the list, or
+% undefined when there is none.
+get_overflow_error(Values) ->
+    NotOverflow = fun
+        ({[{reduce_overflow_error, _}]}) -> false;
+        (_) -> true
+    end,
+    case lists:dropwhile(NotOverflow, Values) of
+        [] ->
+            undefined;
+        [Error | _] ->
+            Error
+    end.
+
+
+% Evaluates the built-in reduce functions in the given order and returns
+% {ok, Results}. Mode is reduce or rereduce. Functions are recognised by
+% the prefix of their source: _sum, _count, _stats and
+% _approx_count_distinct. _count counts rows when reducing and sums the
+% previous counts when re-reducing.
+builtin_reduce(_Mode, [], _KVs, Results) ->
+    {ok, lists:reverse(Results)};
+builtin_reduce(Mode, [<<"_sum",_/binary>>|Rest], KVs, Results) ->
+    Sum = builtin_sum_rows(KVs, 0),
+    Checked = check_sum_overflow(?term_size(KVs), ?term_size(Sum), Sum),
+    builtin_reduce(Mode, Rest, KVs, [Checked|Results]);
+builtin_reduce(reduce, [<<"_count",_/binary>>|Rest], KVs, Results) ->
+    builtin_reduce(reduce, Rest, KVs, [length(KVs)|Results]);
+builtin_reduce(rereduce, [<<"_count",_/binary>>|Rest], KVs, Results) ->
+    Total = builtin_sum_rows(KVs, 0),
+    builtin_reduce(rereduce, Rest, KVs, [Total|Results]);
+builtin_reduce(Mode, [<<"_stats",_/binary>>|Rest], KVs, Results) ->
+    builtin_reduce(Mode, Rest, KVs, [builtin_stats(Mode, KVs)|Results]);
+builtin_reduce(Mode, [<<"_approx_count_distinct",_/binary>>|Rest], KVs, Results) ->
+    Filter = approx_count_distinct(Mode, KVs),
+    builtin_reduce(Mode, Rest, KVs, [Filter|Results]).
+
+
+% Adds the value of every [Key, Value] row to the accumulator.
+% Summation stops at the first value that cannot be added: an error
+% object carried by a row is returned as is, and an invalid value is
+% turned into a builtin_reduce_error object.
+builtin_sum_rows([], Total) ->
+    Total;
+builtin_sum_rows([[_Key, Value] | Rows], Total) ->
+    try sum_values(Value, Total) of
+        NewTotal ->
+            builtin_sum_rows(Rows, NewTotal)
+    catch
+        throw:{builtin_reduce_error, ErrorObj} ->
+            ErrorObj;
+        throw:{invalid_value, Reason, Cause} ->
+            {[
+                {<<"error">>, <<"builtin_reduce_error">>},
+                {<<"reason">>, Reason},
+                {<<"caused_by">>, Cause}
+            ]}
+    end.
+
+
+% Adds one value to the accumulator. Numbers add up, arrays are added
+% element-wise (a number counts as a one-element array when mixed with an
+% array), and objects are merged key by key. An object that is a
+% builtin_reduce_error is rethrown; any other kind of value is rejected
+% through throw_sum_error/1.
+sum_values(Value, Acc) when is_number(Value), is_number(Acc) ->
+    Acc + Value;
+sum_values(Value, Acc) when is_list(Value), is_list(Acc) ->
+    sum_arrays(Acc, Value);
+sum_values(Value, Acc) when is_number(Value), is_list(Acc) ->
+    sum_arrays(Acc, [Value]);
+sum_values(Value, Acc) when is_list(Value), is_number(Acc) ->
+    sum_arrays([Acc], Value);
+sum_values({Props}, Acc) ->
+    case lists:keyfind(<<"error">>, 1, Props) of
+        false ->
+            ok;
+        {<<"error">>, <<"builtin_reduce_error">>} ->
+            throw({builtin_reduce_error, {Props}})
+    end,
+    case Acc of
+        {AccProps} ->
+            {sum_objects(lists:sort(Props), lists:sort(AccProps))};
+        0 ->
+            {Props}
+    end;
+sum_values(Other, _Acc) ->
+    throw_sum_error(Other).
+
+% Merges two key-sorted property lists. Values of keys present in both
+% are added with sum_values/2; keys present in only one list are copied.
+sum_objects([], Right) ->
+    Right;
+sum_objects(Left, []) ->
+    Left;
+sum_objects([{Key, LV} | Left], [{Key, RV} | Right]) ->
+    [{Key, sum_values(LV, RV)} | sum_objects(Left, Right)];
+sum_objects([{LK, LV} | Left], [{RK, _} | _] = Right) when LK < RK ->
+    [{LK, LV} | sum_objects(Left, Right)];
+sum_objects([{LK, _} | _] = Left, [{RK, RV} | Right]) when LK > RK ->
+    [{RK, RV} | sum_objects(Left, Right)].
+
+% Adds two arrays of numbers position by position. When one array is
+% shorter, the rest of the longer one is appended unchanged. A pair of
+% elements that are not both numbers is rejected through
+% throw_sum_error/1 with the remainder of the first array.
+sum_arrays([], []) ->
+    [];
+sum_arrays([_|_]=Left, []) ->
+    Left;
+sum_arrays([], [_|_]=Right) ->
+    Right;
+sum_arrays([L|Left], [R|Right]) when is_number(L), is_number(R) ->
+    [L + R | sum_arrays(Left, Right)];
+sum_arrays(Invalid, _) ->
+    throw_sum_error(Invalid).
+
+% Applies the reduce limit to the result of a built-in _sum.
+% The result counts as overflowing when it is larger than 4906 and more
+% than half the size of its input. Depending on the "reduce_limit"
+% setting an overflowing result is replaced by an error object ("true"),
+% logged and returned ("log"), or returned silently (anything else).
+% NOTE(review): 4906 may be a transposition of 4096 - confirm against the
+% limit used by the JavaScript query server before changing it.
+check_sum_overflow(InSize, OutSize, Sum) ->
+    Overflowed = OutSize > 4906 andalso OutSize * 2 > InSize,
+    Mode = config:get("query_server_config", "reduce_limit", "true"),
+    case {Mode, Overflowed} of
+        {"true", true} ->
+            Reason = log_sum_overflow(InSize, OutSize),
+            {[
+                {<<"error">>, <<"builtin_reduce_error">>},
+                {<<"reason">>, Reason}
+            ]};
+        {"log", true} ->
+            log_sum_overflow(InSize, OutSize),
+            Sum;
+        _ ->
+            Sum
+    end.
+
+% Logs a reduce overflow at error level and returns the logged message
+% as a binary.
+log_sum_overflow(InSize, OutSize) ->
+    Text = io_lib:format(
+        "Reduce output must shrink more rapidly: "
+        "input size: ~b "
+        "output size: ~b",
+        [InSize, OutSize]),
+    Msg = iolist_to_binary(Text),
+    couch_log:error(Msg, []),
+    Msg.
+
+% Computes the unpacked _stats accumulator for a list of [Key, Value]
+% rows. No rows give the all-zero tuple; otherwise the first value seeds
+% the accumulator and the remaining values are folded into it.
+builtin_stats(_Mode, []) ->
+    {0, 0, 0, 0, 0};
+builtin_stats(_Mode, [[_FirstKey, FirstValue] | Rows]) ->
+    Seed = build_initial_accumulator(FirstValue),
+    Step = fun([_Key, Value], Acc) -> stat_values(Value, Acc) end,
+    lists:foldl(Step, Seed, Rows).
+
+% Combines one value with the _stats accumulator.
+% Lists are combined element-wise, a packed statistics object is unpacked
+% first, a bare number on either side is promoted to the five-element
+% tuple {Sum, Count, Min, Max, SumOfSquares}, and two tuples are merged
+% field by field. Anything else is rejected through throw_stat_error/1.
+% The clause order matters: each clause normalises one side and retries.
+stat_values(Value, Acc) when is_list(Value), is_list(Acc) ->
+    lists:zipwith(fun stat_values/2, Value, Acc);
+stat_values({Packed}, Acc) when is_list(Packed) ->
+    stat_values(unpack_stats({Packed}), Acc);
+stat_values(Value, Acc) when is_number(Value) ->
+    stat_values({Value, 1, Value, Value, Value*Value}, Acc);
+stat_values(Value, Acc) when is_number(Acc) ->
+    stat_values(Value, {Acc, 1, Acc, Acc, Acc*Acc});
+stat_values(Value, Acc) when is_tuple(Value), is_tuple(Acc) ->
+    {VSum, VCnt, VMin, VMax, VSqr} = Value,
+    {ASum, ACnt, AMin, AMax, ASqr} = Acc,
+    {
+        VSum + ASum,
+        VCnt + ACnt,
+        erlang:min(VMin, AMin),
+        erlang:max(VMax, AMax),
+        VSqr + ASqr
+    };
+stat_values(Invalid, _Acc) ->
+    throw_stat_error(Invalid).
+
+% Builds the first _stats accumulator from the first value: a list is
+% converted element by element, a number becomes a five-element tuple, an
+% already unpacked tuple is kept, and a packed object is unpacked.
+% Anything else throws {invalid_value, Message}.
+build_initial_accumulator(Values) when is_list(Values) ->
+    [build_initial_accumulator(Value) || Value <- Values];
+build_initial_accumulator(Number) when is_number(Number) ->
+    {Number, 1, Number, Number, Number*Number};
+build_initial_accumulator({_, _, _, _, _} = Unpacked) ->
+    Unpacked;
+build_initial_accumulator({Props}) ->
+    unpack_stats({Props});
+build_initial_accumulator(Invalid) ->
+    Text = io_lib:format("non-numeric _stats input: ~w", [Invalid]),
+    throw({invalid_value, iolist_to_binary(Text)}).
+
+% Converts a packed statistics object into the tuple
+% {Sum, Count, Min, Max, SumOfSquares}, reading the fields in that order.
+unpack_stats({PreRed}) when is_list(PreRed) ->
+    FieldNames = [<<"sum">>, <<"count">>, <<"min">>, <<"max">>, <<"sumsqr">>],
+    [Sum, Cnt, Min, Max, Sqr] = [get_number(Name, PreRed) || Name <- FieldNames],
+    {Sum, Cnt, Min, Max, Sqr}.
+
+
+% Converts unpacked statistics into the object form. A five-element tuple
+% becomes an object with sum, count, min, max and sumsqr; a list is
+% converted element by element; an object that is already packed (the
+% legacy form from before the finalize step existed) is returned as is.
+pack_stats(Stats) when is_list(Stats) ->
+    lists:map(fun pack_stats/1, Stats);
+pack_stats({AlreadyPacked}) ->
+    {AlreadyPacked};
+pack_stats({Sum, Cnt, Min, Max, Sqr}) ->
+    {[
+        {<<"sum">>, Sum},
+        {<<"count">>, Cnt},
+        {<<"min">>, Min},
+        {<<"max">>, Max},
+        {<<"sumsqr">>, Sqr}
+    ]}.
+
+% Reads a numeric statistics field from a property list. A binary key
+% that is missing is looked up once more as an atom. A field that is
+% still missing, or that holds something other than a number, throws
+% {invalid_value, Message}.
+get_number(Key, Props) ->
+    case couch_util:get_value(Key, Props) of
+        Number when is_number(Number) ->
+            Number;
+        undefined when is_binary(Key) ->
+            get_number(binary_to_atom(Key, latin1), Props);
+        undefined ->
+            Missing = io_lib:format(
+                "user _stats input missing required field ~s (~p)",
+                [Key, Props]),
+            throw({invalid_value, iolist_to_binary(Missing)});
+        Other ->
+            NonNumeric = io_lib:format(
+                "non-numeric _stats input received for ~s: ~w",
+                [Key, Other]),
+            throw({invalid_value, iolist_to_binary(NonNumeric)})
+    end.
+
+% Built-in _approx_count_distinct. When reducing, every row key is
+% inserted into a fresh hyper filter created with hyper:new(11); when
+% re-reducing, the filters of the previous reductions are united.
+% TODO allow customization of precision in the ddoc.
+approx_count_distinct(reduce, KVs) ->
+    Insert = fun([[Key, _Id], _Value], Filter) ->
+        hyper:insert(term_to_binary(Key), Filter)
+    end,
+    lists:foldl(Insert, hyper:new(11), KVs);
+approx_count_distinct(rereduce, Reds) ->
+    Filters = [Filter || [_, Filter] <- Reds],
+    hyper:union(Filters).
+
+% Uses the function stored in ddoc.validate_doc_update to test an update.
+% Returns ok when the function accepts the update (it answers 1, ok or
+% true). Otherwise the query_server/vdu_rejects counter is incremented
+% and the rejection is thrown as {forbidden, Msg}, {unauthorized, Msg} or
+% {unknown_error, Msg}.
+% The counter is only incremented for answers that are not accepted;
+% previously the answers ok and true were accepted but still counted as
+% rejects.
+-spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+    DDoc    :: ddoc(),
+    EditDoc :: doc(),
+    DiskDoc :: doc() | nil,
+    Ctx     :: user_ctx(),
+    SecObj  :: sec_obj().
+
+validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
+    JsonDiskDoc = json_doc(DiskDoc),
+    Resp = ddoc_prompt(
+        DDoc,
+        [<<"validate_doc_update">>],
+        [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]
+    ),
+    % == keeps the original numeric comparison for the counter check.
+    Accepted = (Resp == 1) orelse (Resp =:= ok) orelse (Resp =:= true),
+    case Accepted of
+        true ->
+            ok;
+        false ->
+            couch_stats:increment_counter([couchdb, query_server, vdu_rejects], 1)
+    end,
+    case Resp of
+        RespCode when RespCode =:= 1; RespCode =:= ok; RespCode =:= true ->
+            ok;
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message});
+        {[{_, Message}]} ->
+            throw({unknown_error, Message});
+        Message when is_binary(Message) ->
+            throw({unknown_error, Message})
+    end.
+
+
+% Runs the rewrites function of the design document for a request.
+% The request object is built without the info, form, uuid and id fields.
+% Returns the validated rewrite object, or undefined when the function
+% reports no dispatch rule. Forbidden and unauthorized answers are
+% thrown, a malformed rewrite throws a rewrite error, and any other
+% answer is logged and thrown as {unknown_error, Answer}.
+rewrite(Req, Db, DDoc) ->
+    Excluded = [<<"info">>, <<"form">>, <<"uuid">>, <<"id">>],
+    Fields = [
+        Field
+        || Field <- chttpd_external:json_req_obj_fields(),
+           not lists:member(Field, Excluded)
+    ],
+    JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields),
+    case ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message});
+        [<<"no_dispatch_rule">>] ->
+            undefined;
+        [<<"ok">>, {Props}=Rewrite] when is_list(Props) ->
+            ok = validate_rewrite_response(Rewrite),
+            Rewrite;
+        [<<"ok">>, _] ->
+            throw_rewrite_error(<<"bad rewrite">>);
+        Unexpected ->
+            couch_log:error("bad rewrite return ~p", [Unexpected]),
+            throw({unknown_error, Unexpected})
+    end.
+
+% Validates every field of a rewrite object; returns ok or throws a
+% rewrite error.
+validate_rewrite_response({Props}) when is_list(Props) ->
+    validate_rewrite_response_fields(Props).
+
+% Validates the fields of a rewrite object one after the other and
+% returns ok when all of them pass.
+validate_rewrite_response_fields(Fields) ->
+    lists:foreach(fun({Key, Value}) ->
+        validate_rewrite_response_field(Key, Value)
+    end, Fields).
+
+% Validates a single field of a rewrite object:
+%   method, path, body - must be binaries
+%   headers, query     - must be objects whose keys and values are binaries
+%   code               - must be an integer from 200 to 599
+% Unknown fields are logged at debug level and accepted.
+% Returns ok or throws a rewrite error.
+validate_rewrite_response_field(<<"method">>, Value) ->
+    rewrite_field_ok(is_binary(Value), <<"bad method">>);
+validate_rewrite_response_field(<<"path">>, Value) ->
+    rewrite_field_ok(is_binary(Value), <<"bad path">>);
+validate_rewrite_response_field(<<"body">>, Value) ->
+    rewrite_field_ok(is_binary(Value), <<"bad body">>);
+validate_rewrite_response_field(<<"headers">>, {Props}=Headers) when is_list(Props) ->
+    validate_object_fields(Headers);
+validate_rewrite_response_field(<<"headers">>, _) ->
+    throw_rewrite_error(<<"bad headers">>);
+validate_rewrite_response_field(<<"query">>, {Props}=Query) when is_list(Props) ->
+    validate_object_fields(Query);
+validate_rewrite_response_field(<<"query">>, _) ->
+    throw_rewrite_error(<<"bad query">>);
+validate_rewrite_response_field(<<"code">>, Value) ->
+    InRange = is_integer(Value) andalso Value >= 200 andalso Value < 600,
+    rewrite_field_ok(InRange, <<"bad code">>);
+validate_rewrite_response_field(Key, Value) ->
+    couch_log:debug("unknown rewrite field ~p=~p", [Key, Value]),
+    ok.
+
+% ok when the check passed, otherwise throws a rewrite error with Error.
+rewrite_field_ok(true, _Error) ->
+    ok;
+rewrite_field_ok(false, Error) ->
+    throw_rewrite_error(Error).
+
+% Checks that an object consists only of binary keys with binary values;
+% throws a rewrite error for the first entry that does not.
+validate_object_fields({Props}) when is_list(Props) ->
+    CheckEntry = fun
+        ({Key, Value}) when is_binary(Key) andalso is_binary(Value) ->
+            ok;
+        ({Key, Value}) ->
+            throw_rewrite_error(io_lib:format(
+                "object key/value must be strings ~p=~p", [Key, Value]));
+        (Entry) ->
+            throw_rewrite_error(io_lib:format("bad value ~p", [Entry]))
+    end,
+    lists:foreach(CheckEntry, Props).
+
+
+% Throws {rewrite_error, Reason} with Reason as a binary; an iolist
+% reason is converted first.
+throw_rewrite_error(Reason) when is_binary(Reason) ->
+    throw({rewrite_error, Reason});
+throw_rewrite_error(Reason) when is_list(Reason) ->
+    throw_rewrite_error(iolist_to_binary(Reason)).
+
+
+% Default options for converting documents to JSON: only the revision
+% limit.
+json_doc_options() ->
+    NoExtraOptions = [],
+    json_doc_options(NoExtraOptions).
+
+% Prepends {revs, Limit} to Options, where Limit is the "revs_limit"
+% setting (default 20).
+json_doc_options(Options) ->
+    RevsLimit = config:get_integer("query_server_config", "revs_limit", 20),
+    [{revs, RevsLimit} | Options].
+
+% Converts a document to JSON with the default options; nil becomes null.
+json_doc(Doc) ->
+    DefaultOptions = json_doc_options(),
+    json_doc(Doc, DefaultOptions).
+
+% Converts a document to JSON with the given options; a missing document
+% (nil) is represented as null.
+json_doc(nil, _Options) ->
+    null;
+json_doc(Doc, Options) ->
+    couch_doc:to_json_obj(Doc, Options).
+
+% Run Docs through the map function of view VName in DDoc, used as a filter.
+% The docs are JSON-encoded once with shared options and sent in a single
+% prompt; the query server must answer [true, Passes]. Returns {ok, Passes}.
+filter_view(DDoc, VName, Docs) ->
+    DocOpts = json_doc_options(),
+    EncodedDocs = [json_doc(D, DocOpts) || D <- Docs],
+    FunPath = [<<"views">>, VName, <<"map">>],
+    [true, Passes] = ddoc_prompt(DDoc, FunPath, [EncodedDocs]),
+    {ok, Passes}.
+
+% Run Docs through the filter function FName of DDoc.
+% Req is either {json_req, JsonObj}, whose JSON request object is used
+% as-is, or an #httpd{} record, which is converted with
+% couch_httpd_external:json_req_obj/2. The query server must answer
+% [true, Passes]. Returns {ok, Passes}.
+filter_docs(Req, Db, DDoc, FName, Docs) ->
+    ReqObj = case Req of
+        #httpd{} ->
+            couch_httpd_external:json_req_obj(Req, Db);
+        {json_req, Obj} ->
+            Obj
+    end,
+    DocOpts = json_doc_options(),
+    EncodedDocs = [json_doc(D, DocOpts) || D <- Docs],
+    FunPath = [<<"filters">>, FName],
+    [true, Passes] = ddoc_prompt(DDoc, FunPath, [EncodedDocs, ReqObj]),
+    {ok, Passes}.
+
+% Send a "ddoc" command for DDocId to an already checked-out process.
+% FunPath names the function inside the design doc and Args are its
+% arguments.
+ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
+    Command = [<<"ddoc">>, DDocId, FunPath, Args],
+    proc_prompt(Proc, Command).
+
+% Check out a process for DDoc, send it a "ddoc" command for FunPath with
+% Args, and return the process afterwards (see with_ddoc_proc/2).
+ddoc_prompt(DDoc, FunPath, Args) ->
+    Prompt = fun(ProcAndId) ->
+        ddoc_proc_prompt(ProcAndId, FunPath, Args)
+    end,
+    with_ddoc_proc(DDoc, Prompt).
+
+% Call Fun({Proc, DDocId}) with a process obtained for the design doc.
+% The process is keyed by {DDocId, RevString}, built from the first
+% revision in the doc's revs. It is always handed back through
+% ret_os_process/1, even when Fun raises.
+with_ddoc_proc(#doc{id = DDocId, revs = {Start, [DiskRev | _]}} = DDoc, Fun) ->
+    RevStr = couch_doc:rev_to_str({Start, DiskRev}),
+    Proc = get_ddoc_process(DDoc, {DDocId, RevStr}),
+    try
+        Fun({Proc, DDocId})
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+% Prompt Proc with Args and return the reply as EJSON. A raw {json, Json}
+% reply is decoded; any other reply is returned unchanged.
+proc_prompt(Proc, Args) ->
+    raw_to_ejson(proc_prompt_raw(Proc, Args)).
+
+% Prompt the process through the {Mod, Func} stored in its prompt_fun
+% field and return the reply without any decoding.
+proc_prompt_raw(#proc{pid = Pid, prompt_fun = {Mod, Func}}, Args) ->
+    Mod:Func(Pid, Args).
+
+% Normalise a raw prompt reply to EJSON. A {json, Json} reply is decoded
+% with ?JSON_DECODE; anything else is passed through untouched.
+raw_to_ejson(Raw) ->
+    case Raw of
+        {json, Json} -> ?JSON_DECODE(Json);
+        _ -> Raw
+    end.
+
+% Stop the process by calling the {Mod, Func} stored in its stop_fun
+% field with the process pid.
+proc_stop(Proc) ->
+    Pid = Proc#proc.pid,
+    {Mod, Func} = Proc#proc.stop_fun,
+    Mod:Func(Pid).
+
+% Set the process timeout by calling the {Mod, Func} stored in its
+% set_timeout_fun field with the process pid and Timeout.
+proc_set_timeout(Proc, Timeout) ->
+    Pid = Proc#proc.pid,
+    {Mod, Func} = Proc#proc.set_timeout_fun,
+    Mod:Func(Pid, Timeout).
+
+% Timeout used when asking couch_js_proc_manager for a process.
+% Read from the "couchdb" / "os_process_timeout" config setting; the default
+% is 5000. Uses config:get_integer/3, like json_doc_options/1, instead of
+% hand-converting the string value with list_to_integer/1, which raised
+% badarg on a malformed setting.
+% NOTE(review): config:get_integer/3 is expected to fall back to the default
+% on a non-integer value -- confirm against the config application.
+get_os_process_timeout() ->
+    config:get_integer("couchdb", "os_process_timeout", 5000).
+
+% Obtain a query server process that knows the design doc DDoc.
+%
+% Asks couch_js_proc_manager for a process keyed by DDocKey, then sends it a
+% "reset" command with the query config returned by the manager. On a `true`
+% reply the process timeout is set from the <<"timeout">> entry of that
+% config and the process is returned. If the reset replies with anything
+% else, or raises, the process is stopped (best effort) and a fresh one is
+% requested. Any other reply from the manager is thrown to the caller.
+get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+    Request = {get_proc, DDoc, DDocKey},
+    Timeout = get_os_process_timeout(),
+    case gen_server:call(couch_js_proc_manager, Request, Timeout) of
+        {ok, Proc, {QueryConfig}} ->
+            % try/catch rather than the legacy `catch` operator, so a thrown
+            % `true` cannot be mistaken for a successful reset reply.
+            ResetOk = try
+                proc_prompt(Proc, [<<"reset">>, {QueryConfig}]) =:= true
+            catch
+                _:_ -> false
+            end,
+            case ResetOk of
+                true ->
+                    proc_set_timeout(Proc,
+                        couch_util:get_value(<<"timeout">>, QueryConfig)),
+                    Proc;
+                false ->
+                    % The process is being discarded; a failure while
+                    % stopping it is deliberately ignored.
+                    try proc_stop(Proc) catch _:_ -> ok end,
+                    get_ddoc_process(DDoc, DDocKey)
+            end;
+        Error ->
+            throw(Error)
+    end.
+
+% Obtain a query server process for language Lang.
+%
+% Asks couch_js_proc_manager for a process, then sends it a "reset" command
+% with the query config returned by the manager. On a `true` reply the
+% process timeout is set from the <<"timeout">> entry of that config and the
+% process is returned. If the reset replies with anything else, or raises,
+% the process is stopped (best effort) and a fresh one is requested. Any
+% other reply from the manager is thrown to the caller.
+get_os_process(Lang) ->
+    Timeout = get_os_process_timeout(),
+    case gen_server:call(couch_js_proc_manager, {get_proc, Lang}, Timeout) of
+        {ok, Proc, {QueryConfig}} ->
+            % try/catch rather than the legacy `catch` operator, so a thrown
+            % `true` cannot be mistaken for a successful reset reply.
+            ResetOk = try
+                proc_prompt(Proc, [<<"reset">>, {QueryConfig}]) =:= true
+            catch
+                _:_ -> false
+            end,
+            case ResetOk of
+                true ->
+                    proc_set_timeout(Proc,
+                        couch_util:get_value(<<"timeout">>, QueryConfig)),
+                    Proc;
+                false ->
+                    % The process is being discarded; a failure while
+                    % stopping it is deliberately ignored.
+                    try proc_stop(Proc) catch _:_ -> ok end,
+                    get_os_process(Lang)
+            end;
+        Error ->
+            throw(Error)
+    end.
+
+% Hand Proc back to couch_js_proc_manager and unlink from its pid.
+% The manager must reply `true` (asserted by the match); the call waits
+% without a timeout. Returns ok.
+ret_os_process(Proc) ->
+    Reply = gen_server:call(couch_js_proc_manager, {ret_proc, Proc}, infinity),
+    true = Reply,
+    catch unlink(Proc#proc.pid),
+    ok.
+
+% Throw {invalid_value, ?SUMERROR, Else}, where Else is the offending value.
+throw_sum_error(Else) ->
+    Error = {invalid_value, ?SUMERROR, Else},
+    throw(Error).
+
+% Throw {invalid_value, Message}, where Message is a binary built by
+% formatting ?STATERROR with the offending value Else.
+throw_stat_error(Else) ->
+    Message = io_lib:format(?STATERROR, [Else]),
+    throw({invalid_value, iolist_to_binary(Message)}).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+% builtin_sum_rows/2 with values that cannot be summed.
+builtin_sum_rows_negative_test() ->
+    Objects = [{[{<<"a">>, N}]} || N <- [1, 2, 3]],
+    ErrorObj = {[{<<"error">>, <<"builtin_reduce_error">>}]},
+    ?assertEqual(ErrorObj, builtin_sum_rows([["K", ErrorObj]], [])),
+    % The value below is invalid, but with a single row there is nothing to
+    % combine it with, so it comes back unchanged instead of as an error.
+    ?assertEqual(Objects, builtin_sum_rows([["K", Objects]], [])),
+    {ResultProps} = builtin_sum_rows([["K", Objects]], [1, 2, 3]),
+    ErrorEntry = lists:keyfind(<<"error">>, 1, ResultProps),
+    ?assertEqual({<<"error">>, <<"builtin_reduce_error">>}, ErrorEntry).
+
+% sum_values/2 on numbers, lists and nested objects, in both argument orders.
+sum_values_test() ->
+    ?assertEqual(3, sum_values(1, 2)),
+    ?assertEqual([2, 4, 6], sum_values(1, [1, 4, 6])),
+    ?assertEqual([3, 5, 7], sum_values([3, 2, 4], [0, 3, 3])),
+    Left = {[
+        {<<"a">>, 1},
+        {<<"b">>, [1, 2]},
+        {<<"c">>, {[{<<"d">>, 3}]}},
+        {<<"g">>, 1}
+    ]},
+    Right = {[
+        {<<"a">>, 2},
+        {<<"b">>, 3},
+        {<<"c">>, {[{<<"e">>, 5}]}},
+        {<<"f">>, 1},
+        {<<"g">>, 1}
+    ]},
+    Expected = {[
+        {<<"a">>, 3},
+        {<<"b">>, [4, 2]},
+        {<<"c">>, {[{<<"d">>, 3}, {<<"e">>, 5}]}},
+        {<<"f">>, 1},
+        {<<"g">>, 2}
+    ]},
+    ?assertEqual(Expected, sum_values(Left, Right)),
+    ?assertEqual(Expected, sum_values(Right, Left)).
+
+% sum_values/2 must throw for values that cannot be summed.
+sum_values_negative_test() ->
+    Objects = [{[{<<"a">>, N}]} || N <- [1, 2, 3]],
+    Strings = ["error 1", "error 2"],
+    Binaries = [<<"error 3">>, <<"error 4">>],
+    ErrorObj = {[
+        {<<"error">>, <<"builtin_reduce_error">>},
+        {<<"reason">>, ?SUMERROR},
+        {<<"caused_by">>, <<"some cause">>}
+    ]},
+    % Each pair is an invalid combination that must raise invalid_value.
+    InvalidPairs = [
+        {Objects, [1, 2, 3]},
+        {Objects, 0},
+        {Strings, [1, 2]},
+        {Binaries, [0]}
+    ],
+    lists:foreach(fun({Left, Right}) ->
+        ?assertThrow({invalid_value, _, _}, sum_values(Left, Right))
+    end, InvalidPairs),
+    % An existing error object is re-thrown as builtin_reduce_error.
+    ?assertThrow({builtin_reduce_error, ErrorObj}, sum_values(ErrorObj, [0])).
+
+% stat_values/2 on scalar pairs and on lists combined element by element.
+stat_values_test() ->
+    ?assertEqual({1, 2, 0, 1, 1}, stat_values(1, 0)),
+    ?assertEqual({11, 2, 1, 10, 101}, stat_values(1, 10)),
+    ExpectedPerElement = [
+        {9, 2, 2, 7, 53},
+        {14, 2, 3, 11, 130},
+        {18, 2, 5, 13, 194}
+    ],
+    ?assertEqual(ExpectedPerElement, stat_values([2, 3, 5], [7, 11, 13])).
+
+% The _stats reducer end to end (through test_reduce/2), and finalize/2 on
+% tuple-form, object-form and mixed inputs.
+reduce_stats_test() ->
+    % Expected finalized stats objects for the single values 1 and 2.
+    StatsOf1 = {[
+        {<<"sum">>, 1}, {<<"count">>, 1}, {<<"min">>, 1}, {<<"max">>, 1},
+        {<<"sumsqr">>, 1}
+    ]},
+    StatsOf2 = {[
+        {<<"sum">>, 2}, {<<"count">>, 1}, {<<"min">>, 2}, {<<"max">>, 2},
+        {<<"sumsqr">>, 4}
+    ]},
+    Both = [StatsOf1, StatsOf2],
+    TupleOf1 = {1, 1, 1, 1, 1},
+    TupleOf2 = {2, 1, 2, 2, 4},
+    ?assertEqual([StatsOf2], test_reduce(<<"_stats">>, [[[null, key], 2]])),
+    ?assertEqual([Both], test_reduce(<<"_stats">>, [[[null, key], [1, 2]]])),
+    ?assertEqual(StatsOf2, element(2, finalize(<<"_stats">>, TupleOf2))),
+    ?assertEqual(Both,
+        element(2, finalize(<<"_stats">>, [TupleOf1, TupleOf2]))),
+    ?assertEqual(Both,
+        element(2, finalize(<<"_stats">>, [TupleOf1, StatsOf2]))),
+    ?assertEqual(Both,
+        element(2, finalize(<<"_stats">>, [StatsOf1, TupleOf2]))),
+    ok.
+
+% Test helper: reduce KVs with Reducer, finalize the reduction, and return
+% the finalized value. Asserts that the reduce step succeeds.
+test_reduce(Reducer, KVs) ->
+    Lang = <<"javascript">>,
+    Reducers = [Reducer],
+    ?assertMatch({ok, _}, reduce(Lang, Reducers, KVs)),
+    {ok, Reduced} = reduce(Lang, Reducers, KVs),
+    {ok, Finalized} = finalize(Reducer, Reduced),
+    Finalized.
+
+-endif.
diff --git a/src/couch_js/src/couch_js_sup.erl b/src/couch_js/src/couch_js_sup.erl
new file mode 100644
index 000000000..e87546127
--- /dev/null
+++ b/src/couch_js/src/couch_js_sup.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js_sup).
+-behaviour(supervisor).
+
+
+-export([
+ start_link/0
+]).
+
+-export([
+ init/1
+]).
+
+
+% Start the supervisor, registered locally under the module name.
+start_link() ->
+    Name = {local, ?MODULE},
+    supervisor:start_link(Name, ?MODULE, []).
+
+
+% Supervisor callback. Supervises a single permanent child,
+% couch_js_proc_manager, under a one_for_one strategy that allows up to 50
+% restarts per 3600 seconds. The child is shut down with brutal_kill.
+init([]) ->
+    ProcManager = #{
+        id => couch_js_proc_manager,
+        start => {couch_js_proc_manager, start_link, []},
+        restart => permanent,
+        shutdown => brutal_kill
+    },
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 50,
+        period => 3600
+    },
+    {ok, {SupFlags, [ProcManager]}}.