summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Feuer <mfeuer@jaguarlandrover.com>2015-02-23 11:30:55 -0800
committerMagnus Feuer <mfeuer@jaguarlandrover.com>2015-02-23 11:30:55 -0800
commit9c3c941b7453b3318d237788ab31875c8913d1e2 (patch)
tree6332e91471090379907a8cf3768295eb9f095c52
parente1f2ad1aea499f512993d28b7e459d058ebafa87 (diff)
downloadrvi_core-9c3c941b7453b3318d237788ab31875c8913d1e2.tar.gz
Renamed functions to better match what they do. Fixed crash reported by rudi where a websocket-based locally connected service would trigger a crash when another service\s availability was reported to it
-rw-r--r--components/service_edge/src/service_edge_rpc.erl103
1 files changed, 67 insertions, 36 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index d132d4a..036efd9 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -68,51 +68,67 @@ init_rvi_component() ->
end.
-register_service(Service, Address) ->
+register_service(Service, ServiceAddress) ->
?debug("service_edge_rpc:register_service(): service: ~p ", [Service]),
- ?debug("service_edge_rpc:register_service(): address: ~p ", [Address]),
+ ?debug("service_edge_rpc:register_service(): address: ~p ", [ServiceAddress]),
case
%% Register the service at service discovery
rvi_common:send_component_request(service_discovery, register_local_service,
[
{service, Service},
- {network_address, Address}
+ {network_address, ServiceAddress}
], [service]) of
{ ok, JSONStatus, [ FullSvcName ]} ->
- %% Announce the new service
+ %% Announce the new service to all connected nodes.
rvi_common:send_component_request(data_link, announce_new_local_service,
[
%% Convert /some/svc to jlr.com/some/svc
{service, rvi_common:local_service_to_string(Service)}
], [service]),
- { ok, [ {service, FullSvcName},
- {status, rvi_common:json_rpc_status(JSONStatus)} ] };
+ %% Retrieve addresses of all locally registered services.
+ {ok, _JSONStatus, [ LocalAddresses ]} =
+ rvi_common:send_component_request(service_discovery, get_local_network_addresses,
+ [], [addresses]),
+ %% Send out an announcement to all locally connected services, but skip
+ %% the one that made the registration call
+ announce_service_availability(services_available, LocalAddresses, [Service], ServiceAddress),
+
+ %% Return ok.
+ { ok, [ {service, FullSvcName},
+ {status, rvi_common:json_rpc_status(JSONStatus)} ] };
+
Err ->
?debug("service_edge_rpc:register_service() Failed at service_discovery(): ~p",
[ Err ]),
Err
end.
-manage_remote_service(Cmd, LocalServiceAddress, Services) ->
- ?info("service_edge_rpc:manage_remote_service(~p, ~p, ~p): Called.",
- [ Cmd, LocalServiceAddress, Services ]),
-
- dispatch_to_local_service(LocalServiceAddress, Cmd,
- [ { services, Services }]),
- ok.
-%% Register the services listed in Services wit all
+
+announce_service_availability(Cmd, LocalServiceAddresses, Services) ->
+ announce_service_availability(Cmd, LocalServiceAddresses, Services, undefined).
+
+
+%% Announces the services listed in Services wit all
%% local services listed under LocalServices
-manage_remote_services(Cmd, LocalServiceAddresses, Services) ->
- ?info("service_edge_rpc:manage_remote_services(~p, ~p, ~p): Called.",
+%% SkipAddress is a single address that, if found in LocalServiceAddresses,
+%% will not receive an announcement.
+%% This is to avoid that a local service registering itself will get a callback
+%% about its own availability.
+announce_service_availability(Cmd, LocalServiceAddresses, Services, SkipAddress) ->
+ ?info("service_edge_rpc:announce_service_availability(~p, ~p, ~p): Called.",
[ Cmd, LocalServiceAddresses, Services ]),
- lists:map(fun(LocalServiceAddress) ->
- manage_remote_service(Cmd, LocalServiceAddress, Services)
+ lists:map(fun(LocalServiceAddress) when LocalServiceAddress =:= SkipAddress ->
+ ok;
+
+ (LocalServiceAddress) ->
+ dispatch_to_local_service(LocalServiceAddress, Cmd,
+ [ { services, Services }])
end, LocalServiceAddresses),
{ ok, [ { status, rvi_common:json_rpc_status(ok)} ] }.
@@ -235,24 +251,44 @@ flatten_ws_args(Args) ->
flatten_ws_args(Args, []).
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], Command,
+
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
+ [{ services, Services}] ) ->
+ ?info("service_edge:dispatch_to_local_service(service_available): Websocket!: ~p", [ Services]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "services_available",
+ [ "services", Services ]),
+ ok;
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
+ [{ services, Services}] ) ->
+ ?info("service_edge:dispatch_to_local_service(service_unavailable): Websocket!: ~p", [ Services]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "services_unavailable",
+ [ "services", Services ]),
+ ok;
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
[{ service_name, SvcName}, { parameters, Args}] ) ->
- ?info("service_edge:dispatch_to_local_service(): Websocket!: ~p, ~p", [ Command, Args]),
- %% wse:call(list_to_pid(WSPidStr), wse:window(),
- %% Command, flatten_ws_args(Args)),
+ ?info("service_edge:dispatch_to_local_service(message): Websocket!:~p", [Args]),
wse:call(list_to_pid(WSPidStr), wse:window(),
- Command,
+ "message",
[ "service_name", SvcName ] ++ flatten_ws_args(Args)),
ok;
%% Dispatch to regular JSON-RPC over HTTP.
dispatch_to_local_service(NetworkAddress, Command, Args) ->
- Res = rvi_common:send_http_request(NetworkAddress, Command, Args),
- ?debug("dispatch_to_local_service(): Command: ~p",[ Command]),
+ CmdStr = atom_to_list(Command),
+ Res = rvi_common:send_http_request(NetworkAddress, CmdStr, Args),
+ ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]),
?debug("dispatch_to_local_service(): Args: ~p",[ Args]),
?debug("dispatch_to_local_service(): Result: ~p",[ Res]).
+%% Forward a message to a specific locally connected service.
+%% Called by forward_message_to_local_service/2.
+%%
forward_message_to_local_service(ServiceName, NetworkAddress, Parameters) ->
?debug("service_edge:forward_to_local(): URL: ~p", [NetworkAddress]),
?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]),
@@ -270,7 +306,7 @@ forward_message_to_local_service(ServiceName, NetworkAddress, Parameters) ->
%% be either a wse websocket, or a regular HTTP JSON-RPC call
case rvi_common:get_request_result(
dispatch_to_local_service(NetworkAddress,
- "message",
+ message,
[ { service_name, SvcName },
{ parameters, Parameters }])) of
@@ -296,6 +332,8 @@ forward_message_to_local_service(ServiceName, NetworkAddress, Parameters) ->
Err
end.
+%% A message is to targeting a service that is connected to the local RVI
+%% node. We can just bounce the messsage straight over to the target service.
forward_message_to_local_service(ServiceName, Parameters) ->
%%
%% Resolve the local service name to an URL that we can send the
@@ -358,13 +396,7 @@ handle_rpc("register_service", Args) ->
handle_rpc("register_remote_services", Args) ->
{ok, Services} = rvi_common:get_json_element(["services"], Args),
{ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args),
- manage_remote_services("services_available", LocalServiceAddresses, Services),
- { ok, [ { status, rvi_common:json_rpc_status(ok)} ] };
-
-handle_rpc("unregister_remote_services", Args) ->
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
- {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args),
- manage_remote_services("services_unavailable", LocalServiceAddresses, Services),
+ announce_service_availability(services_available, LocalServiceAddresses, Services),
{ ok, [ { status, rvi_common:json_rpc_status(ok)} ] };
handle_rpc("message", Args) ->
@@ -393,7 +425,6 @@ wse_register_service(Ws, Service ) ->
?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, Service ]),
register_service(Service, "ws:" ++ pid_to_list(Ws)).
-
wse_message(Ws, ServiceName, Timeout, JSONParameters, CallingService) ->
%% Parameters are delivered as JSON. Decode into tuple
{ ok, Parameters } = exo_json:decode_string(JSONParameters),
@@ -414,13 +445,13 @@ wse_message(Ws, ServiceName, Timeout, JSONParameters, CallingService) ->
handle_call({rvi_call, register_remote_services, Args}, _From, State) ->
{_, Services} = lists:keyfind(services, 1, Args),
{_, LocalServiceAddresses} = lists:keyfind(local_service_addresses, 1, Args),
- manage_remote_services("services_available", LocalServiceAddresses, Services),
+ announce_service_availability(services_available, LocalServiceAddresses, Services),
{ reply, { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}, State };
handle_call({rvi_call, unregister_remote_services, Args}, _From, State) ->
{_, Services} = lists:keyfind(services, 1, Args),
{_, LocalServiceAddresses} = lists:keyfind(local_service_addresses, 1, Args),
- manage_remote_services("services_unavailable", LocalServiceAddresses, Services),
+ announce_service_availability(services_unavailable, LocalServiceAddresses, Services),
{ reply, {ok, [ { status, rvi_common:json_rpc_status(ok)} ] }, State };