diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2016-01-13 15:26:24 -0800 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2016-01-13 15:26:24 -0800 |
commit | 1841256773d9ddee800c5eeacec69025019ccf05 (patch) | |
tree | a7a064cf45194febc2e5dbe383c1b673199822e4 /components/service_edge | |
parent | 8d5217fe0fbd2d1555bb7a7a96187029f19e1794 (diff) | |
download | rvi_core-1841256773d9ddee800c5eeacec69025019ccf05.tar.gz |
spawn helpers for service_edge
Diffstat (limited to 'components/service_edge')
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 129 |
1 files changed, 79 insertions, 50 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index 498c36d..42e15fb 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -51,7 +51,8 @@ -record(st, { %% Component specification - cs = #component_spec{} + cs = #component_spec{}, + pending = [] }). -define(SERVICE_TABLE, rvi_local_services). @@ -451,7 +452,8 @@ handle_call({rvi, get_available_services, []}, _From, St) -> %% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] handle_call({ rvi, handle_local_message, - [SvcName, TimeoutArg, Parameters | Tail] = Args }, _From, St) -> + [SvcName, TimeoutArg, Parameters | Tail] = Args }, From, + #st{pending = Pend} = St) -> ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), @@ -461,18 +463,12 @@ handle_call({ rvi, handle_local_message, %% that will be accepted by the receiving node that will deliver %% the messaage to its locally connected service_name service. %% - ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), - case authorize_rpc:authorize_local_message( - CS, SvcName, [{service_name, SvcName}, - {timeout, TimeoutArg}, - %% {parameters, Parameters}, - {parameters, Parameters} - ]) of - [ok] -> - handle_local_message_(Args, CS, St); - [not_found] -> - {reply, [not_found], St} - end; + {Pid, Ref} = + spawn_monitor( + fun() -> + exit({deferred_reply, handle_local_message_(Args, CS)}) + end), + {noreply, St#st{pending = [{Pid, Ref, From}|Pend]}}; handle_call(Other, _From, St) -> ?warning("service_edge_rpc:handle_call(~p): unknown", [ Other ]), @@ -502,8 +498,51 @@ handle_cast({rvi, handle_remote_message, SvcName, Timeout, Parameters - ] }, St) -> + ] }, #st{cs = CS} = St) -> + spawn(fun() -> + handle_remote_message_( + IP, Port, SvcName, Timeout, Parameters, CS) + end), + {noreply, St}; + +handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) -> + %% FIXME: Should be forwarded to service. + ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ", + [SvcName, TransactionID]), + + { noreply, St}; + +handle_cast(Other, St) -> + ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]), + {noreply, St}. + +handle_info({'DOWN', Ref, _, _, {deferred_reply, Reply}}, + #st{pending = Pend} = St) -> + case lists:keyfind(Ref, 2, Pend) of + {_Pid, Ref, From} = P -> + gen_server:reply(From, Reply), + {noreply, St#st{pending = Pend -- [P]}}; + false -> + {noreply, St} + end; +handle_info({'DOWN', Ref, _, _, Reason}, #st{pending = Pend} = St) -> + case lists:keyfind(Ref, 2, Pend) of + {Pid, _, From} = P -> + ?error("~p died: ~p", [Pid, Reason]), + gen_server:reply(From, [internal]), + {noreply, St#st{pending = Pend -- [P]}}; + false -> + {noreply, St} + end; +handle_info(_Info, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. +code_change(_OldVsn, St, _Extra) -> + {ok, St}. +handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> ?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]), ?debug("service_edge:remote_msg(): remote_port: ~p", [Port]), ?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]), @@ -515,7 +554,7 @@ handle_cast({rvi, handle_remote_message, [ #service_entry { url = URL }] -> %% This is a local message Parameters1 = Parameters, case authorize_rpc:authorize_remote_message( - St#st.cs, + CS, SvcName, [{remote_ip, IP}, {remote_port, Port}, @@ -523,43 +562,33 @@ handle_cast({rvi, handle_remote_message, {timeout, Timeout}, {parameters, Parameters1}]) of [ ok ] -> - forward_message_to_local_service(URL, SvcName, - Parameters, St#st.cs), - { noreply, St}; - + forward_message_to_local_service( + URL, SvcName, Parameters, CS); [ _Other ] -> - ?warning("service_entry:remote_msg(): Failed to authenticate ~p (~p)", - [SvcName, _Other]), - { noreply, St} - end; + ?warning("service_entry:remote_msg(): " + "Failed to authenticate ~p (~p)", + [SvcName, _Other]) + end; [] -> ?notice("service_entry:remote_msg(): Service Disappeared ~p", - [SvcName]), - { noreply, St} - - end; - - -handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) -> - %% FIXME: Should be forwarded to service. - ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ", - [SvcName, TransactionID]), - - { noreply, St}; - -handle_cast(Other, St) -> - ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]), - {noreply, St}. - -handle_info(_Info, St) -> - {noreply, St}. + [SvcName]) + end. -terminate(_Reason, _St) -> - ok. -code_change(_OldVsn, St, _Extra) -> - {ok, St}. +handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) -> + ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), + case authorize_rpc:authorize_local_message( + CS, SvcName, [{service_name, SvcName}, + {timeout, TimeoutArg}, + %% {parameters, Parameters}, + {parameters, Parameters} + ]) of + [ok] -> + do_handle_local_message_(Args, CS); + [not_found] -> + [not_found] + end. -handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) -> +do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> %% %% Slick but ugly. %% If the timeout is more than 24 hrs old when parsed as unix time, @@ -592,7 +621,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) -> SvcName, Parameters, CS), - { reply, Res , St}; + Res; _ -> %% SvcName is remote %% Ask Schedule the request to resolve the network address @@ -602,7 +631,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) -> SvcName, Timeout, Parameters), - { reply, [ok, TID ], St} + [ok, TID ] end. json_rpc_notification(Method, Parameters) -> |