diff options
Diffstat (limited to 'src/couch/src/couch_native_process.erl')
-rw-r--r-- | src/couch/src/couch_native_process.erl | 294 |
1 files changed, 165 insertions, 129 deletions
diff --git a/src/couch/src/couch_native_process.erl b/src/couch/src/couch_native_process.erl index eee8b2860..feea00c3a 100644 --- a/src/couch/src/couch_native_process.erl +++ b/src/couch/src/couch_native_process.erl @@ -41,8 +41,15 @@ -behaviour(gen_server). -vsn(1). --export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3, - handle_info/2]). +-export([ + start_link/0, + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + code_change/3, + handle_info/2 +]). -export([set_timeout/2, prompt/2]). -define(STATE, native_proc_state). @@ -74,15 +81,15 @@ prompt(Pid, Data) when is_list(Data) -> init([]) -> V = config:get("query_server_config", "os_process_idle_limit", "300"), Idle = list_to_integer(V) * 1000, - {ok, #evstate{ddocs=dict:new(), idle=Idle}, Idle}. + {ok, #evstate{ddocs = dict:new(), idle = Idle}, Idle}. handle_call({set_timeout, TimeOut}, _From, State) -> - {reply, ok, State#evstate{timeout=TimeOut}, State#evstate.idle}; - + {reply, ok, State#evstate{timeout = TimeOut}, State#evstate.idle}; handle_call({prompt, Data}, _From, State) -> - couch_log:debug("Prompt native qs: ~s",[?JSON_ENCODE(Data)]), - {NewState, Resp} = try run(State, to_binary(Data)) of - {S, R} -> {S, R} + couch_log:debug("Prompt native qs: ~s", [?JSON_ENCODE(Data)]), + {NewState, Resp} = + try run(State, to_binary(Data)) of + {S, R} -> {S, R} catch throw:{error, Why} -> {State, [<<"error">>, Why, Why]} @@ -118,14 +125,14 @@ handle_info(timeout, State) -> gen_server:cast(couch_proc_manager, {os_proc_idle, self()}), erlang:garbage_collect(), {noreply, State, State#evstate.idle}; -handle_info({'EXIT',_,normal}, State) -> +handle_info({'EXIT', _, normal}, State) -> {noreply, State, State#evstate.idle}; -handle_info({'EXIT',_,Reason}, State) -> +handle_info({'EXIT', _, Reason}, State) -> {stop, Reason, State}. terminate(_Reason, _State) -> ok. code_change(_OldVersion, State, _Extra) -> {ok, State}. -run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> +run(#evstate{list_pid = Pid} = State, [<<"list_row">>, Row]) when is_pid(Pid) -> Pid ! {self(), list_row, Row}, receive {Pid, chunks, Data} -> @@ -137,124 +144,137 @@ run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) -> throw({timeout, list_cleanup}) end, process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, [<<"end">>, Data]} + {State#evstate{list_pid = nil}, [<<"end">>, Data]} after State#evstate.timeout -> throw({timeout, list_row}) end; -run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) -> +run(#evstate{list_pid = Pid} = State, [<<"list_end">>]) when is_pid(Pid) -> Pid ! {self(), list_end}, Resp = - receive - {Pid, list_end, Data} -> - receive - {'EXIT', Pid, normal} -> ok - after State#evstate.timeout -> - throw({timeout, list_cleanup}) - end, - [<<"end">>, Data] - after State#evstate.timeout -> - throw({timeout, list_end}) - end, + receive + {Pid, list_end, Data} -> + receive + {'EXIT', Pid, normal} -> ok + after State#evstate.timeout -> + throw({timeout, list_cleanup}) + end, + [<<"end">>, Data] + after State#evstate.timeout -> + throw({timeout, list_end}) + end, process_flag(trap_exit, erlang:get(do_trap)), - {State#evstate{list_pid=nil}, Resp}; -run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) -> + {State#evstate{list_pid = nil}, Resp}; +run(#evstate{list_pid = Pid} = State, _Command) when is_pid(Pid) -> {State, [<<"error">>, list_error, list_error]}; -run(#evstate{ddocs=DDocs}, [<<"reset">>]) -> - {#evstate{ddocs=DDocs}, true}; -run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>, QueryConfig]) -> +run(#evstate{ddocs = DDocs}, [<<"reset">>]) -> + {#evstate{ddocs = DDocs}, true}; +run(#evstate{ddocs = DDocs, idle = Idle}, [<<"reset">>, QueryConfig]) -> NewState = #evstate{ ddocs = DDocs, query_config = QueryConfig, idle = Idle }, {NewState, true}; -run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) -> +run(#evstate{funs = Funs} = State, [<<"add_fun">>, BinFunc]) -> FunInfo = makefun(State, BinFunc), - {State#evstate{funs=Funs ++ [FunInfo]}, true}; -run(State, [<<"map_doc">> , Doc]) -> - Resp = lists:map(fun({Sig, Fun}) -> - erlang:put(Sig, []), - Fun(Doc), - lists:reverse(erlang:get(Sig)) - end, State#evstate.funs), + {State#evstate{funs = Funs ++ [FunInfo]}, true}; +run(State, [<<"map_doc">>, Doc]) -> + Resp = lists:map( + fun({Sig, Fun}) -> + erlang:put(Sig, []), + Fun(Doc), + lists:reverse(erlang:get(Sig)) + end, + State#evstate.funs + ), {State, Resp}; run(State, [<<"reduce">>, Funs, KVs]) -> {Keys, Vals} = - lists:foldl(fun([K, V], {KAcc, VAcc}) -> - {[K | KAcc], [V | VAcc]} - end, {[], []}, KVs), + lists:foldl( + fun([K, V], {KAcc, VAcc}) -> + {[K | KAcc], [V | VAcc]} + end, + {[], []}, + KVs + ), Keys2 = lists:reverse(Keys), Vals2 = lists:reverse(Vals), {State, catch reduce(State, Funs, Keys2, Vals2, false)}; run(State, [<<"rereduce">>, Funs, Vals]) -> {State, catch reduce(State, Funs, null, Vals, true)}; -run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) -> +run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) -> DDocs2 = store_ddoc(DDocs, DDocId, DDoc), - {State#evstate{ddocs=DDocs2}, true}; -run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) -> + {State#evstate{ddocs = DDocs2}, true}; +run(#evstate{ddocs = DDocs} = State, [<<"ddoc">>, DDocId | Rest]) -> DDoc = load_ddoc(DDocs, DDocId), ddoc(State, DDoc, Rest); run(_, Unknown) -> couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]), throw({error, unknown_command}). - + ddoc(State, {DDoc}, [FunPath, Args]) -> % load fun from the FunPath - BFun = lists:foldl(fun - (Key, {Props}) when is_list(Props) -> - couch_util:get_value(Key, Props, nil); - (_Key, Fun) when is_binary(Fun) -> - Fun; - (_Key, nil) -> - throw({error, not_found}); - (_Key, _Fun) -> - throw({error, malformed_ddoc}) - end, {DDoc}, FunPath), + BFun = lists:foldl( + fun + (Key, {Props}) when is_list(Props) -> + couch_util:get_value(Key, Props, nil); + (_Key, Fun) when is_binary(Fun) -> + Fun; + (_Key, nil) -> + throw({error, not_found}); + (_Key, _Fun) -> + throw({error, malformed_ddoc}) + end, + {DDoc}, + FunPath + ), ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args). ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) -> {State, (catch apply(Fun, Args))}; ddoc(State, {_, Fun}, [<<"rewrites">>], Args) -> {State, (catch apply(Fun, Args))}; -ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) -> +ddoc(State, {_, Fun}, [<<"filters">> | _], [Docs, Req]) -> FilterFunWrapper = fun(Doc) -> case catch Fun(Doc, Req) of - true -> true; - false -> false; - {'EXIT', Error} -> couch_log:error("~p", [Error]) + true -> true; + false -> false; + {'EXIT', Error} -> couch_log:error("~p", [Error]) end end, Resp = lists:map(FilterFunWrapper, Docs), {State, [true, Resp]}; -ddoc(State, {_, Fun}, [<<"views">>|_], [Docs]) -> +ddoc(State, {_, Fun}, [<<"views">> | _], [Docs]) -> MapFunWrapper = fun(Doc) -> case catch Fun(Doc) of - undefined -> true; - ok -> false; - false -> false; - [_|_] -> true; - {'EXIT', Error} -> couch_log:error("~p", [Error]) + undefined -> true; + ok -> false; + false -> false; + [_ | _] -> true; + {'EXIT', Error} -> couch_log:error("~p", [Error]) end end, Resp = lists:map(MapFunWrapper, Docs), {State, [true, Resp]}; -ddoc(State, {_, Fun}, [<<"shows">>|_], Args) -> - Resp = case (catch apply(Fun, Args)) of - FunResp when is_list(FunResp) -> - FunResp; - {FunResp} -> - [<<"resp">>, {FunResp}]; - FunResp -> - FunResp - end, +ddoc(State, {_, Fun}, [<<"shows">> | _], Args) -> + Resp = + case (catch apply(Fun, Args)) of + FunResp when is_list(FunResp) -> + FunResp; + {FunResp} -> + [<<"resp">>, {FunResp}]; + FunResp -> + FunResp + end, {State, Resp}; -ddoc(State, {_, Fun}, [<<"updates">>|_], Args) -> - Resp = case (catch apply(Fun, Args)) of - [JsonDoc, JsonResp] -> - [<<"up">>, JsonDoc, JsonResp] - end, +ddoc(State, {_, Fun}, [<<"updates">> | _], Args) -> + Resp = + case (catch apply(Fun, Args)) of + [JsonDoc, JsonResp] -> + [<<"up">>, JsonDoc, JsonResp] + end, {State, Resp}; -ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) -> +ddoc(State, {Sig, Fun}, [<<"lists">> | _], Args) -> Self = self(), SpawnFun = fun() -> LastChunk = (catch apply(Fun, Args)), @@ -270,22 +290,22 @@ ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) -> ok end, LastChunks = - case erlang:get(Sig) of - undefined -> [LastChunk]; - OtherChunks -> [LastChunk | OtherChunks] - end, + case erlang:get(Sig) of + undefined -> [LastChunk]; + OtherChunks -> [LastChunk | OtherChunks] + end, Self ! {self(), list_end, lists:reverse(LastChunks)} end, erlang:put(do_trap, process_flag(trap_exit, true)), Pid = spawn_link(SpawnFun), Resp = - receive - {Pid, start, Chunks, JsonResp} -> - [<<"start">>, Chunks, JsonResp] - after State#evstate.timeout -> - throw({timeout, list_start}) - end, - {State#evstate{list_pid=Pid}, Resp}. + receive + {Pid, start, Chunks, JsonResp} -> + [<<"start">>, Chunks, JsonResp] + after State#evstate.timeout -> + throw({timeout, list_start}) + end, + {State#evstate{list_pid = Pid}, Resp}. store_ddoc(DDocs, DDocId, DDoc) -> dict:store(DDocId, DDoc, DDocs). @@ -293,7 +313,11 @@ load_ddoc(DDocs, DDocId) -> try dict:fetch(DDocId, DDocs) of {DDoc} -> {DDoc} catch - _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))}) + _:_Else -> + throw( + {error, + ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s", [DDocId]))} + ) end. bindings(State, Sig) -> @@ -316,10 +340,10 @@ bindings(State, Sig, DDoc) -> Send = fun(Chunk) -> Curr = - case erlang:get(Sig) of - undefined -> []; - Else -> Else - end, + case erlang:get(Sig) of + undefined -> []; + Else -> Else + end, erlang:put(Sig, [Chunk | Curr]) end, @@ -329,10 +353,10 @@ bindings(State, Sig, DDoc) -> ok; _ -> Chunks = - case erlang:get(Sig) of - undefined -> []; - CurrChunks -> CurrChunks - end, + case erlang:get(Sig) of + undefined -> []; + CurrChunks -> CurrChunks + end, Self ! {self(), chunks, lists:reverse(Chunks)} end, erlang:put(Sig, []), @@ -343,7 +367,7 @@ bindings(State, Sig, DDoc) -> throw({timeout, list_pid_getrow}) end end, - + FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end, Bindings = [ @@ -357,7 +381,8 @@ bindings(State, Sig, DDoc) -> case DDoc of {_Props} -> Bindings ++ [{'DDoc', DDoc}]; - _Else -> Bindings + _Else -> + Bindings end. % thanks to erlview, via: @@ -373,30 +398,41 @@ makefun(State, Source, {DDoc}) -> makefun(_State, Source, BindFuns) when is_list(BindFuns) -> FunStr = binary_to_list(Source), {ok, Tokens, _} = erl_scan:string(FunStr), - Form = case (catch erl_parse:parse_exprs(Tokens)) of - {ok, [ParsedForm]} -> - ParsedForm; - {error, {LineNum, _Mod, [Mesg, Params]}}=Error -> - couch_log:error("Syntax error on line: ~p~n~s~p~n", - [LineNum, Mesg, Params]), - throw(Error) - end, - Bindings = lists:foldl(fun({Name, Fun}, Acc) -> - erl_eval:add_binding(Name, Fun, Acc) - end, erl_eval:new_bindings(), BindFuns), + Form = + case (catch erl_parse:parse_exprs(Tokens)) of + {ok, [ParsedForm]} -> + ParsedForm; + {error, {LineNum, _Mod, [Mesg, Params]}} = Error -> + couch_log:error( + "Syntax error on line: ~p~n~s~p~n", + [LineNum, Mesg, Params] + ), + throw(Error) + end, + Bindings = lists:foldl( + fun({Name, Fun}, Acc) -> + erl_eval:add_binding(Name, Fun, Acc) + end, + erl_eval:new_bindings(), + BindFuns + ), {value, Fun, _} = erl_eval:expr(Form, Bindings), Fun. reduce(State, BinFuns, Keys, Vals, ReReduce) -> - Funs = case is_list(BinFuns) of - true -> - lists:map(fun(BF) -> makefun(State, BF) end, BinFuns); - _ -> - [makefun(State, BinFuns)] - end, - Reds = lists:map(fun({_Sig, Fun}) -> - Fun(Keys, Vals, ReReduce) - end, Funs), + Funs = + case is_list(BinFuns) of + true -> + lists:map(fun(BF) -> makefun(State, BF) end, BinFuns); + _ -> + [makefun(State, BinFuns)] + end, + Reds = lists:map( + fun({_Sig, Fun}) -> + Fun(Keys, Vals, ReReduce) + end, + Funs + ), [true, Reds]. foldrows(GetRow, ProcRow, Acc) -> @@ -416,15 +452,15 @@ start_list_resp(Self, Sig) -> case erlang:get(list_started) of undefined -> Headers = - case erlang:get(list_headers) of - undefined -> {[{<<"headers">>, {[]}}]}; - CurrHdrs -> CurrHdrs - end, + case erlang:get(list_headers) of + undefined -> {[{<<"headers">>, {[]}}]}; + CurrHdrs -> CurrHdrs + end, Chunks = - case erlang:get(Sig) of - undefined -> []; - CurrChunks -> CurrChunks - end, + case erlang:get(Sig) of + undefined -> []; + CurrChunks -> CurrChunks + end, Self ! {self(), start, lists:reverse(Chunks), Headers}, erlang:put(list_started, true), erlang:put(Sig, []), |