diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-02-23 11:30:55 -0800 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-02-23 11:30:55 -0800 |
commit | 9c3c941b7453b3318d237788ab31875c8913d1e2 (patch) | |
tree | 6332e91471090379907a8cf3768295eb9f095c52 | |
parent | e1f2ad1aea499f512993d28b7e459d058ebafa87 (diff) | |
download | rvi_core-9c3c941b7453b3318d237788ab31875c8913d1e2.tar.gz |
Renamed functions to better match what they do. Fixed crash reported by rudi where a websocket-based locally connected service would trigger a crash when another service\s availability was reported to it
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 103 |
1 files changed, 67 insertions, 36 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index d132d4a..036efd9 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -68,51 +68,67 @@ init_rvi_component() -> end. -register_service(Service, Address) -> +register_service(Service, ServiceAddress) -> ?debug("service_edge_rpc:register_service(): service: ~p ", [Service]), - ?debug("service_edge_rpc:register_service(): address: ~p ", [Address]), + ?debug("service_edge_rpc:register_service(): address: ~p ", [ServiceAddress]), case %% Register the service at service discovery rvi_common:send_component_request(service_discovery, register_local_service, [ {service, Service}, - {network_address, Address} + {network_address, ServiceAddress} ], [service]) of { ok, JSONStatus, [ FullSvcName ]} -> - %% Announce the new service + %% Announce the new service to all connected nodes. rvi_common:send_component_request(data_link, announce_new_local_service, [ %% Convert /some/svc to jlr.com/some/svc {service, rvi_common:local_service_to_string(Service)} ], [service]), - { ok, [ {service, FullSvcName}, - {status, rvi_common:json_rpc_status(JSONStatus)} ] }; + %% Retrieve addresses of all locally registered services. + {ok, _JSONStatus, [ LocalAddresses ]} = + rvi_common:send_component_request(service_discovery, get_local_network_addresses, + [], [addresses]), + %% Send out an announcement to all locally connected services, but skip + %% the one that made the registration call + announce_service_availability(services_available, LocalAddresses, [Service], ServiceAddress), + + %% Return ok. + { ok, [ {service, FullSvcName}, + {status, rvi_common:json_rpc_status(JSONStatus)} ] }; + Err -> ?debug("service_edge_rpc:register_service() Failed at service_discovery(): ~p", [ Err ]), Err end. -manage_remote_service(Cmd, LocalServiceAddress, Services) -> - ?info("service_edge_rpc:manage_remote_service(~p, ~p, ~p): Called.", - [ Cmd, LocalServiceAddress, Services ]), - - dispatch_to_local_service(LocalServiceAddress, Cmd, - [ { services, Services }]), - ok. -%% Register the services listed in Services wit all + +announce_service_availability(Cmd, LocalServiceAddresses, Services) -> + announce_service_availability(Cmd, LocalServiceAddresses, Services, undefined). + + +%% Announces the services listed in Services wit all %% local services listed under LocalServices -manage_remote_services(Cmd, LocalServiceAddresses, Services) -> - ?info("service_edge_rpc:manage_remote_services(~p, ~p, ~p): Called.", +%% SkipAddress is a single address that, if found in LocalServiceAddresses, +%% will not receive an announcement. +%% This is to avoid that a local service registering itself will get a callback +%% about its own availability. +announce_service_availability(Cmd, LocalServiceAddresses, Services, SkipAddress) -> + ?info("service_edge_rpc:announce_service_availability(~p, ~p, ~p): Called.", [ Cmd, LocalServiceAddresses, Services ]), - lists:map(fun(LocalServiceAddress) -> - manage_remote_service(Cmd, LocalServiceAddress, Services) + lists:map(fun(LocalServiceAddress) when LocalServiceAddress =:= SkipAddress -> + ok; + + (LocalServiceAddress) -> + dispatch_to_local_service(LocalServiceAddress, Cmd, + [ { services, Services }]) end, LocalServiceAddresses), { ok, [ { status, rvi_common:json_rpc_status(ok)} ] }. @@ -235,24 +251,44 @@ flatten_ws_args(Args) -> flatten_ws_args(Args, []). -dispatch_to_local_service([ $w, $s, $: | WSPidStr], Command, + + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, + [{ services, Services}] ) -> + ?info("service_edge:dispatch_to_local_service(service_available): Websocket!: ~p", [ Services]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "services_available", + [ "services", Services ]), + ok; + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, + [{ services, Services}] ) -> + ?info("service_edge:dispatch_to_local_service(service_unavailable): Websocket!: ~p", [ Services]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "services_unavailable", + [ "services", Services ]), + ok; + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, [{ service_name, SvcName}, { parameters, Args}] ) -> - ?info("service_edge:dispatch_to_local_service(): Websocket!: ~p, ~p", [ Command, Args]), - %% wse:call(list_to_pid(WSPidStr), wse:window(), - %% Command, flatten_ws_args(Args)), + ?info("service_edge:dispatch_to_local_service(message): Websocket!:~p", [Args]), wse:call(list_to_pid(WSPidStr), wse:window(), - Command, + "message", [ "service_name", SvcName ] ++ flatten_ws_args(Args)), ok; %% Dispatch to regular JSON-RPC over HTTP. dispatch_to_local_service(NetworkAddress, Command, Args) -> - Res = rvi_common:send_http_request(NetworkAddress, Command, Args), - ?debug("dispatch_to_local_service(): Command: ~p",[ Command]), + CmdStr = atom_to_list(Command), + Res = rvi_common:send_http_request(NetworkAddress, CmdStr, Args), + ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]), ?debug("dispatch_to_local_service(): Args: ~p",[ Args]), ?debug("dispatch_to_local_service(): Result: ~p",[ Res]). +%% Forward a message to a specific locally connected service. +%% Called by forward_message_to_local_service/2. +%% forward_message_to_local_service(ServiceName, NetworkAddress, Parameters) -> ?debug("service_edge:forward_to_local(): URL: ~p", [NetworkAddress]), ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]), @@ -270,7 +306,7 @@ forward_message_to_local_service(ServiceName, NetworkAddress, Parameters) -> %% be either a wse websocket, or a regular HTTP JSON-RPC call case rvi_common:get_request_result( dispatch_to_local_service(NetworkAddress, - "message", + message, [ { service_name, SvcName }, { parameters, Parameters }])) of @@ -296,6 +332,8 @@ forward_message_to_local_service(ServiceName, NetworkAddress, Parameters) -> Err end. +%% A message is to targeting a service that is connected to the local RVI +%% node. We can just bounce the messsage straight over to the target service. forward_message_to_local_service(ServiceName, Parameters) -> %% %% Resolve the local service name to an URL that we can send the @@ -358,13 +396,7 @@ handle_rpc("register_service", Args) -> handle_rpc("register_remote_services", Args) -> {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args), - manage_remote_services("services_available", LocalServiceAddresses, Services), - { ok, [ { status, rvi_common:json_rpc_status(ok)} ] }; - -handle_rpc("unregister_remote_services", Args) -> - {ok, Services} = rvi_common:get_json_element(["services"], Args), - {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args), - manage_remote_services("services_unavailable", LocalServiceAddresses, Services), + announce_service_availability(services_available, LocalServiceAddresses, Services), { ok, [ { status, rvi_common:json_rpc_status(ok)} ] }; handle_rpc("message", Args) -> @@ -393,7 +425,6 @@ wse_register_service(Ws, Service ) -> ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, Service ]), register_service(Service, "ws:" ++ pid_to_list(Ws)). - wse_message(Ws, ServiceName, Timeout, JSONParameters, CallingService) -> %% Parameters are delivered as JSON. Decode into tuple { ok, Parameters } = exo_json:decode_string(JSONParameters), @@ -414,13 +445,13 @@ wse_message(Ws, ServiceName, Timeout, JSONParameters, CallingService) -> handle_call({rvi_call, register_remote_services, Args}, _From, State) -> {_, Services} = lists:keyfind(services, 1, Args), {_, LocalServiceAddresses} = lists:keyfind(local_service_addresses, 1, Args), - manage_remote_services("services_available", LocalServiceAddresses, Services), + announce_service_availability(services_available, LocalServiceAddresses, Services), { reply, { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}, State }; handle_call({rvi_call, unregister_remote_services, Args}, _From, State) -> {_, Services} = lists:keyfind(services, 1, Args), {_, LocalServiceAddresses} = lists:keyfind(local_service_addresses, 1, Args), - manage_remote_services("services_unavailable", LocalServiceAddresses, Services), + announce_service_availability(services_unavailable, LocalServiceAddresses, Services), { reply, {ok, [ { status, rvi_common:json_rpc_status(ok)} ] }, State }; |