summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-13 15:26:24 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-13 15:26:24 -0800
commit1841256773d9ddee800c5eeacec69025019ccf05 (patch)
treea7a064cf45194febc2e5dbe383c1b673199822e4
parent8d5217fe0fbd2d1555bb7a7a96187029f19e1794 (diff)
downloadrvi_core-1841256773d9ddee800c5eeacec69025019ccf05.tar.gz
spawn helpers for service_edge
-rw-r--r--components/service_edge/src/service_edge_rpc.erl129
-rw-r--r--test/rvi_core_SUITE.erl41
2 files changed, 116 insertions, 54 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index 498c36d..42e15fb 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -51,7 +51,8 @@
-record(st, {
%% Component specification
- cs = #component_spec{}
+ cs = #component_spec{},
+ pending = []
}).
-define(SERVICE_TABLE, rvi_local_services).
@@ -451,7 +452,8 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
handle_call({ rvi, handle_local_message,
- [SvcName, TimeoutArg, Parameters | Tail] = Args }, _From, St) ->
+ [SvcName, TimeoutArg, Parameters | Tail] = Args }, From,
+ #st{pending = Pend} = St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
@@ -461,18 +463,12 @@ handle_call({ rvi, handle_local_message,
%% that will be accepted by the receiving node that will deliver
%% the messaage to its locally connected service_name service.
%%
- ?debug("CS = ~p", [lager:pr(CS, rvi_common)]),
- case authorize_rpc:authorize_local_message(
- CS, SvcName, [{service_name, SvcName},
- {timeout, TimeoutArg},
- %% {parameters, Parameters},
- {parameters, Parameters}
- ]) of
- [ok] ->
- handle_local_message_(Args, CS, St);
- [not_found] ->
- {reply, [not_found], St}
- end;
+ {Pid, Ref} =
+ spawn_monitor(
+ fun() ->
+ exit({deferred_reply, handle_local_message_(Args, CS)})
+ end),
+ {noreply, St#st{pending = [{Pid, Ref, From}|Pend]}};
handle_call(Other, _From, St) ->
?warning("service_edge_rpc:handle_call(~p): unknown", [ Other ]),
@@ -502,8 +498,51 @@ handle_cast({rvi, handle_remote_message,
SvcName,
Timeout,
Parameters
- ] }, St) ->
+ ] }, #st{cs = CS} = St) ->
+ spawn(fun() ->
+ handle_remote_message_(
+ IP, Port, SvcName, Timeout, Parameters, CS)
+ end),
+ {noreply, St};
+
+handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) ->
+ %% FIXME: Should be forwarded to service.
+ ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ",
+ [SvcName, TransactionID]),
+
+ { noreply, St};
+
+handle_cast(Other, St) ->
+ ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]),
+ {noreply, St}.
+
+handle_info({'DOWN', Ref, _, _, {deferred_reply, Reply}},
+ #st{pending = Pend} = St) ->
+ case lists:keyfind(Ref, 2, Pend) of
+ {_Pid, Ref, From} = P ->
+ gen_server:reply(From, Reply),
+ {noreply, St#st{pending = Pend -- [P]}};
+ false ->
+ {noreply, St}
+ end;
+handle_info({'DOWN', Ref, _, _, Reason}, #st{pending = Pend} = St) ->
+ case lists:keyfind(Ref, 2, Pend) of
+ {Pid, _, From} = P ->
+ ?error("~p died: ~p", [Pid, Reason]),
+ gen_server:reply(From, [internal]),
+ {noreply, St#st{pending = Pend -- [P]}};
+ false ->
+ {noreply, St}
+ end;
+handle_info(_Info, St) ->
+ {noreply, St}.
+
+terminate(_Reason, _St) ->
+ ok.
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) ->
?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]),
?debug("service_edge:remote_msg(): remote_port: ~p", [Port]),
?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]),
@@ -515,7 +554,7 @@ handle_cast({rvi, handle_remote_message,
[ #service_entry { url = URL }] -> %% This is a local message
Parameters1 = Parameters,
case authorize_rpc:authorize_remote_message(
- St#st.cs,
+ CS,
SvcName,
[{remote_ip, IP},
{remote_port, Port},
@@ -523,43 +562,33 @@ handle_cast({rvi, handle_remote_message,
{timeout, Timeout},
{parameters, Parameters1}]) of
[ ok ] ->
- forward_message_to_local_service(URL, SvcName,
- Parameters, St#st.cs),
- { noreply, St};
-
+ forward_message_to_local_service(
+ URL, SvcName, Parameters, CS);
[ _Other ] ->
- ?warning("service_entry:remote_msg(): Failed to authenticate ~p (~p)",
- [SvcName, _Other]),
- { noreply, St}
- end;
+ ?warning("service_entry:remote_msg(): "
+ "Failed to authenticate ~p (~p)",
+ [SvcName, _Other])
+ end;
[] ->
?notice("service_entry:remote_msg(): Service Disappeared ~p",
- [SvcName]),
- { noreply, St}
-
- end;
-
-
-handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) ->
- %% FIXME: Should be forwarded to service.
- ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ",
- [SvcName, TransactionID]),
-
- { noreply, St};
-
-handle_cast(Other, St) ->
- ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]),
- {noreply, St}.
-
-handle_info(_Info, St) ->
- {noreply, St}.
+ [SvcName])
+ end.
-terminate(_Reason, _St) ->
- ok.
-code_change(_OldVsn, St, _Extra) ->
- {ok, St}.
+handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) ->
+ ?debug("CS = ~p", [lager:pr(CS, rvi_common)]),
+ case authorize_rpc:authorize_local_message(
+ CS, SvcName, [{service_name, SvcName},
+ {timeout, TimeoutArg},
+ %% {parameters, Parameters},
+ {parameters, Parameters}
+ ]) of
+ [ok] ->
+ do_handle_local_message_(Args, CS);
+ [not_found] ->
+ [not_found]
+ end.
-handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) ->
+do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) ->
%%
%% Slick but ugly.
%% If the timeout is more than 24 hrs old when parsed as unix time,
@@ -592,7 +621,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) ->
SvcName,
Parameters,
CS),
- { reply, Res , St};
+ Res;
_ -> %% SvcName is remote
%% Ask Schedule the request to resolve the network address
@@ -602,7 +631,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) ->
SvcName,
Timeout,
Parameters),
- { reply, [ok, TID ], St}
+ [ok, TID ]
end.
json_rpc_notification(Method, Parameters) ->
diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl
index b907584..bd76d91 100644
--- a/test/rvi_core_SUITE.erl
+++ b/test/rvi_core_SUITE.erl
@@ -36,6 +36,7 @@
t_register_sota_service/1,
t_call_lock_service/1,
t_call_sota_service/1,
+ t_multicall_sota_service/1,
t_remote_call_lock_service/1,
t_check_rvi_log/1,
t_no_errors/1
@@ -84,6 +85,7 @@ groups() ->
t_register_sota_service,
t_call_lock_service,
t_call_sota_service,
+ t_multicall_sota_service,
t_remote_call_lock_service,
t_no_errors
]},
@@ -96,6 +98,7 @@ groups() ->
t_call_lock_service,
t_remote_call_lock_service,
t_call_sota_service,
+ t_multicall_sota_service,
t_check_rvi_log,
t_no_errors
]},
@@ -107,6 +110,7 @@ groups() ->
t_register_sota_service,
t_call_lock_service,
t_call_sota_service,
+ t_multicall_sota_service,
t_remote_call_lock_service,
t_no_errors
]},
@@ -118,6 +122,7 @@ groups() ->
t_register_sota_service,
t_call_lock_service,
t_call_sota_service,
+ t_multicall_sota_service,
t_remote_call_lock_service,
t_no_errors
]}
@@ -281,10 +286,38 @@ t_register_sota_service(_Config) ->
timer:sleep(2000).
t_call_sota_service(_Config) ->
+ call_sota_service_(sota_client, sota_bin()).
+
+t_multicall_sota_service(_Config) ->
+ Data = <<"abc">>,
+ Pids = [spawn_monitor(fun() ->
+ exit({ok, call_sota_service_(N, Data)})
+ end)
+ || N <- [client1,
+ client2,
+ client3,
+ client4,
+ client5]],
+ collect(Pids).
+
+collect([{Pid, Ref} | T]) ->
+ receive
+ {'DOWN', Ref, _, _, {ok, ok}} ->
+ collect(T);
+ {'DOWN', Ref, _, _, Reason} ->
+ [exit(P, kill) || {P,_} <- T],
+ error(Reason)
+ after 30000 ->
+ error(timeout)
+ end;
+collect([]) ->
+ ok.
+
+
+call_sota_service_(RegName, Data) ->
{Mega, Secs, _} = os:timestamp(),
Timeout = Mega * 1000000 + Secs + 60,
- register(sota_client, self()),
- Data = sota_bin(),
+ register(RegName, self()),
json_rpc_request(service_edge("backend"),
<<"message">>,
[
@@ -293,7 +326,7 @@ t_call_sota_service(_Config) ->
{<<"parameters">>,
[
{<<"data">>, Data},
- {<<"sendto">>, <<"sota_client">>},
+ {<<"sendto">>, atom_to_binary(RegName, latin1)},
{<<"rvi.max_msg_size">>, 100}
]}
]),
@@ -305,7 +338,7 @@ t_call_sota_service(_Config) ->
{message, Other} ->
ct:log("wrong message: ~p", [Other]),
error({unmatched, Other})
- after 10000 ->
+ after 30000 ->
error(timeout)
end.