diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2016-01-13 15:26:24 -0800 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2016-01-13 15:26:24 -0800 |
commit | 1841256773d9ddee800c5eeacec69025019ccf05 (patch) | |
tree | a7a064cf45194febc2e5dbe383c1b673199822e4 | |
parent | 8d5217fe0fbd2d1555bb7a7a96187029f19e1794 (diff) | |
download | rvi_core-1841256773d9ddee800c5eeacec69025019ccf05.tar.gz |
spawn helpers for service_edge
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 129 | ||||
-rw-r--r-- | test/rvi_core_SUITE.erl | 41 |
2 files changed, 116 insertions, 54 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index 498c36d..42e15fb 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -51,7 +51,8 @@ -record(st, { %% Component specification - cs = #component_spec{} + cs = #component_spec{}, + pending = [] }). -define(SERVICE_TABLE, rvi_local_services). @@ -451,7 +452,8 @@ handle_call({rvi, get_available_services, []}, _From, St) -> %% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] handle_call({ rvi, handle_local_message, - [SvcName, TimeoutArg, Parameters | Tail] = Args }, _From, St) -> + [SvcName, TimeoutArg, Parameters | Tail] = Args }, From, + #st{pending = Pend} = St) -> ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), @@ -461,18 +463,12 @@ handle_call({ rvi, handle_local_message, %% that will be accepted by the receiving node that will deliver %% the messaage to its locally connected service_name service. %% - ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), - case authorize_rpc:authorize_local_message( - CS, SvcName, [{service_name, SvcName}, - {timeout, TimeoutArg}, - %% {parameters, Parameters}, - {parameters, Parameters} - ]) of - [ok] -> - handle_local_message_(Args, CS, St); - [not_found] -> - {reply, [not_found], St} - end; + {Pid, Ref} = + spawn_monitor( + fun() -> + exit({deferred_reply, handle_local_message_(Args, CS)}) + end), + {noreply, St#st{pending = [{Pid, Ref, From}|Pend]}}; handle_call(Other, _From, St) -> ?warning("service_edge_rpc:handle_call(~p): unknown", [ Other ]), @@ -502,8 +498,51 @@ handle_cast({rvi, handle_remote_message, SvcName, Timeout, Parameters - ] }, St) -> + ] }, #st{cs = CS} = St) -> + spawn(fun() -> + handle_remote_message_( + IP, Port, SvcName, Timeout, Parameters, CS) + end), + {noreply, St}; + +handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) -> + %% FIXME: Should be forwarded to service. + ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ", + [SvcName, TransactionID]), + + { noreply, St}; + +handle_cast(Other, St) -> + ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]), + {noreply, St}. + +handle_info({'DOWN', Ref, _, _, {deferred_reply, Reply}}, + #st{pending = Pend} = St) -> + case lists:keyfind(Ref, 2, Pend) of + {_Pid, Ref, From} = P -> + gen_server:reply(From, Reply), + {noreply, St#st{pending = Pend -- [P]}}; + false -> + {noreply, St} + end; +handle_info({'DOWN', Ref, _, _, Reason}, #st{pending = Pend} = St) -> + case lists:keyfind(Ref, 2, Pend) of + {Pid, _, From} = P -> + ?error("~p died: ~p", [Pid, Reason]), + gen_server:reply(From, [internal]), + {noreply, St#st{pending = Pend -- [P]}}; + false -> + {noreply, St} + end; +handle_info(_Info, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. +code_change(_OldVsn, St, _Extra) -> + {ok, St}. +handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) -> ?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]), ?debug("service_edge:remote_msg(): remote_port: ~p", [Port]), ?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]), @@ -515,7 +554,7 @@ handle_cast({rvi, handle_remote_message, [ #service_entry { url = URL }] -> %% This is a local message Parameters1 = Parameters, case authorize_rpc:authorize_remote_message( - St#st.cs, + CS, SvcName, [{remote_ip, IP}, {remote_port, Port}, @@ -523,43 +562,33 @@ handle_cast({rvi, handle_remote_message, {timeout, Timeout}, {parameters, Parameters1}]) of [ ok ] -> - forward_message_to_local_service(URL, SvcName, - Parameters, St#st.cs), - { noreply, St}; - + forward_message_to_local_service( + URL, SvcName, Parameters, CS); [ _Other ] -> - ?warning("service_entry:remote_msg(): Failed to authenticate ~p (~p)", - [SvcName, _Other]), - { noreply, St} - end; + ?warning("service_entry:remote_msg(): " + "Failed to authenticate ~p (~p)", + [SvcName, _Other]) + end; [] -> ?notice("service_entry:remote_msg(): Service Disappeared ~p", - [SvcName]), - { noreply, St} - - end; - - -handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) -> - %% FIXME: Should be forwarded to service. - ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ", - [SvcName, TransactionID]), - - { noreply, St}; - -handle_cast(Other, St) -> - ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]), - {noreply, St}. - -handle_info(_Info, St) -> - {noreply, St}. + [SvcName]) + end. -terminate(_Reason, _St) -> - ok. -code_change(_OldVsn, St, _Extra) -> - {ok, St}. +handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) -> + ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), + case authorize_rpc:authorize_local_message( + CS, SvcName, [{service_name, SvcName}, + {timeout, TimeoutArg}, + %% {parameters, Parameters}, + {parameters, Parameters} + ]) of + [ok] -> + do_handle_local_message_(Args, CS); + [not_found] -> + [not_found] + end. -handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) -> +do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) -> %% %% Slick but ugly. %% If the timeout is more than 24 hrs old when parsed as unix time, @@ -592,7 +621,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) -> SvcName, Parameters, CS), - { reply, Res , St}; + Res; _ -> %% SvcName is remote %% Ask Schedule the request to resolve the network address @@ -602,7 +631,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) -> SvcName, Timeout, Parameters), - { reply, [ok, TID ], St} + [ok, TID ] end. json_rpc_notification(Method, Parameters) -> diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl index b907584..bd76d91 100644 --- a/test/rvi_core_SUITE.erl +++ b/test/rvi_core_SUITE.erl @@ -36,6 +36,7 @@ t_register_sota_service/1, t_call_lock_service/1, t_call_sota_service/1, + t_multicall_sota_service/1, t_remote_call_lock_service/1, t_check_rvi_log/1, t_no_errors/1 @@ -84,6 +85,7 @@ groups() -> t_register_sota_service, t_call_lock_service, t_call_sota_service, + t_multicall_sota_service, t_remote_call_lock_service, t_no_errors ]}, @@ -96,6 +98,7 @@ groups() -> t_call_lock_service, t_remote_call_lock_service, t_call_sota_service, + t_multicall_sota_service, t_check_rvi_log, t_no_errors ]}, @@ -107,6 +110,7 @@ groups() -> t_register_sota_service, t_call_lock_service, t_call_sota_service, + t_multicall_sota_service, t_remote_call_lock_service, t_no_errors ]}, @@ -118,6 +122,7 @@ groups() -> t_register_sota_service, t_call_lock_service, t_call_sota_service, + t_multicall_sota_service, t_remote_call_lock_service, t_no_errors ]} @@ -281,10 +286,38 @@ t_register_sota_service(_Config) -> timer:sleep(2000). t_call_sota_service(_Config) -> + call_sota_service_(sota_client, sota_bin()). + +t_multicall_sota_service(_Config) -> + Data = <<"abc">>, + Pids = [spawn_monitor(fun() -> + exit({ok, call_sota_service_(N, Data)}) + end) + || N <- [client1, + client2, + client3, + client4, + client5]], + collect(Pids). + +collect([{Pid, Ref} | T]) -> + receive + {'DOWN', Ref, _, _, {ok, ok}} -> + collect(T); + {'DOWN', Ref, _, _, Reason} -> + [exit(P, kill) || {P,_} <- T], + error(Reason) + after 30000 -> + error(timeout) + end; +collect([]) -> + ok. + + +call_sota_service_(RegName, Data) -> {Mega, Secs, _} = os:timestamp(), Timeout = Mega * 1000000 + Secs + 60, - register(sota_client, self()), - Data = sota_bin(), + register(RegName, self()), json_rpc_request(service_edge("backend"), <<"message">>, [ @@ -293,7 +326,7 @@ t_call_sota_service(_Config) -> {<<"parameters">>, [ {<<"data">>, Data}, - {<<"sendto">>, <<"sota_client">>}, + {<<"sendto">>, atom_to_binary(RegName, latin1)}, {<<"rvi.max_msg_size">>, 100} ]} ]), @@ -305,7 +338,7 @@ t_call_sota_service(_Config) -> {message, Other} -> ct:log("wrong message: ~p", [Other]), error({unmatched, Other}) - after 10000 -> + after 30000 -> error(timeout) end. |