summaryrefslogtreecommitdiff
path: root/components/service_edge
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-13 15:26:24 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-13 15:26:24 -0800
commit1841256773d9ddee800c5eeacec69025019ccf05 (patch)
treea7a064cf45194febc2e5dbe383c1b673199822e4 /components/service_edge
parent8d5217fe0fbd2d1555bb7a7a96187029f19e1794 (diff)
downloadrvi_core-1841256773d9ddee800c5eeacec69025019ccf05.tar.gz
spawn helpers for service_edge
Diffstat (limited to 'components/service_edge')
-rw-r--r--components/service_edge/src/service_edge_rpc.erl129
1 files changed, 79 insertions, 50 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index 498c36d..42e15fb 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -51,7 +51,8 @@
-record(st, {
%% Component specification
- cs = #component_spec{}
+ cs = #component_spec{},
+ pending = []
}).
-define(SERVICE_TABLE, rvi_local_services).
@@ -451,7 +452,8 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
handle_call({ rvi, handle_local_message,
- [SvcName, TimeoutArg, Parameters | Tail] = Args }, _From, St) ->
+ [SvcName, TimeoutArg, Parameters | Tail] = Args }, From,
+ #st{pending = Pend} = St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
@@ -461,18 +463,12 @@ handle_call({ rvi, handle_local_message,
%% that will be accepted by the receiving node that will deliver
%% the messaage to its locally connected service_name service.
%%
- ?debug("CS = ~p", [lager:pr(CS, rvi_common)]),
- case authorize_rpc:authorize_local_message(
- CS, SvcName, [{service_name, SvcName},
- {timeout, TimeoutArg},
- %% {parameters, Parameters},
- {parameters, Parameters}
- ]) of
- [ok] ->
- handle_local_message_(Args, CS, St);
- [not_found] ->
- {reply, [not_found], St}
- end;
+ {Pid, Ref} =
+ spawn_monitor(
+ fun() ->
+ exit({deferred_reply, handle_local_message_(Args, CS)})
+ end),
+ {noreply, St#st{pending = [{Pid, Ref, From}|Pend]}};
handle_call(Other, _From, St) ->
?warning("service_edge_rpc:handle_call(~p): unknown", [ Other ]),
@@ -502,8 +498,51 @@ handle_cast({rvi, handle_remote_message,
SvcName,
Timeout,
Parameters
- ] }, St) ->
+ ] }, #st{cs = CS} = St) ->
+ spawn(fun() ->
+ handle_remote_message_(
+ IP, Port, SvcName, Timeout, Parameters, CS)
+ end),
+ {noreply, St};
+
+handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) ->
+ %% FIXME: Should be forwarded to service.
+ ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ",
+ [SvcName, TransactionID]),
+
+ { noreply, St};
+
+handle_cast(Other, St) ->
+ ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]),
+ {noreply, St}.
+
+handle_info({'DOWN', Ref, _, _, {deferred_reply, Reply}},
+ #st{pending = Pend} = St) ->
+ case lists:keyfind(Ref, 2, Pend) of
+ {_Pid, Ref, From} = P ->
+ gen_server:reply(From, Reply),
+ {noreply, St#st{pending = Pend -- [P]}};
+ false ->
+ {noreply, St}
+ end;
+handle_info({'DOWN', Ref, _, _, Reason}, #st{pending = Pend} = St) ->
+ case lists:keyfind(Ref, 2, Pend) of
+ {Pid, _, From} = P ->
+ ?error("~p died: ~p", [Pid, Reason]),
+ gen_server:reply(From, [internal]),
+ {noreply, St#st{pending = Pend -- [P]}};
+ false ->
+ {noreply, St}
+ end;
+handle_info(_Info, St) ->
+ {noreply, St}.
+
+terminate(_Reason, _St) ->
+ ok.
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+handle_remote_message_(IP, Port, SvcName, Timeout, Parameters, CS) ->
?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]),
?debug("service_edge:remote_msg(): remote_port: ~p", [Port]),
?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]),
@@ -515,7 +554,7 @@ handle_cast({rvi, handle_remote_message,
[ #service_entry { url = URL }] -> %% This is a local message
Parameters1 = Parameters,
case authorize_rpc:authorize_remote_message(
- St#st.cs,
+ CS,
SvcName,
[{remote_ip, IP},
{remote_port, Port},
@@ -523,43 +562,33 @@ handle_cast({rvi, handle_remote_message,
{timeout, Timeout},
{parameters, Parameters1}]) of
[ ok ] ->
- forward_message_to_local_service(URL, SvcName,
- Parameters, St#st.cs),
- { noreply, St};
-
+ forward_message_to_local_service(
+ URL, SvcName, Parameters, CS);
[ _Other ] ->
- ?warning("service_entry:remote_msg(): Failed to authenticate ~p (~p)",
- [SvcName, _Other]),
- { noreply, St}
- end;
+ ?warning("service_entry:remote_msg(): "
+ "Failed to authenticate ~p (~p)",
+ [SvcName, _Other])
+ end;
[] ->
?notice("service_entry:remote_msg(): Service Disappeared ~p",
- [SvcName]),
- { noreply, St}
-
- end;
-
-
-handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) ->
- %% FIXME: Should be forwarded to service.
- ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ",
- [SvcName, TransactionID]),
-
- { noreply, St};
-
-handle_cast(Other, St) ->
- ?warning("service_edge_rpc:handle_cast(~p): unknown", [ Other ]),
- {noreply, St}.
-
-handle_info(_Info, St) ->
- {noreply, St}.
+ [SvcName])
+ end.
-terminate(_Reason, _St) ->
- ok.
-code_change(_OldVsn, St, _Extra) ->
- {ok, St}.
+handle_local_message_([SvcName, TimeoutArg, Parameters | _] = Args, CS) ->
+ ?debug("CS = ~p", [lager:pr(CS, rvi_common)]),
+ case authorize_rpc:authorize_local_message(
+ CS, SvcName, [{service_name, SvcName},
+ {timeout, TimeoutArg},
+ %% {parameters, Parameters},
+ {parameters, Parameters}
+ ]) of
+ [ok] ->
+ do_handle_local_message_(Args, CS);
+ [not_found] ->
+ [not_found]
+ end.
-handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) ->
+do_handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS) ->
%%
%% Slick but ugly.
%% If the timeout is more than 24 hrs old when parsed as unix time,
@@ -592,7 +621,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) ->
SvcName,
Parameters,
CS),
- { reply, Res , St};
+ Res;
_ -> %% SvcName is remote
%% Ask Schedule the request to resolve the network address
@@ -602,7 +631,7 @@ handle_local_message_([SvcName, TimeoutArg, Parameters | _Tail], CS, St) ->
SvcName,
Timeout,
Parameters),
- { reply, [ok, TID ], St}
+ [ok, TID ]
end.
json_rpc_notification(Method, Parameters) ->