summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_native_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_native_process.erl')
-rw-r--r--src/couch/src/couch_native_process.erl294
1 files changed, 165 insertions, 129 deletions
diff --git a/src/couch/src/couch_native_process.erl b/src/couch/src/couch_native_process.erl
index eee8b2860..feea00c3a 100644
--- a/src/couch/src/couch_native_process.erl
+++ b/src/couch/src/couch_native_process.erl
@@ -41,8 +41,15 @@
-behaviour(gen_server).
-vsn(1).
--export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
- handle_info/2]).
+-export([
+ start_link/0,
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ code_change/3,
+ handle_info/2
+]).
-export([set_timeout/2, prompt/2]).
-define(STATE, native_proc_state).
@@ -74,15 +81,15 @@ prompt(Pid, Data) when is_list(Data) ->
init([]) ->
V = config:get("query_server_config", "os_process_idle_limit", "300"),
Idle = list_to_integer(V) * 1000,
- {ok, #evstate{ddocs=dict:new(), idle=Idle}, Idle}.
+ {ok, #evstate{ddocs = dict:new(), idle = Idle}, Idle}.
handle_call({set_timeout, TimeOut}, _From, State) ->
- {reply, ok, State#evstate{timeout=TimeOut}, State#evstate.idle};
-
+ {reply, ok, State#evstate{timeout = TimeOut}, State#evstate.idle};
handle_call({prompt, Data}, _From, State) ->
- couch_log:debug("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
- {NewState, Resp} = try run(State, to_binary(Data)) of
- {S, R} -> {S, R}
+ couch_log:debug("Prompt native qs: ~s", [?JSON_ENCODE(Data)]),
+ {NewState, Resp} =
+ try run(State, to_binary(Data)) of
+ {S, R} -> {S, R}
catch
throw:{error, Why} ->
{State, [<<"error">>, Why, Why]}
@@ -118,14 +125,14 @@ handle_info(timeout, State) ->
gen_server:cast(couch_proc_manager, {os_proc_idle, self()}),
erlang:garbage_collect(),
{noreply, State, State#evstate.idle};
-handle_info({'EXIT',_,normal}, State) ->
+handle_info({'EXIT', _, normal}, State) ->
{noreply, State, State#evstate.idle};
-handle_info({'EXIT',_,Reason}, State) ->
+handle_info({'EXIT', _, Reason}, State) ->
{stop, Reason, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVersion, State, _Extra) -> {ok, State}.
-run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+run(#evstate{list_pid = Pid} = State, [<<"list_row">>, Row]) when is_pid(Pid) ->
Pid ! {self(), list_row, Row},
receive
{Pid, chunks, Data} ->
@@ -137,124 +144,137 @@ run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
throw({timeout, list_cleanup})
end,
process_flag(trap_exit, erlang:get(do_trap)),
- {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+ {State#evstate{list_pid = nil}, [<<"end">>, Data]}
after State#evstate.timeout ->
throw({timeout, list_row})
end;
-run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+run(#evstate{list_pid = Pid} = State, [<<"list_end">>]) when is_pid(Pid) ->
Pid ! {self(), list_end},
Resp =
- receive
- {Pid, list_end, Data} ->
- receive
- {'EXIT', Pid, normal} -> ok
- after State#evstate.timeout ->
- throw({timeout, list_cleanup})
- end,
- [<<"end">>, Data]
- after State#evstate.timeout ->
- throw({timeout, list_end})
- end,
+ receive
+ {Pid, list_end, Data} ->
+ receive
+ {'EXIT', Pid, normal} -> ok
+ after State#evstate.timeout ->
+ throw({timeout, list_cleanup})
+ end,
+ [<<"end">>, Data]
+ after State#evstate.timeout ->
+ throw({timeout, list_end})
+ end,
process_flag(trap_exit, erlang:get(do_trap)),
- {State#evstate{list_pid=nil}, Resp};
-run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
+ {State#evstate{list_pid = nil}, Resp};
+run(#evstate{list_pid = Pid} = State, _Command) when is_pid(Pid) ->
{State, [<<"error">>, list_error, list_error]};
-run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
- {#evstate{ddocs=DDocs}, true};
-run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>, QueryConfig]) ->
+run(#evstate{ddocs = DDocs}, [<<"reset">>]) ->
+ {#evstate{ddocs = DDocs}, true};
+run(#evstate{ddocs = DDocs, idle = Idle}, [<<"reset">>, QueryConfig]) ->
NewState = #evstate{
ddocs = DDocs,
query_config = QueryConfig,
idle = Idle
},
{NewState, true};
-run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
+run(#evstate{funs = Funs} = State, [<<"add_fun">>, BinFunc]) ->
FunInfo = makefun(State, BinFunc),
- {State#evstate{funs=Funs ++ [FunInfo]}, true};
-run(State, [<<"map_doc">> , Doc]) ->
- Resp = lists:map(fun({Sig, Fun}) ->
- erlang:put(Sig, []),
- Fun(Doc),
- lists:reverse(erlang:get(Sig))
- end, State#evstate.funs),
+ {State#evstate{funs = Funs ++ [FunInfo]}, true};
+run(State, [<<"map_doc">>, Doc]) ->
+ Resp = lists:map(
+ fun({Sig, Fun}) ->
+ erlang:put(Sig, []),
+ Fun(Doc),
+ lists:reverse(erlang:get(Sig))
+ end,
+ State#evstate.funs
+ ),
{State, Resp};
run(State, [<<"reduce">>, Funs, KVs]) ->
{Keys, Vals} =
- lists:foldl(fun([K, V], {KAcc, VAcc}) ->
- {[K | KAcc], [V | VAcc]}
- end, {[], []}, KVs),
+ lists:foldl(
+ fun([K, V], {KAcc, VAcc}) ->
+ {[K | KAcc], [V | VAcc]}
+ end,
+ {[], []},
+ KVs
+ ),
Keys2 = lists:reverse(Keys),
Vals2 = lists:reverse(Vals),
{State, catch reduce(State, Funs, Keys2, Vals2, false)};
run(State, [<<"rereduce">>, Funs, Vals]) ->
{State, catch reduce(State, Funs, null, Vals, true)};
-run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
+run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
- {State#evstate{ddocs=DDocs2}, true};
-run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
+ {State#evstate{ddocs = DDocs2}, true};
+run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, DDocId | Rest]) ->
DDoc = load_ddoc(DDocs, DDocId),
ddoc(State, DDoc, Rest);
run(_, Unknown) ->
couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]),
throw({error, unknown_command}).
-
+
ddoc(State, {DDoc}, [FunPath, Args]) ->
% load fun from the FunPath
- BFun = lists:foldl(fun
- (Key, {Props}) when is_list(Props) ->
- couch_util:get_value(Key, Props, nil);
- (_Key, Fun) when is_binary(Fun) ->
- Fun;
- (_Key, nil) ->
- throw({error, not_found});
- (_Key, _Fun) ->
- throw({error, malformed_ddoc})
- end, {DDoc}, FunPath),
+ BFun = lists:foldl(
+ fun
+ (Key, {Props}) when is_list(Props) ->
+ couch_util:get_value(Key, Props, nil);
+ (_Key, Fun) when is_binary(Fun) ->
+ Fun;
+ (_Key, nil) ->
+ throw({error, not_found});
+ (_Key, _Fun) ->
+ throw({error, malformed_ddoc})
+ end,
+ {DDoc},
+ FunPath
+ ),
ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
{State, (catch apply(Fun, Args))};
ddoc(State, {_, Fun}, [<<"rewrites">>], Args) ->
{State, (catch apply(Fun, Args))};
-ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
+ddoc(State, {_, Fun}, [<<"filters">> | _], [Docs, Req]) ->
FilterFunWrapper = fun(Doc) ->
case catch Fun(Doc, Req) of
- true -> true;
- false -> false;
- {'EXIT', Error} -> couch_log:error("~p", [Error])
+ true -> true;
+ false -> false;
+ {'EXIT', Error} -> couch_log:error("~p", [Error])
end
end,
Resp = lists:map(FilterFunWrapper, Docs),
{State, [true, Resp]};
-ddoc(State, {_, Fun}, [<<"views">>|_], [Docs]) ->
+ddoc(State, {_, Fun}, [<<"views">> | _], [Docs]) ->
MapFunWrapper = fun(Doc) ->
case catch Fun(Doc) of
- undefined -> true;
- ok -> false;
- false -> false;
- [_|_] -> true;
- {'EXIT', Error} -> couch_log:error("~p", [Error])
+ undefined -> true;
+ ok -> false;
+ false -> false;
+ [_ | _] -> true;
+ {'EXIT', Error} -> couch_log:error("~p", [Error])
end
end,
Resp = lists:map(MapFunWrapper, Docs),
{State, [true, Resp]};
-ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
- Resp = case (catch apply(Fun, Args)) of
- FunResp when is_list(FunResp) ->
- FunResp;
- {FunResp} ->
- [<<"resp">>, {FunResp}];
- FunResp ->
- FunResp
- end,
+ddoc(State, {_, Fun}, [<<"shows">> | _], Args) ->
+ Resp =
+ case (catch apply(Fun, Args)) of
+ FunResp when is_list(FunResp) ->
+ FunResp;
+ {FunResp} ->
+ [<<"resp">>, {FunResp}];
+ FunResp ->
+ FunResp
+ end,
{State, Resp};
-ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
- Resp = case (catch apply(Fun, Args)) of
- [JsonDoc, JsonResp] ->
- [<<"up">>, JsonDoc, JsonResp]
- end,
+ddoc(State, {_, Fun}, [<<"updates">> | _], Args) ->
+ Resp =
+ case (catch apply(Fun, Args)) of
+ [JsonDoc, JsonResp] ->
+ [<<"up">>, JsonDoc, JsonResp]
+ end,
{State, Resp};
-ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
+ddoc(State, {Sig, Fun}, [<<"lists">> | _], Args) ->
Self = self(),
SpawnFun = fun() ->
LastChunk = (catch apply(Fun, Args)),
@@ -270,22 +290,22 @@ ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
ok
end,
LastChunks =
- case erlang:get(Sig) of
- undefined -> [LastChunk];
- OtherChunks -> [LastChunk | OtherChunks]
- end,
+ case erlang:get(Sig) of
+ undefined -> [LastChunk];
+ OtherChunks -> [LastChunk | OtherChunks]
+ end,
Self ! {self(), list_end, lists:reverse(LastChunks)}
end,
erlang:put(do_trap, process_flag(trap_exit, true)),
Pid = spawn_link(SpawnFun),
Resp =
- receive
- {Pid, start, Chunks, JsonResp} ->
- [<<"start">>, Chunks, JsonResp]
- after State#evstate.timeout ->
- throw({timeout, list_start})
- end,
- {State#evstate{list_pid=Pid}, Resp}.
+ receive
+ {Pid, start, Chunks, JsonResp} ->
+ [<<"start">>, Chunks, JsonResp]
+ after State#evstate.timeout ->
+ throw({timeout, list_start})
+ end,
+ {State#evstate{list_pid = Pid}, Resp}.
store_ddoc(DDocs, DDocId, DDoc) ->
dict:store(DDocId, DDoc, DDocs).
@@ -293,7 +313,11 @@ load_ddoc(DDocs, DDocId) ->
try dict:fetch(DDocId, DDocs) of
{DDoc} -> {DDoc}
catch
- _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
+ _:_Else ->
+ throw(
+ {error,
+ ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s", [DDocId]))}
+ )
end.
bindings(State, Sig) ->
@@ -316,10 +340,10 @@ bindings(State, Sig, DDoc) ->
Send = fun(Chunk) ->
Curr =
- case erlang:get(Sig) of
- undefined -> [];
- Else -> Else
- end,
+ case erlang:get(Sig) of
+ undefined -> [];
+ Else -> Else
+ end,
erlang:put(Sig, [Chunk | Curr])
end,
@@ -329,10 +353,10 @@ bindings(State, Sig, DDoc) ->
ok;
_ ->
Chunks =
- case erlang:get(Sig) of
- undefined -> [];
- CurrChunks -> CurrChunks
- end,
+ case erlang:get(Sig) of
+ undefined -> [];
+ CurrChunks -> CurrChunks
+ end,
Self ! {self(), chunks, lists:reverse(Chunks)}
end,
erlang:put(Sig, []),
@@ -343,7 +367,7 @@ bindings(State, Sig, DDoc) ->
throw({timeout, list_pid_getrow})
end
end,
-
+
FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
Bindings = [
@@ -357,7 +381,8 @@ bindings(State, Sig, DDoc) ->
case DDoc of
{_Props} ->
Bindings ++ [{'DDoc', DDoc}];
- _Else -> Bindings
+ _Else ->
+ Bindings
end.
% thanks to erlview, via:
@@ -373,30 +398,41 @@ makefun(State, Source, {DDoc}) ->
makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
FunStr = binary_to_list(Source),
{ok, Tokens, _} = erl_scan:string(FunStr),
- Form = case (catch erl_parse:parse_exprs(Tokens)) of
- {ok, [ParsedForm]} ->
- ParsedForm;
- {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
- couch_log:error("Syntax error on line: ~p~n~s~p~n",
- [LineNum, Mesg, Params]),
- throw(Error)
- end,
- Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
- erl_eval:add_binding(Name, Fun, Acc)
- end, erl_eval:new_bindings(), BindFuns),
+ Form =
+ case (catch erl_parse:parse_exprs(Tokens)) of
+ {ok, [ParsedForm]} ->
+ ParsedForm;
+ {error, {LineNum, _Mod, [Mesg, Params]}} = Error ->
+ couch_log:error(
+ "Syntax error on line: ~p~n~s~p~n",
+ [LineNum, Mesg, Params]
+ ),
+ throw(Error)
+ end,
+ Bindings = lists:foldl(
+ fun({Name, Fun}, Acc) ->
+ erl_eval:add_binding(Name, Fun, Acc)
+ end,
+ erl_eval:new_bindings(),
+ BindFuns
+ ),
{value, Fun, _} = erl_eval:expr(Form, Bindings),
Fun.
reduce(State, BinFuns, Keys, Vals, ReReduce) ->
- Funs = case is_list(BinFuns) of
- true ->
- lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
- _ ->
- [makefun(State, BinFuns)]
- end,
- Reds = lists:map(fun({_Sig, Fun}) ->
- Fun(Keys, Vals, ReReduce)
- end, Funs),
+ Funs =
+ case is_list(BinFuns) of
+ true ->
+ lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
+ _ ->
+ [makefun(State, BinFuns)]
+ end,
+ Reds = lists:map(
+ fun({_Sig, Fun}) ->
+ Fun(Keys, Vals, ReReduce)
+ end,
+ Funs
+ ),
[true, Reds].
foldrows(GetRow, ProcRow, Acc) ->
@@ -416,15 +452,15 @@ start_list_resp(Self, Sig) ->
case erlang:get(list_started) of
undefined ->
Headers =
- case erlang:get(list_headers) of
- undefined -> {[{<<"headers">>, {[]}}]};
- CurrHdrs -> CurrHdrs
- end,
+ case erlang:get(list_headers) of
+ undefined -> {[{<<"headers">>, {[]}}]};
+ CurrHdrs -> CurrHdrs
+ end,
Chunks =
- case erlang:get(Sig) of
- undefined -> [];
- CurrChunks -> CurrChunks
- end,
+ case erlang:get(Sig) of
+ undefined -> [];
+ CurrChunks -> CurrChunks
+ end,
Self ! {self(), start, lists:reverse(Chunks), Headers},
erlang:put(list_started, true),
erlang:put(Sig, []),