diff options
23 files changed, 1128 insertions, 1661 deletions
diff --git a/components/dlink_tcp/priv/setup.config b/components/dlink_tcp/priv/setup.config deleted file mode 100644 index 9fe04fc..0000000 --- a/components/dlink_tcp/priv/setup.config +++ /dev/null @@ -1,42 +0,0 @@ -%% -*- erlang -*- -[ - %% Put include first, making it possible to override any defaults below - {include_lib, "exoport/priv/setup.config"}, - %% - %% Add our own app(s) - {add_apps, [asn1, - ssl, - { rvi_common, load}, - data_link_device]}, - %% - %% Custom environment settings - {env, - [ - {setup, [{data_dir, "db"}]}, - %% Tell exoport where to find our config file - {exoport, - [ - {config, filename:join(CWD, "exoport.config")}, - {access, - [{redirect, [{data_link, data_link_device_rpc}]}, - {accept, data_link_device_rpc} - ]}, - {kvdb_databases, - [{kvdb_conf, - [{file,"$DATA_DIR/kvdb_conf.db"}, - {backend,ets}, - {log_dir, "$DATA_DIR/kvdb_conf.log"}, - {log_threshold, [{writes, 1000}]}, - {save_mode, [on_switch, on_close]}, - {tables,[data]}, - {encoding,{raw,term,term}}, - {schema,kvdb_schema_events}]} - ]} - ]} - %% %% We run with a logging ETS backend in the database - %% {kvdb, - %% [ - %% ]} - %% ]} - ]} -]. diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 3cb27a5..9ff68f0 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -22,9 +22,12 @@ -export([start_json_server/0]). -export([start_connection_manager/0]). --export([announce_available_local_service/2, - announce_unavailable_local_service/2, - setup_data_link/2, +%% Invoked by service discovery +%% FIXME: Should be rvi_service_discovery behavior +-export([service_available/4, + service_unavailable/4]). + +-export([setup_data_link/3, disconnect_data_link/2, send_data/3]). @@ -32,11 +35,21 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). +-define(PERSISTENT_CONNECTIONS, persistent_connections). -define(DEFAULT_BERT_RPC_PORT, 9999). -define(DEFAULT_RECONNECT_INTERVAL, 5000). -define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). + +-define(CONNECTION_TABLE, rvi_dlink_tcp_connections). +-define(SERVICE_TABLE, rvi_dlink_tcp_services). + +-record(service_entry, { + service = [], %% Name of service + connection = undefined %% PID of connection that can reach this service + }). + -record(st, { cs = #component_spec{} }). @@ -46,9 +59,15 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init([]) -> - ?info("data_link_bert:init(): Called"), + ?info("dlink_tcp:init(): Called"), %% Dig out the bert rpc server setup + ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table, + { keypos, #service_entry.service }]), + + ets:new(?CONNECTION_TABLE, [ duplicate_bag, public, named_table, + { keypos, #service_entry.connection }]), + {ok, #st { cs = rvi_common:get_component_specification() } @@ -68,12 +87,12 @@ start_connection_manager() -> IP = proplists:get_value(ip, BertOpts, ?DEFAULT_BERT_RPC_ADDRESS), Port = proplists:get_value(port, BertOpts, ?DEFAULT_BERT_RPC_PORT), - ?info("data_link_bert:init_rvi_component(~p): Starting listener.", [self()]), + ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]), %% Fire up listener connection_manager:start_link(), {ok,Pid} = listener:start_link(), - ?info("data_link_bert:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), + ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), %% Add listener port. case listener:add_listener(Pid, IP, Port, CompSpec) of @@ -82,39 +101,54 @@ start_connection_manager() -> [ application:get_env(rvi, node_address, undefined)]); Err -> - ?error("data_link_bert:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), + ?error("dlink_tcp:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), ok end, - ?info("data_link_bert:init_rvi_component(): Setting up static nodes."), - setup_static_node_data_links_(rvi_common:static_nodes(), CompSpec), + ?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."), + + {ok, PersistentConnections } = rvi_common:get_module_config(data_link, + ?MODULE, + ?PERSISTENT_CONNECTIONS, + [], + CompSpec), + + + setup_persistent_connections_(PersistentConnections, CompSpec), ok. -setup_static_node_data_links_([ ], _CompSpec) -> - ok; +setup_persistent_connections_([ ], _CompSpec) -> + ok; -setup_static_node_data_links_([ { Prefix, NetworkAddress} | T], CompSpec) -> - ?debug("~p: Will connect static node ~p -> ~p", [self(), Prefix, NetworkAddress]), +setup_persistent_connections_([ NetworkAddress | T], CompSpec) -> + ?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]), [ IP, Port] = string:tokens(NetworkAddress, ":"), - connect_and_retry_remote(Prefix, IP, Port, CompSpec), - setup_static_node_data_links_(T, CompSpec), + connect_and_retry_remote(IP, Port, CompSpec), + setup_persistent_connections_(T, CompSpec), ok. -%% Behavior implementation -announce_available_local_service(CompSpec, Service) -> - rvi_common:notification(data_link, ?MODULE, announce_available_local_service, - [ {service, Service }], CompSpec). +service_available(CompSpec, SvcName, DataLinkModule, Address) -> + rvi_common:notification(data_link, ?MODULE, + service_available, + [{ service, SvcName }, + { data_link_module, DataLinkModule }, + { address, Address }], + CompSpec). +service_unavailable(CompSpec, SvcName, DataLinkModule, Address) -> + rvi_common:notification(data_link, ?MODULE, + service_unavailable, + [{ service, SvcName }, + { data_link_module, DataLinkModule }, + { address, Address }], + CompSpec). -announce_unavailable_local_service(CompSpec, Service) -> - rvi_common:notification(data_link, ?MODULE, announce_unavailable_local_service, - [ {service, Service }], CompSpec). - -setup_data_link(CompSpec, NetworkAddress) -> +setup_data_link(CompSpec, Service, Opts) -> rvi_common:request(data_link, ?MODULE, setup_data_link, - [ { network_address, NetworkAddress }], - [status], CompSpec). + [ { service, Service }, + { opts, Opts }], + [status, timeout], CompSpec). disconnect_data_link(CompSpec, NetworkAddress) -> rvi_common:request(data_link, ?MODULE, disconnect_data_link, @@ -122,12 +156,13 @@ disconnect_data_link(CompSpec, NetworkAddress) -> [status], CompSpec). -send_data(CompSpec, NetworkAddress, Data) -> +send_data(CompSpec, Service, Data) -> rvi_common:request(data_link, ?MODULE, send_data, - [ { network_address, NetworkAddress }, + [ { service, Service }, { data, Data } ], [status], CompSpec). + %% End of behavior %% @@ -140,12 +175,12 @@ connect_remote(IP, Port, CompSpec) -> not_found -> %% Setup a new outbound connection - ?info("data_link_bert:connect_remote(): Connecting ~p:~p", + ?info("dlink_tcp:connect_remote(): Connecting ~p:~p", [IP, Port]), case gen_tcp:connect(IP, Port, [binary, {packet, 4}]) of { ok, Sock } -> - ?info("data_link_bert:connect_remote(): Connected ~p:~p", + ?info("dlink_tcp:connect_remote(): Connected ~p:~p", [IP, Port]), %% Setup a genserver around the new connection. @@ -161,36 +196,35 @@ connect_remote(IP, Port, CompSpec) -> ok; {error, Err } -> - ?info("data_link_bert:connect_remote(): Failed ~p:~p: ~p", + ?info("dlink_tcp:connect_remote(): Failed ~p:~p: ~p", [IP, Port, Err]), not_available end end. -connect_and_retry_remote(Prefix, IP, Port, CompSpec) -> - ?info("data_link_bert:setup_static(): Connecting ~p -> ~p:~p", - [Prefix, IP, Port]), +connect_and_retry_remote( IP, Port, CompSpec) -> + ?info("dlink_tcp:connect_and_retry_remote(): ~p:~p", + [ IP, Port]), case connect_remote(IP, list_to_integer(Port), CompSpec) of ok -> ok; Err -> %% Failed to connect. Sleep and try again - ?notice("data_link_bert:setup_static_node_data_link(~p:~p): Failed: ~p", + ?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Failed: ~p", [IP, Port, Err]), - ?notice("data_link_bert:setup_static_node_data_link(~p:~p): Will try again in ~p sec", + ?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Will try again in ~p sec", [IP, Port, ?DEFAULT_RECONNECT_INTERVAL]), - setup_static_node_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, - Prefix, IP, Port, CompSpec), + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec), not_available end. announce_local_service_(CompSpec, Service, Availability) -> - ?debug("data_link_bert:announce_local_service(~p): Service: ~p", [Availability, Service]), + ?debug("dlink_tcp:announce_local_service(~p): Service: ~p", [Availability, Service]), %% Grab our local address. { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), @@ -205,7 +239,7 @@ announce_local_service_(CompSpec, Service, Availability) -> %% Loop over all returned addresses lists:map( fun(Address) -> - ?info("data_link_bert:announce_local_service(~p): Announcing ~p to ~p", + ?info("dlink_tcp:announce_local_service(~p): Announcing ~p to ~p", [ Availability, Service, Address]), %% Split the address into host and port @@ -216,13 +250,13 @@ announce_local_service_(CompSpec, Service, Availability) -> Res = connection:send(RemoteAddress, list_to_integer(RemotePort), {service_announce, 3, Availability, [Service], { signature, {}}}), - ?debug("data_link_bert:announce_local_service(~p): Res ~p", + ?debug("dlink_tcp:announce_local_service(~p): Res ~p", [ Availability, Res]) end, Addresses), ok. handle_socket(_FromPid, PeerIP, PeerPort, data, ping, [_CompSpec]) -> - ?info("data_link_bert:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]), + ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]), ok; handle_socket(FromPid, PeerIP, PeerPort, data, @@ -234,12 +268,12 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Certificate, Signature}, [CompSpec]) -> - ?info("data_link_bert:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), - ?info("data_link_bert:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), - ?info("data_link_bert:authorize(): Protocol: ~p", [ Protocol ]), - ?debug("data_link_bert:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("data_link_bert:authorize(): Certificate: ~p", [ Certificate ]), - ?debug("data_link_bert:authorize(): Signature: ~p", [ Signature ]), + ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), + ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), + ?info("dlink_tcp:authorize(): Protocol: ~p", [ Protocol ]), + ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), + ?debug("dlink_tcp:authorize(): Certificate: ~p", [ Certificate ]), + ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]), { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), @@ -252,7 +286,7 @@ handle_socket(FromPid, PeerIP, PeerPort, data, case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> - ?info("data_link_bert:authorize(): Remote is behind firewall. Will use ~p:~p", + ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p", [ PeerIP, PeerPort]), { PeerIP, PeerPort }; @@ -267,14 +301,14 @@ handle_socket(FromPid, PeerIP, PeerPort, data, %% FIXME: Validate certificate and signature before continuing. case connection_manager:find_connection_by_pid(FromPid) of not_found -> - ?info("data_link_bert:authorize(): New connection!"), + ?info("dlink_tcp:authorize(): New connection!"), connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid), - ?debug("data_link_bert:authorize(): Sending authorize."), + ?debug("dlink_tcp:authorize(): Sending authorize."), Res = connection:send(FromPid, { authorize, 1, LocalAddress, LocalPort, rvi_binary, {certificate, {}}, { signature, {}}}), - ?debug("data_link_bert:authorize(): Sending authorize: ~p", [ Res]), + ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]), ok; _ -> ok end, @@ -299,7 +333,7 @@ handle_socket(FromPid, PeerIP, PeerPort, data, { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), %% Send an authorize back to the remote node - ?info("data_link_bert:authorize(): Announcing local services: ~p to remote ~p:~p", + ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", [LocalServices, NRemoteAddress, NRemotePort]), connection:send(FromPid, @@ -310,145 +344,216 @@ handle_socket(FromPid, PeerIP, PeerPort, data, gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }), ok; -handle_socket(_FromPid, RemoteIP, RemotePort, data, +handle_socket(FromPid, RemoteIP, RemotePort, data, { service_announce, - TransactionID, + TransactionID, available, - Services, - Signature}, [CompSpec]) -> - ?debug("data_link_bert:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]), - ?debug("data_link_bert:service_announce(available): Remote Port: ~p", [ RemotePort ]), - ?debug("data_link_bert:service_announce(available): TransactionID: ~p", [ TransactionID ]), - ?debug("data_link_bert:service_announce(available): Signature: ~p", [ Signature ]), - ?debug("data_link_bert:service_announce(available): Service: ~p", [ Services ]), - + Services, + Signature }, [CompSpec]) -> + ?debug("dlink_tcp:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]), + ?debug("dlink_tcp:service_announce(available): Remote Port: ~p", [ RemotePort ]), + ?debug("dlink_tcp:service_announce(available): TransactionID: ~p", [ TransactionID ]), + ?debug("dlink_tcp:service_announce(available): Signature: ~p", [ Signature ]), + ?debug("dlink_tcp:service_announce(available): Service: ~p", [ Services ]), - %% Register the received services with all relevant components - NetworkAddress = RemoteIP ++ ":" ++ integer_to_list(RemotePort), - service_discovery_rpc:register_remote_services(CompSpec, Services, NetworkAddress), + %% Insert into our own tables + [ ets:insert(?SERVICE_TABLE, + #service_entry { + service = SvcName, + connection = FromPid }) || SvcName <- Services ], + + [ ets:insert(?CONNECTION_TABLE, + #service_entry { + service = SvcName, + connection = FromPid }) || SvcName <- Services ], + + service_discovery_rpc:register_services(CompSpec, Services, ?MODULE), ok; -handle_socket(_FromPid, RemoteIP, RemotePort, data, +handle_socket(FromPid, RemoteIP, RemotePort, data, { service_announce, TransactionID, unavailable, Services, Signature}, [CompSpec]) -> - ?debug("data_link_bert:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]), - ?debug("data_link_bert:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]), - ?debug("data_link_bert:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]), - ?debug("data_link_bert:service_announce(unavailable): Signature: ~p", [ Signature ]), - ?debug("data_link_bert:service_announce(unavailable): Service: ~p", [ Services ]), + ?debug("dlink_tcp:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]), + ?debug("dlink_tcp:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]), + ?debug("dlink_tcp:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]), + ?debug("dlink_tcp:service_announce(unavailable): Signature: ~p", [ Signature ]), + ?debug("dlink_tcp:service_announce(unavailable): Service: ~p", [ Services ]), %% Register the received services with all relevant components - service_discovery_rpc:unregister_remote_services_by_name(CompSpec, Services), + + %% Delete from our own tables. + + [ ets:delete(?SERVICE_TABLE, SvcName ) || SvcName <- Services ], + + [ ets:match_delete(?CONNECTION_TABLE, + #service_entry { + service = SvcName, + connection = FromPid }) || SvcName <- Services ], + + service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE), ok; handle_socket(_FromPid, SetupIP, SetupPort, data, { receive_data, Data}, [CompSpec]) -> -%% ?info("data_link_bert:receive_data(): ~p", [ Data ]), - ?debug("data_link_bert:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), +%% ?info("dlink_tcp:receive_data(): ~p", [ Data ]), + ?debug("dlink_tcp:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), protocol_rpc:receive_message(CompSpec, Data), ok; handle_socket(_FromPid, SetupIP, SetupPort, data, Data, [_CompSpec]) -> - ?warning("data_link_bert:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), - ?warning("data_link_bert:unknown_data(): Unknown data: ~p", [ Data]), + ?warning("dlink_tcp:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ?warning("dlink_tcp:unknown_data(): Unknown data: ~p", [ Data]), ok. + %% We lost the socket connection. %% Unregister all services that were routed to the remote end that just died. -handle_socket(_FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> - ?info("data_link_bert:socket_closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), +handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> + ?info("dlink_tcp:socket_closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), - service_discovery_rpc: - unregister_remote_services_by_address(CompSpec, NetworkAddress), - %% Check if this is a static node. If so, setup a timer for a reconnect - case lists:keyfind(NetworkAddress, 2, rvi_common:static_nodes()) of - false -> - true; + %% Get all service records associated with the given connection and + %% extract the service name from them + % - { StaticPrefix, StaticNetworkAddress } -> - ?info("data_link_bert:socket_closed(): Reconnect service: ~p", [ StaticPrefix ]), - ?info("data_link_bert:socket_closed(): Reconnect address: ~p", [ StaticNetworkAddress ]), - ?info("data_link_bert:socket_closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]), - [ IP, Port] = string:tokens(StaticNetworkAddress, ":"), - setup_static_node_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, - StaticPrefix, - IP, Port, CompSpec) - + Services = [ SvcName || #service_entry { service = SvcName } <- + ets:lookup(?CONNECTION_TABLE, FromPid) ], + ?debug("dlink_tcp:close(): Deleting ~p", [ Services]), + + %% Step through all services and delete their corresponding record + %% from the service table. + %% We do this instead of match_delete because it is much faster since + %% service is the key in ?SERVICE_TABLE. No linear search and delete + %% needed. + [ ets:delete(?SERVICE_TABLE, SvcName) || SvcName <- Services ], + + %% Delete all entries in the connection table that matches the closed + %% connection. + ets:delete(?CONNECTION_TABLE, FromPid), + + + {ok, PersistentConnections } = rvi_common:get_module_config(data_link, + ?MODULE, + persistent_connections, + [], + CompSpec), + %% Check if this is a static node. If so, setup a timer for a reconnect + case lists:member(NetworkAddress, PersistentConnections) of + true -> + ?info("dlink_tcp:socket_closed(): Reconnect address: ~p", [ NetworkAddress ]), + ?info("dlink_tcp:socket_closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]), + [ IP, Port] = string:tokens(NetworkAddress, ":"), + + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, + IP, Port, CompSpec); + false -> ok end, ok; handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> - ?info("data_link_bert:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ?info("dlink_tcp:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), ok. %% JSON-RPC entry point %% CAlled by local exo http server -handle_notification("announce_available_local_service", Args) -> - { ok, Service } = rvi_common:get_json_element(["service"], Args), - gen_server:cast(?SERVER, { rvi, announce_available_local_service, [Service]}), +handle_notification("service_available", Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + {ok, Address} = rvi_common:get_json_element(["address"], Args), + + gen_server:cast(?SERVER, { rvi, service_available, + [ SvcName, + DataLinkModule, + Address ]}), + ok; +handle_notification("service_unavailable", Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + {ok, Address} = rvi_common:get_json_element(["address"], Args), -handle_notification("announce_unavailable_local_service", Args) -> - { ok, Service } = rvi_common:get_json_element(["service"], Args), + gen_server:cast(?SERVER, { rvi, service_unavailable, + [ SvcName, + DataLinkModule, + Address ]}), - gen_server:cast(?SERVER, { rvi, announce_unavailable_local_service, [Service]}), ok; handle_notification(Other, _Args) -> - ?info("data_link_bert:handle_notification(~p): unknown", [ Other ]), + ?info("dlink_tcp:handle_notification(~p): unknown", [ Other ]), ok. handle_rpc("setup_data_link", Args) -> - { ok, Address } = rvi_common:get_json_element(["network_address"], Args), - Res = gen_server:call(?SERVER, { rvi, setup_data_link, - [ Address]}), - {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; + { ok, Service } = rvi_common:get_json_element(["service"], Args), + + { ok, Opts } = rvi_common:get_json_element(["opts"], Args), + + [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link, + [ Service, Opts ] }), + + {ok, [ {status, rvi_common:json_rpc_status(Res)} , { timeout, Timeout }]}; handle_rpc("disconenct_data_link", Args) -> { ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), - Res = gen_server:call(?SERVER, { rvi, disconnect_data_link, [NetworkAddress]}), + [Res] = gen_server:call(?SERVER, { rvi, disconnect_data_link, [NetworkAddress]}), {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; handle_rpc("send_data", Args) -> - {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), - { ok, Data} = rvi_common:get_json_element(["data"], Args), - [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [NetworkAddress, Data]}), + { ok, Service } = rvi_common:get_json_element(["service"], Args), + { ok, Data } = rvi_common:get_json_element(["data"], Args), + [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [Service, Data]}), {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; - + handle_rpc(Other, _Args) -> - ?info("data_link_bert:handle_rpc(~p): unknown", [ Other ]), + ?info("dlink_tcp:handle_rpc(~p): unknown", [ Other ]), { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. -handle_cast({rvi, announce_available_local_service, [Service]}, St) -> - announce_local_service_(St#st.cs, Service, available), +handle_cast( {rvi, service_available, [SvcName, local, _Address]}, St) -> + announce_local_service_(St#st.cs, SvcName, available), {noreply, St}; -handle_cast({rvi, announce_unavailable_local_service, [Service]}, St) -> - announce_local_service_(St#st.cs, Service, unavailable), + +handle_cast( {rvi, service_unavailable, [SvcName, local, _Address]}, St) -> + announce_local_service_(St#st.cs, SvcName, unavailable), {noreply, St}; handle_cast(Other, St) -> - ?warning("data_link_bert:handle_cast(~p): unknown", [ Other ]), + ?warning("dlink_tcp:handle_cast(~p): unknown", [ Other ]), {noreply, St}. -handle_call({rvi, setup_data_link, [ NetworkAddress ]}, _From, St) -> - [ RemoteAddress, RemotePort] = string:tokens(NetworkAddress, ":"), - Res = connect_remote(RemoteAddress, list_to_integer(RemotePort), St#st.cs), - { reply, [Res], St }; + +handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> + case proplists:get_value(target, Opts, undefined) of + undefined -> + ?info("dlink_tcp:setup_data_link(~p) Failed: no target given in options.", + [Service]), + { reply, [ no_route, 0 ], St }; + + Addr -> + [ Address, Port] = string:tokens(Addr, ":"), + + case connect_remote(Address, list_to_integer(Port), St#st.cs) of + ok -> + { reply, [ok, 2000], St }; %% 2 second timeout + + Err -> + { reply, [Err, 0], St } + end + end; handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> @@ -457,23 +562,31 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> { reply, [ Res ], St }; -handle_call({rvi, send_data, [NetworkAddress, Data]}, _From, St) -> - [ RemoteAddress, RemotePortStr] = string:tokens(NetworkAddress, ":"), - RemotePort = list_to_integer(RemotePortStr), - ?info("data_link_bert:send_data(): Remote: ~p:~p", [ RemoteAddress, RemotePort]), - Res = connection:send(RemoteAddress, RemotePort, {receive_data, Data}), - { reply, [ Res ], St}; +handle_call({rvi, send_data, [Service, Data]}, _From, St) -> + + %% Resolve connection pid from service + case ets:lookup(?SERVICE_TABLE, Service) of + [ #service_entry { connection = ConnPid } ] -> + ?debug("dlink_tcp:send_data(): ~p -> ~p", [ Service, ConnPid]), + Res = connection:send(ConnPid, {receive_data, Data}), + { reply, [ Res ], St}; + + [] -> %% Service disappeared during send. + { reply, [ no_route ], St} + end; + + handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) -> %% Create a timer to handle periodic pings. {ok, ServerOpts } = rvi_common:get_module_config(data_link, - dlink_tcp, + ?MODULE, bert_rpc_server, [], St#st.cs), Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), - ?info("data_link_bert:setup_ping(): ~p:~p will be pinged every ~p msec", + ?info("dlink_tcp:setup_ping(): ~p:~p will be pinged every ~p msec", [ Address, Port, Timeout] ), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }), @@ -481,7 +594,7 @@ handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) -> {reply, ok, St}; handle_call(Other, _From, St) -> - ?warning("data_link_bert:handle_rpc(~p): unknown", [ Other ]), + ?warning("dlink_tcp:handle_rpc(~p): unknown", [ Other ]), { reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, St}. @@ -492,7 +605,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> %% Check that connection is up case connection:is_connection_up(Pid) of true -> - ?info("data_link_bert:ping(): Pinging: ~p:~p", [Address, Port]), + ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]), connection:send(Pid, ping), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }); @@ -503,13 +616,13 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> {noreply, St}; %% Setup static nodes -handle_info({ rvi_setup_static_node_data_link, Prefix, IP, Port, CompSpec }, St) -> - connect_and_retry_remote(Prefix, IP, Port, CompSpec), +handle_info({ rvi_setup_persitent_connection, IP, Port, CompSpec }, St) -> + connect_and_retry_remote(IP, Port, CompSpec), { noreply, St }; handle_info(Info, St) -> - ?notice("data_link_bert(): Unkown message: ~p", [ Info]), + ?notice("dlink_tcp(): Unkown message: ~p", [ Info]), {noreply, St}. terminate(_Reason, _St) -> @@ -517,9 +630,11 @@ terminate(_Reason, _St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. -setup_static_node_reconnect_timer(MSec, Prefix, IP, Port, CompSpec) -> +setup_reconnect_timer(MSec, IP, Port, CompSpec) -> erlang:send_after(MSec, ?MODULE, - { rvi_setup_static_node_data_link, - Prefix, IP, Port, CompSpec }), + { rvi_setup_persitent_connection, + IP, Port, CompSpec }), ok. + + diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index eadec8b..4ec5038 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -74,7 +74,7 @@ new_connection(IP, Port, Sock, State) -> %% first data. %% Provide component spec as extra arg. {ok, _P} = connection:setup(undefined, 0, Sock, - data_link_bert_rpc_rpc, + dlink_tcp_rpc, handle_socket, [gen_nb_server:get_cb_state(State)]), {ok, State}. diff --git a/components/proto_bert/src/proto_bert.app.src b/components/proto_bert/src/proto_bert.app.src index d3ab76f..0fec506 100644 --- a/components/proto_bert/src/proto_bert.app.src +++ b/components/proto_bert/src/proto_bert.app.src @@ -8,7 +8,7 @@ %% -*- erlang -*- -{application, proto_bert_rpc, +{application, proto_bert, [ {description, ""}, {vsn, "0.1"}, diff --git a/components/proto_bert/src/proto_bert_app.erl b/components/proto_bert/src/proto_bert_app.erl index 101e0c4..68ee09e 100644 --- a/components/proto_bert/src/proto_bert_app.erl +++ b/components/proto_bert/src/proto_bert_app.erl @@ -21,14 +21,14 @@ %% =================================================================== start(_StartType, _StartArgs) -> - protocol_sup:start_link(). + proto_bert_sup:start_link(). start_phase(init, _, _) -> - protocol_rpc:init_rvi_component(), + proto_bert_rpc:init_rvi_component(), ok; start_phase(json_rpc, _, _) -> - protocol_rpc:start_json_server(), + proto_bert_rpc:start_json_server(), ok. stop(_State) -> diff --git a/components/proto_bert/src/proto_bert_rpc.erl b/components/proto_bert/src/proto_bert_rpc.erl index 805ba63..3baf356 100644 --- a/components/proto_bert/src/proto_bert_rpc.erl +++ b/components/proto_bert/src/proto_bert_rpc.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -export([start_json_server/0]). --export([send_message/10, +-export([send_message/9, receive_message/2]). -record(st, { @@ -34,11 +34,11 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init([]) -> - ?debug("protocol_rpc:init(): called."), + ?debug("proto_bert_rpc:init(): called."), {ok, #st { cs = rvi_common:get_component_specification() } }. start_json_server() -> - rvi_common:start_json_rpc_server(protocol, ?MODULE, protocol_sup). + rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_bert_sup). @@ -48,7 +48,6 @@ send_message(CompSpec, ProtoOpts, DataLinkMod, DataLinkOpts, - NetworkAddress, Parameters, Signature, Certificate) -> @@ -58,7 +57,6 @@ send_message(CompSpec, { protocol_opts, ProtoOpts }, { data_link_mod, DataLinkMod }, { data_link_opts, DataLinkOpts }, - { network_address, NetworkAddress }, { parameters, Parameters }, { signature, Signature }, { certificate, Certificate }], @@ -78,7 +76,6 @@ handle_rpc("send_message", Args) -> {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args), {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args), {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args), - {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), {ok, Signature} = rvi_common:get_json_element(["signature"], Args), {ok, Certificate} = rvi_common:get_json_element(["certificate"], Args), @@ -88,7 +85,6 @@ handle_rpc("send_message", Args) -> ProtoOpts, DataLinkMod, DataLinkOpts, - NetworkAddress, Parameters, Signature, Certificate]}), @@ -97,7 +93,7 @@ handle_rpc("send_message", Args) -> handle_rpc(Other, _Args) -> - ?warning("protocol_rpc:handle_rpc(~p): Unknown~n", [ Other ]), + ?warning("proto_bert_rpc:handle_rpc(~p): Unknown~n", [ Other ]), { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. @@ -108,7 +104,7 @@ handle_notification("receive_message", Args) -> ok; handle_notification(Other, _Args) -> - ?debug("protocol_rpc:handle_other(~p): unknown", [ Other ]), + ?debug("proto_bert_rpc:handle_other(~p): unknown", [ Other ]), ok. @@ -118,31 +114,28 @@ handle_call({rvi, send_message, ProtoOpts, DataLinkMod, DataLinkOpts, - NetworkAddress, Parameters, Signature, Certificate]}, _From, St) -> ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), - ?debug(" protocol:send(): opts: ~p~n", [Opts]), + ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]), ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]), ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]), - ?debug(" protocol:send(): network_address: ~p~n", [NetworkAddress]), %% ?debug(" protocol:send(): parameters: ~p~n", [Parameters]), ?debug(" protocol:send(): signature: ~p~n", [Signature]), ?debug(" protocol:send(): certificate: ~p~n", [Certificate]), - Data = term_to_binary({ ServiceName, Timeout, NetworkAddress, - Parameters, Signature, Certificate }), + Data = term_to_binary({ ServiceName, Timeout, Parameters, Signature, Certificate }), - Res = DataLinkMod:send_data(St#st.cs, NetworkAddress, DataLinkOpts, Data), + Res = DataLinkMod:send_data(St#st.cs, ServiceName, DataLinkOpts, Data), { reply, Res, St }; handle_call(Other, _From, St) -> - ?warning("protocol_rpc:handle_call(~p): unknown", [ Other ]), + ?warning("proto_bert_rpc:handle_call(~p): unknown", [ Other ]), { reply, [ invalid_command ], St}. %% Convert list-based data to binary. @@ -152,13 +145,11 @@ handle_cast({rvi, receive_message, [Data]}, St) when is_list(Data)-> handle_cast({rvi, receive_message, [Data]}, St) -> { ServiceName, Timeout, - NetworkAddress, Parameters, Signature, Certificate } = binary_to_term(Data), ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), - ?debug(" protocol:rcv(): network_address: ~p~n", [NetworkAddress]), %% ?debug(" protocol:rcv(): parameters: ~p~n", [Parameters]), ?debug(" protocol:rcv(): signature: ~p~n", [Signature]), ?debug(" protocol:rcv(): certificate: ~p~n", [Certificate]), @@ -166,7 +157,6 @@ handle_cast({rvi, receive_message, [Data]}, St) -> service_edge_rpc:handle_remote_message(St#st.cs, ServiceName, Timeout, - NetworkAddress, Parameters, Signature, Certificate), @@ -174,7 +164,7 @@ handle_cast({rvi, receive_message, [Data]}, St) -> handle_cast(Other, St) -> - ?warning("protocol_rpc:handle_cast(~p): unknown", [ Other ]), + ?warning("proto_bert_rpc:handle_cast(~p): unknown", [ Other ]), {noreply, St}. handle_info(_Info, St) -> diff --git a/components/proto_bert/src/proto_bert_sup.erl b/components/proto_bert/src/proto_bert_sup.erl index 9c23f9a..449553c 100644 --- a/components/proto_bert/src/proto_bert_sup.erl +++ b/components/proto_bert/src/proto_bert_sup.erl @@ -34,6 +34,6 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ - ?CHILD(proto_bert_rpc_rpc, worker) + ?CHILD(proto_bert_rpc, worker) ]} }. diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index c28bb8c..e50f9ec 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -26,8 +26,6 @@ -export([remote_service_to_string/1]). -export([remote_service_to_string/2]). -export([local_service_prefix/0]). --export([get_static_node/1]). --export([static_nodes/0]). -export([node_address_string/0]). -export([node_address_tuple/0]). -export([get_request_result/1]). @@ -46,7 +44,6 @@ -define(NODE_SERVICE_PREFIX, node_service_prefix). -define(NODE_ADDRESS, node_address). --define(STATIC_NODES, static_nodes). json_rpc_status(0) -> @@ -85,6 +82,10 @@ json_rpc_status(5) -> json_rpc_status("5") -> already_connected; + +json_rpc_status(6) -> + no_route; + json_rpc_status("6") -> no_route; @@ -403,62 +404,6 @@ local_service_prefix() -> _ -> Prefix ++ "/" end. -static_nodes() -> - case application:get_env(rvi, ?STATIC_NODES) of - - {ok, NodeList} -> - NodeList; - - undefined -> - not_found - end. - - -%% Locate the statically configured node whose service(s) prefix- -%% matches the provided service. -%% FIXME: Longest prefix match. -get_static_node(Service) -> - case application:get_env(rvi, ?STATIC_NODES) of - - {ok, NodeList} when is_list(NodeList) -> - get_static_node(Service, NodeList); - - undefined -> - ?debug("No ~p configured under rvi.", [?STATIC_NODES]), - not_found - end. - -get_static_node(_Service, []) -> - not_found; - -%% Validate that argumenst are all lists. -get_static_node(Service, [{ SvcPrefix, NetworkAddress} | T ]) when - not is_list(Service); not is_list(SvcPrefix); not is_list(NetworkAddress) -> - ?warning("rvi_common:get_static_node(): Could not resolve ~p against {~p, ~p}:" - "One or more elements not strings.", [ Service, SvcPrefix, NetworkAddress]), - get_static_node(Service, T ); - - -%% If the service we are trying to resolve has a shorter name than -%% the prefix we are comparing with, ignore. -get_static_node(Service, [{ SvcPrefix, _NetworkAddress } | T ]) when - length(Service) < length(SvcPrefix) -> - ?debug("rvi_common:get_static_node(): Service: ~p is shorter than prefix ~p. Ignore.", - [ Service, SvcPrefix]), - get_static_node(Service, T ); - -get_static_node(Service, [{ SvcPrefix, NetworkAddress} | T] ) -> - case string:str(Service, SvcPrefix) of - 1 -> - ?debug("rvi_common:get_static_node(): Service: ~p -> { ~p, ~p}.", - [ Service, SvcPrefix, NetworkAddress]), - NetworkAddress; - _ -> - ?debug("rvi_common:get_static_node(): Service: ~p != { ~p, ~p}.", - [ Service, SvcPrefix, NetworkAddress]), - get_static_node(Service, T ) - end. - node_address_string() -> case application:get_env(rvi, ?NODE_ADDRESS) of diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl index d5b0548..0e6dfbe 100644 --- a/components/schedule/src/rvi_routing.erl +++ b/components/schedule/src/rvi_routing.erl @@ -9,6 +9,9 @@ -behaviour(gen_server). + +-include_lib("lager/include/log.hrl"). + %% API -export([get_service_routes/1]). -export([start_link/0]). @@ -88,7 +91,7 @@ init([]) -> %% @end %%-------------------------------------------------------------------- handle_call( { rvi_get_service_routes, Service }, _From, St) -> - {reply, normalize_routes_(find_routes_(Service, St#st.routes), []), t}; + {reply, normalize_routes_(find_routes_( St#st.routes, Service), []), t}; handle_call(_Request, _From, St) -> Reply = ok, @@ -179,8 +182,13 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen ) false -> %% Continue with the old routes and matching len installed find_routes_(T, Service, CurRoutes, CurMatchLen) - end. + end; +find_routes_(Rt, Svc, CurRoutes, CurMatchLen) -> + ?info("----------------Failed on ~p", [Rt]), + { x, y}. + + find_routes_(Routes, Service) -> case find_routes_(Routes, Service, undefined, 0) of { undefined, 0 } -> @@ -203,6 +211,5 @@ normalize_routes_({ServicePrefix, [ { Pr, { DL, DLOp }} | Rem ]}, Acc) -> normalize_routes_({ServicePrefix, [ {{ Pr, PrOp}, DL } | Rem ]}, Acc) -> normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]); -normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, DL} | Rem ]}, Acc) -> - normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]). - +normalize_routes_({ServicePrefix, [ {Pr, DL} | Rem ]}, Acc) -> + normalize_routes_({ServicePrefix, Rem}, [ { {Pr, []}, { DL, [] } } | Acc]). diff --git a/components/schedule/src/rvi_schedule.erl b/components/schedule/src/rvi_schedule.erl index 00a142a..2b4606e 100644 --- a/components/schedule/src/rvi_schedule.erl +++ b/components/schedule/src/rvi_schedule.erl @@ -7,6 +7,7 @@ %% -module(rvi_schedule). + -include_lib("rvi_common/include/rvi_common.hrl"). -callback schedule_message(CompSpec :: #component_spec{}, diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 310823c..7a1b23a 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -8,20 +8,17 @@ -module(schedule_rpc). -behaviour(gen_server). --behaviour(rvi_schedule). -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). %% API -export([start_link/0]). --export([schedule_message/6, - register_remote_services/3, - unregister_remote_services/2]). +-export([schedule_message/6]). %% Invoked by service discovery %% FIXME: Should be rvi_service_discovery behavior -export([service_available/3, - service_unavailable/3] + service_unavailable/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -40,12 +37,10 @@ %% %% Service -> ETS -> Messages -record(service, { - name = "", %% Fully qualified service name. - %% Set to { IP, Port} when service is available, else unknown_network_address. - address = unknown, + key = { "", unknown }, %% Service name and data link modu - %% Targeted data link module - data_link_module = unknown + %% Is this service currently available on the data link module + available = false, %% Table containing #message records, %% indexed by their transaction ID (and sequence of delivery) @@ -117,7 +112,7 @@ init([]) -> cs = rvi_common:get_component_specification(), services_tid = ets:new(rvi_schedule_services, [ set, private, - { keypos, #service.name } ])}}. + { keypos, #service.key } ])}}. start_json_server() -> rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup). @@ -139,35 +134,19 @@ schedule_message(CompSpec, [status, transaction_id], CompSpec). -register_remote_services(CompSpec, NetworkAddress, Services) -> - rvi_common:notification(schedule, ?MODULE, - register_remote_services, - [{ network_address, NetworkAddress} , - { services, Services }], - CompSpec). - - -unregister_remote_services(CompSpec, ServiceNames) -> - rvi_common:notification(schedule, ?MODULE, - unregister_remote_services, - [{ services, ServiceNames }], - CompSpec). - -service_available(Service, DataLinkModule, Address) -> +service_available(CompSpec, SvcName, DataLinkModule) -> rvi_common:notification(schedule, ?MODULE, service_available, - [{ service, Service }, - { data_link_module, DataLinkModule }, - { address, Address }], + [{ service, SvcName }, + { data_link_mod, DataLinkModule } ], CompSpec). -service_unavailable(Service, DataLinkModule, Address) -> +service_unavailable(CompSpec, SvcName, DataLinkModule) -> rvi_common:notification(schedule, ?MODULE, service_unavailable, - [{ service, Service }, - { data_link_module, DataLinkModule }, - { address, Address }], + [{ service, SvcName }, + { data_link_mod, DataLinkModule } ], CompSpec). %% JSON-RPC entry point @@ -197,54 +176,28 @@ handle_rpc("schedule_message", Args) -> { transaction_id, TransID } ] }; - - handle_rpc(Other, _Args) -> ?debug("schedule_rpc:handle_rpc(~p): unknown", [ Other ]), {ok, [ {status, rvi_common:json_rpc_status(invalid_command)}]}. -handle_notification("register_remote_services", Args) -> - {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), - {ok, Services} = rvi_common:get_json_element(["services"], Args), - ?debug("schedule_notification:register_remote_services(): network_address: ~p", [ NetworkAddress]), - ?debug("schedule_notification:register_remote_services(): services: ~p", [ Services]), - - gen_server:cast(?SERVER, { rvi, register_remote_services, - [ NetworkAddress, - Services ]}), - - ok; - -handle_notification("unregister_remote_services", Args) -> - {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args), - ?debug("schedule_notification:unregister_remote_services(): services ~p", - [ DiscountinuedServices]), - - gen_server:cast(?SERVER, { rvi, unregister_remote_services, - [ DiscountinuedServices ]}), - ok; handle_notification("service_available", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), gen_server:cast(?SERVER, { rvi, service_available, - [ Service, - DataLinkModule, - Address ]}), + [ SvcName, + DataLinkModule ]}), ok; handle_notification("service_unavailable", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), gen_server:cast(?SERVER, { rvi, service_unavailable, - [ Service, - DataLinkModule, - Address ]}), + [ SvcName, + DataLinkModule ]}), ok; @@ -264,20 +217,28 @@ handle_call( { rvi, schedule_message, ?debug("schedule:sched_msg(): parameters: ~p", [Parameters]), ?debug("schedule:sched_msg(): signature: ~p", [Signature]), ?debug("schedule:sched_msg(): certificate ~p", [Certificate]), - ?debug("schedule:sched_msg(): St: ~p", [St]), + %%?debug("schedule:sched_msg(): St: ~p", [St]), %% %% Retrieve the routes that we should try for this message %% - case rvi_routing:get_service_routing(SvcName) of + case rvi_routing:get_service_routes(SvcName) of not_found -> {reply, [ no_route ], St }; Routes -> - { NewTransID, NSt1} = create_transaction_id(St), - NTS2 = queue_message(SvcName, Routes, Timeout, Parameter, Signature); - - { reply, [ok, TransID], NS2 } + { TransID, NSt1} = create_transaction_id(St), + NSt2 = queue_message(SvcName, + TransID, + Routes, + Timeout, + calc_relative_tout(Timeout), + Parameters, + Signature, + Certificate, + NSt1), + + { reply, [ok, TransID], NSt2 } end; @@ -297,71 +258,40 @@ handle_call(Other, _From, St) -> %% @end %%-------------------------------------------------------------------- -handle_cast( {rvi, register_remote_services, - [ NetworkAddress, Services]}, St) -> - - ?info("schedule:register_remote_services(): services(~p) -> ~p", - [Services, NetworkAddress]), - - {ok, NSt} = multiple_services_available(Services, NetworkAddress, St), - {noreply, NSt}; +handle_cast( {rvi, service_available, [SvcName, DataLinkModule]}, St) -> + %% Find or create the service. + ?debug("schedule:service_available(): ~p:~p", [ DataLinkModule, SvcName ]), -handle_cast( {rvi, unregister_remote_services, [ServiceNames]}, St) -> - ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]), - {ok, NSt} = multiple_services_unavailable(ServiceNames, St), - {noreply, NSt }; + SvcRec = update_service(SvcName, DataLinkModule, available, St), -handle_cast( {rvi, service_available, [Service, DataLinkModule, Address]}, St) -> - %% Find the service - case ets:lookup(SvcTid, {Service, DataLinkModule}) of - [] -> %% No service found - Just unsubscribe. Shouldn't really happen - service_discovery_rpc: - unsubscribe_to_service_availability({Service, DataLinkModule}, - ?MODULE); - - { noreply, St }; + %% Try to send any pending messages. + { _, NSt} = try_sending_messages(SvcRec, St), - [ Svc ] -> - %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} - ?debug("schedule:service_unavailable(): Service ~p:~p now has address~p.", - [ DataLinkModule, Service, Address ]), - update_service_network_address(Svc, Address, St), - { _, NSt2 } = try_sending_messages(Svc, NSt1), - { noreply, NSt2} - end. + { noreply, NSt}; -handle_cast( {rvi, service_unavailable, [Service, DataLinkModule, Address]}, +handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]}, #st { services_tid = SvcTid } = St) -> - %% Find the service - case ets:lookup(SvcTid, {Service, DataLinkModule}) of + %% Grab the service + case ets:lookup(SvcTid, {SvcName, DataLinkModule}) of [] -> %% No service found - no op. {noreply, St}; - [ Svc ] -> - %% Delete service if it is unused. - case delete_unused_service(SvcTid, Svc) of + [ SvcRec ] -> + %% Delete service if it does not have any pending messages. + case delete_unused_service(SvcTid, SvcRec) of true -> %% service was deleted - - %% Unsubscribe from service availablility notifications - service_discovery_rpc: - unsubscribe_to_service_availability({ Service, DataLinkModule}, - ?MODULE); - { noreply, St}; - false -> %% Service was not deleted, update its network address to unknown - { _, _, _, NSt} = - update_service_network_address(Svc, unknown_network_address, St), - - { noreply, NSt } + false -> %% SvcName was not deleted, set it to not available + update_service(SvcName, DataLinkModule, unavailable, St), + + { noreply, St } end - end. - { noreply, St}; + end; handle_cast(Other, St) -> @@ -381,24 +311,41 @@ handle_cast(Other, St) -> %% Handle timeouts handle_info({ rvi_message_timeout, SvcName, DLMod, TransID}, - #st { cs = CompSpec, services_tid = SvcTid } = St) -> + #st { services_tid = SvcTid } = St) -> + %% Look up the service / DataLink mod case ets:lookup(SvcTid, {SvcName, DLMod}) of - [ Svc ] -> - %% Delete from ets. - case ets:lookup(Svc#service.messages_tid, TransID) of + [ SvcRec ] -> %% Found service for specific data link + + %% Delete message from service queue + case ets:lookup(SvcRec#service.messages_tid, TransID) of [ Msg ] -> ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]), - ets:delete(Svc#service.messages_tid, TransID), - queue_message( + ets:delete(SvcRec#service.messages_tid, TransID), + + %% Try to requeue message + { _Res, NSt } = + queue_message(SvcName, + Msg#message.transaction_id, + Msg#message.routes, + Msg#message.timeout, + calc_relative_tout(Msg#message.timeout), + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate, + St), + {noreply, NSt}; + _ -> ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), - ok + + {noreply, St} + end; - _-> ok - end, - {noreply, St}; + _ -> {noreply, St} + + end; handle_info(_Info, St) -> @@ -435,289 +382,234 @@ code_change(_OldVsn, St, _Extra) -> +%% %% No more routes to try -queue_message(_SvcName, +%% +queue_message(SvcName, _TransID, - _MessageTimeout [ ], + _MessageTimeout, + _RelativeTimeout, _Parameters, _Signature, _Certificate, St) -> %% FIXME: Handle route failure + ?notice("schedule:queue_message(): Ran out of routes to try for ~p", [SvcName]), { route_failed, St }; +%% +%% The message has timed out +%% +queue_message(SvcName, + TransID, + _Routes, + _MessageTimeout, + -1, %% We are timed out + _Parameters, + _Signature, + _Certificate, + St) -> + do_timeout_callback(St#st.cs, SvcName, TransID), + { timeout, St }; + + +%% Try to queue message queue_message(SvcName, TransID, - [ { { Pr, PrOp }, { DL, DLOp } } | RemainingRoutes ], - MessageTimeout, + [ { { _Pr, _PrOp }, { DLMod, DLOp } } | RemainingRoutes ], + Timeout, + RelativeTimeout, Parameters, Signature, Certificate, St) -> + %% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]), %% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]), %% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]), %% ?info("schedule:sched_msg(): signature: ~p", [Signature]), %% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), - %% Calculate how many msec we have until message times out + SvcRec = find_or_create_service(SvcName, DLMod, St), - RelativeTimeout = calc_relative_tout(MessageTimeout), - - %% Did we run out of time for the message? - case RelativeTimeout > 1 of - %% Nope - true -> - {ok, Service, IsNewService, NSt1 } = find_or_create_service({SvcName, DL}, St), - - Msg = #message { - service = SvcName, - transaction_id = TransID, - data_link = { DLMod, DLOp }, - routes = RemaingRoutes, - message_timeout = MessageTimeout, - route_timeout_ref = TRef, - parameters = Parameters, - signature = Signature, - certificate = Certificate - }, - - - %% Add to ets table - TransID = ets:insert(Service#service.messages_tid, Msg), - - %% If this is a new service, subscribe to updates on this service's availablity. - %% If service is already available, we will get service_available/3 notification - %% immediately. - case IsNewService of - true -> - service_discovery_rpc: - subscribe_to_service_availability({SvcName, DLMod}, - ?MODULE); - false -> - ok - end, + %% The service may already be available, give it a shot. + case try_sending_messages(SvcRec, St) of + { ok, NSt } -> %% We managed to send the message. Done. + { ok, NSt }; + { _ , NSt } -> %% Failed to send message. Setup data link + %% %% Bring up the relevant data link for the given route. %% Once up, the data link will invoke service_availble() %% to indicate that the service is available for the given DL. %% - case DLMod:setup_data_link(CompSpec, SvcName, DLOp) of + case DLMod:setup_data_link(NSt#st.cs, SvcName, DLOp) of { ok, DLTimeout } -> - %% Setup a timeout to be - TRef = erlang:send_after(min(RelativeMessageTimeout, DLTimeout), + + %% Setup a timeout that triggers on whatever + %% comes first of the message's general timeout + %% or the timeout of the data link bringup + TRef = erlang:send_after(min(RelativeTimeout, DLTimeout), self(), - { rvi_message_timeout, SvcName, DLMod, TransID }); - + { rvi_message_timeout, SvcName, DLMod, TransID }), + + %% Setup a message to be added to the service / dl table. + Msg = #message { + transaction_id = TransID, + data_link = { DLMod, DLOp }, + routes = RemainingRoutes, + timeout = Timeout, + timeout_tref = TRef, + parameters = Parameters, + signature = Signature, + certificate = Certificate + }, + + + %% Add to ets table + TransID = ets:insert(SvcRec#service.messages_tid, Msg), + {ok, NSt}; + %% We failed with this route. Try the next one { error, _Reason} -> queue_message(SvcName, TransID, - RelativeTimeout, + Timeout, + calc_relative_tout(Timeout), RemainingRoutes, Parameters, Signature, Certificate, - St) - end; - - %% We are out of time - false -> - do_timeout_callback(CompSpec, Svc, Msg), - { timeout, St} + NSt) + end end. -forward_to_protocol(Svc, Msg, St) -> - { DataLinkMod, DataLinkOpts } = Msg#message.data_link, - { ProtoMod, ProtoOpts } = Msg#messge.protocol, - - case ProtoMod:send_message( - St#st.cs, - SvcName, - Msg#message.timeout, - ProtoOpts, - DataLinkMod, - DataLinkOpts, - NetworkAddress, - Msg#message.parameters, - Msg#message.signature, - Msg#message.certificate) of - - %% Success - [ok] -> - %% Send the rest. - try_sending_messages(Service, St); - - %% Failed - [Err] -> - ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p", - [ProtocolMod, DataLinkMod, SvcName, NetworkAddress, Err]), - - %% Requeue this message with the next route - { _, St1} = queue_message(SvcName, - TransID, - Msg#message.timeout, - Msg#message.routes, - Msg#message.parameters, - Msg#message.signature, - Msg#message.certificate, - St), - - %% Send the rest of the messgages targeting - %% the same service through the same data link - try_sending_messages(Svc, St) - end. - - - -%% Check if we can send messages queued up under the given service. +%% The service is not available try_sending_messages(#service { - name = SvcName, - network_address = unknown_network_address, - messages_tid = _Tid } = _Service, St) -> + key = { SvcName, _ }, + available = false, + messages_tid = _Tid } = _SvcRec, St) -> + ?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]), - {not_available, St}; + { not_available, St }; try_sending_messages(#service { - name = SvcName, - data_link_module = DataLinkModule, - network_address = NetworkAddress, - messages_tid = Tid } = Service, St) -> + key = { SvcName, DataLinkMod }, + available = true, + messages_tid = Tid } = SvcRec, St) -> - ?debug("schedule:try_send(): SvcName: ~p", [SvcName]), - ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]), + ?debug("schedule:try_send(): Service: ~p:~p", [DataLinkMod, SvcName]), %% Extract the first message of the queue. - case ets:first(Tid) of - %% No more messages to send. - '$end_of_table' -> + case first_service_message(SvcRec) of + empty -> ?debug("schedule:try_send(): Nothing to send"), { ok, St }; - Key -> - St1 = forward_to_protocol - ?debug("schedule:try_send(): Sending: ~p", [Key]), - %% Extract first message and try to send it - case ets:lookup(Tid, Key) of - [ Msg ] -> - %% Wipe from ets table and cancel timer - ets:delete(Tid, Key), - erlang:cancel_timer(Msg#message.timeout_tref), - - %% Forward to protocol and resend - forward_to_protocol(Service, Msg, St); - - _ -> - ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]), - { ok, St} - + yanked -> + ?info("schedule:try_send(): Message was yanked while trying to send: ~p", + [SvcRec#service.key]), + { ok, St}; + + Msg -> + %% Wipe from ets table and cancel timer + ets:delete(Tid, Msg#message.transaction_id), + + erlang:cancel_timer(Msg#message.timeout_tref), + + %% Forward to protocol. + { _, DataLinkOpts } = Msg#message.data_link, + { ProtoMod, ProtoOpts } = Msg#message.protocol, + + %% Send off message to the correct protocol module + case ProtoMod:send_message( + St#st.cs, + SvcName, + Msg#message.timeout, + ProtoOpts, + DataLinkMod, + DataLinkOpts, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate) of + + %% Success + [ok] -> + %% Send the rest of the messages associated with this service/dl + try_sending_messages(SvcRec, St); + + %% Failed + [Err] -> + ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p", + [ProtoMod, DataLinkMod, SvcName, Err]), + + %% Requeue this message with the next route + { _, St1} = queue_message(SvcName, + Msg#message.transaction_id, + Msg#message.timeout, + calc_relative_tout(Msg#message.timeout), + Msg#message.routes, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate, + St), + + %% Send the rest of the messgages targeting + %% the same service through the same data link + %% If they all fail due to some data link error, + %% they will all be requeued to the next route. + try_sending_messages(SvcRec, St1) end end. -%% -%% data_link_up has reported that multiple services are now -%% available at NetworkAddress. -%% Iterate through all services and mark them as available, -%% possibly sending any pending message targeting the newly activated -%% service. -%% -multiple_services_available([], _NetworkAddress, St) -> - {ok, St}; - -multiple_services_available([ Svc | T], NetworkAddress, St) -> - {ok, NSt} = service_available(Svc, NetworkAddress, St), - multiple_services_available(T, NetworkAddress, NSt). - -multiple_services_unavailable([], St) -> - {ok, St}; - -multiple_services_unavailable([ SvcName | T], St) -> - {ok, NSt} = service_unavailable(SvcName, St), - multiple_services_unavailable(T, NSt). +find_or_create_service(SvcName, DataLinkMod, #st { services_tid = SvcTid } = St) -> + ?debug("schedule:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]), - -find_or_create_service(ServiceName, St) -> - %% Invoke with retain to keep any exising network addresses already - %% in place in an existing service. - find_or_create_service(ServiceName, retain_existing_address, St). - -find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid } = St) -> - ?debug("schedule:find_or_create_service(): SvcName: ~p", [ ServiceName]), - - case ets:lookup(SvcTid, ServiceName) of + case ets:lookup(SvcTid, { SvcName, DataLinkMod }) of [] -> %% The given service does not exist, create it. - ?debug("schedule:find_or_create_service(): Creating new ~p", [ ServiceName]), - create_service(ServiceName, NetworkAddress, St); - - [ Svc1 ] when NetworkAddress =:= retain_existing_address -> - %% We found a service, and are instructed not to touch - %% its network address. Return it. - { ok, Svc1, false, St}; + ?debug("schedule:find_or_create_service(): Creating new ~p", [ SvcName]), + update_service(SvcName, DataLinkMod, unavailable, St); - [ Svc2 ] -> + [ SvcRec ] -> %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} - ?debug("schedule:find_or_create_service(): Updating existing ~p", [ ServiceName]), - update_service_network_address(Svc2, NetworkAddress, St) + %% the new service / State as {ok, NSvcRec, false, NSt} + ?debug("schedule:find_or_create_service(): Updating existing ~p", [ SvcName]), + SvcRec end. -%% -%% Catch where no network address update is necessary. -%% -update_service_network_address(#service { network_address = NetworkAddress } = Service, - NetworkAddress, St) -> - { ok, Service, false, St}; %% False indicates that the service exists. - - -%% -%% Update the service in the ets table with a new network address. -%% -update_service_network_address(#service {} = Service, NetworkAddress, St) -> - %% Create a new service. - NewService = Service#service { network_address = NetworkAddress}, - %% Replace existing serviceo in the ets table of services. - ets:insert(St#st.services_tid, [ NewService ]), - { ok, NewService, false, St}. %% False indicates that the service exists. - - - - -%% If we are creating a new service, we need to remap the -%% retain_existing_address to unknown_network_address in order to get the correct -%% initial state, which is a created service with an unknown network address. -create_service(ServiceName, retain_existing_address, St) -> - create_service(ServiceName, unknown_network_address, St); - - -%% Create a new service and return the new state (not currently modified) -%% and the newly initialized service revord. +%% Create a new service, or update an existing one, and return the new +%% state (not currently modified) and the newly initialized service +%% revord. %% -create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) -> - Svc = #service { - name = ServiceName, - network_address = NetworkAddress, - messages_tid = ets:new(rvi_messages, - [ ordered_set, private, - { keypos, #message.transaction_id } ]) - }, +update_service(SvcName, DataLinkMod, Available, + #st { services_tid = SvcsTid, cs = CS }) -> + %% Return new service and existing state. + ?debug("schedule:create_service(): ~p:~p ", [ DataLinkMod, SvcName]), + SvcRec = #service { + key = { SvcName, DataLinkMod }, + available = Available, + messages_tid = ets:new(rvi_messages, + [ ordered_set, private, + { keypos, #message.transaction_id } ]) + }, %% Insert new service to ets table. - ets:insert(SvcsTid, Svc), - - %% Return new service and existing state. - ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]), - ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]), - { ok, Svc, true, St}. %% True indicates that the service is newly created. + ets:insert(SvcsTid, SvcRec), + + %% Subscribe to updates on the availbility of this service. + service_discovery_rpc:subscribe(CS, SvcName, DataLinkMod), + + SvcRec. %% Create a new and unique transaction id @@ -748,7 +640,7 @@ calc_relative_tout(UnixTime) -> case TOut =< 0 of true -> - 1; %% One millisec is the smallest value we will time out on + -1; %% We have timed out false -> TOut * 1000 @@ -756,30 +648,45 @@ calc_relative_tout(UnixTime) -> %% Handle a callback for a timed out message. -do_timeout_callback(CompSpec, Service, - #message {transaction_id = TransID}) -> - service_edge_rpc:handle_local_timeout(CompSpec, Service, TransID), - ok; +do_timeout_callback(CompSpec, SvcName, TransID) -> + service_edge_rpc:handle_local_timeout(CompSpec, SvcName, TransID), + ok. -%% callback element of #message is not an {M,F,A} format, ignore. -do_timeout_callback(_,_,_) -> - ok. %% Kill off a service that is no longer used. %% -delete_unused_service(SvcTid, Svc) -> +delete_unused_service(SvcTid, SvcRec) -> %% Do we have messages waiting for this service? - case ets:first(Svc#service.messages_tid) of + case ets:first(SvcRec#service.messages_tid) of %% Nope. '$end_of_table' -> - ets:delete(Svc#messages_tid), - ets:delete(SvcTid, { Service, DataLinkModule}) + ets:delete(SvcRec#service.messages_tid), + ets:delete(SvcTid, SvcRec#service.key), + + { SvcName, DataLinkMod} = SvcRec#service.key, + %% Unsubscribe from service availablility notifications + service_discovery_rpc: + unsubscribe_to_service_availability(SvcName, DataLinkMod, ?MODULE), %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} + %% the new service / State as {ok, NSvcRec, false, NSt} ?debug("schedule:service_unavailable(): Service ~p:~p now has no address.", - [ DataLinkModule, Service ]), + [ SvcRec#service.key ]), true; _ -> false end. + +first_service_message(#service { messages_tid = Tid }) -> + case ets:first(Tid) of + '$end_of_table' -> + empty; + + Key -> + case ets:lookup(Tid, Key) of + [ Msg ] -> Msg; + [] -> yanked + end + end. + + diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl index 8589df0..76c403e 100644 --- a/components/schedule/src/schedule_sup.erl +++ b/components/schedule/src/schedule_sup.erl @@ -34,7 +34,7 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ - ?CHILD(schedule_rpc, worker) - ?CHILD(router, worker) + ?CHILD(schedule_rpc, worker), + ?CHILD(rvi_routing, worker) ]} }. diff --git a/components/service_discovery/curl_scripts/register_service.sh b/components/service_discovery/curl_scripts/register_service.sh deleted file mode 100644 index 7441eb4..0000000 --- a/components/service_discovery/curl_scripts/register_service.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/sh -# Create new accounts, like the ga account -# But other accounts may be create -. $HOME/.exodmrc - - #if [ $# != 2 ] -#then -# echo "Usage: $0 temperature" -# exit 255 -#fi -# the password (actually erlang node cookie) must be 100% hidden -# so this is only for testing!!!!! - -URL=http://localhost:8801/exodm/rpc -curl -u $USER_AUTH -k -X POST $URL -d @- << EOF -{ - "jsonrpc": "2.0", - - "method": "service_discovery:register_service", - "id": "1", - "params": - { - "service": "hvac", - "address": "http://localhost:8901", - "methods": [ - { "access_type": "rpc", "method": "set_temperature" }, - { "access_type": "rpc", "method": "set_fan_speed" } - ] - } -} -EOF diff --git a/components/service_discovery/curl_scripts/resolve_service.sh b/components/service_discovery/curl_scripts/resolve_service.sh deleted file mode 100644 index d480a7d..0000000 --- a/components/service_discovery/curl_scripts/resolve_service.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/sh -# Create new accounts, like the ga account -# But other accounts may be create -. $HOME/.exodmrc - - #if [ $# != 2 ] -#then -# echo "Usage: $0 temperature" -# exit 255 -#fi -# the password (actually erlang node cookie) must be 100% hidden -# so this is only for testing!!!!! - -URL=http://localhost:8801/exodm/rpc -curl -u $USER_AUTH -k -X POST $URL -d @- << EOF -{ - "jsonrpc": "2.0", - - "method": "service_discovery:resolve_remote_service", - "id": "1", - "params": - { - "service": "rpc://jaguarlandrover.com/vin/1234/services/hvac/set_speed" - } -} -EOF diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index 0819cb1..8b1622a 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -18,42 +18,35 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([get_all_services/1, - get_local_addresses/1, - get_remote_addresses/1, - get_local_services/1, - get_remote_services/1, - subscribe_to_service_availability/2, - unsubscribe_from_service_availability/2, - resolve_local_service/2, - resolve_remote_service/2, - register_remote_services/3, - register_local_service/3, - unregister_remote_services_by_address/2, - unregister_remote_services_by_name/2, - unregister_local_service/2]). +-export([get_services/1, + subscribe/3, + unsubscribe/3, + register_services/3, + unregister_services/3]). -export([start_json_server/0]). -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --define(LOCAL_SERVICE_TABLE, rvi_local_services). --define(REMOTE_SERVICE_TABLE, rvi_remote_services). --define(REMOTE_ADDRESS_TABLE, rvi_address_services). --define(NOTIFICATION_SUBS_TABLE, rvi_notification_subs). +-define(SERVICE_TABLE, rvi_services). +-define(MODULE_TABLE, rvi_modules). +-define(NOTIFICATION_TABLE, rvi_notification_subs). -record(service_entry, { - service = [], - address = {undefined,undefined} %% Module and Address where service can be found + service = [], %% Servie handled by this entry. + data_link_mod = undefined %% Module handling service, 'local' if local service }). + -record(notification_subs, { service = [], modules = [] %% List of modules subscribing to this service }). + -define(SERVER, ?MODULE). + -record(st, { %% Component specification cs = #component_spec{} @@ -64,11 +57,14 @@ start_link() -> init([]) -> ?debug("service_discovery_rpc:init(): called."), - ets:new(?REMOTE_SERVICE_TABLE, [set, public, named_table, { keypos, #service_entry.service }]), - ets:new(?REMOTE_ADDRESS_TABLE, [duplicate_bag, public, named_table, - {keypos, #service_entry.address}]), + ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table, + { keypos, #service_entry.service }]), + + ets:new(?MODULE_TABLE, [ duplicate_bag, public, named_table, + { keypos, #service_entry.data_link_mod }]), - ets:new(?NOTIFICATION_SUBS_TABLE, [set, public, named_table, { keypos, #notification_subs.service }]), + ets:new(?NOTIFICATION_TABLE, [set, public, named_table, + { keypos, #notification_subs.service }]), {ok, #st { cs = rvi_common:get_component_specification() } }. @@ -76,80 +72,33 @@ init([]) -> start_json_server() -> rvi_common:start_json_rpc_server(service_discovery, ?MODULE, service_discovery_sup). -get_all_services(CompSpec) -> - rvi_common:request(service_discovery, ?MODULE, - get_all_services, [], [status, services], CompSpec). - - -get_local_services(CompSpec) -> - rvi_common:request(service_discovery, ?MODULE, - get_local_services, [], [status, services], CompSpec). - -get_remote_services(CompSpec) -> - rvi_common:request(service_discovery, ?MODULE, - get_remote_services, [], [status, services], CompSpec). - -get_local_addresses(CompSpec) -> - rvi_common:request(service_discovery, ?MODULE, - get_local_addresses, [], [status], CompSpec). - -get_remote_addresses(CompSpec) -> +get_services(CompSpec) -> rvi_common:request(service_discovery, ?MODULE, - get_remote_addresses, [], [status], CompSpec). + get_services, [], [status, services], CompSpec). -resolve_local_service(CompSpec, RawService) -> - rvi_common:request(service_discovery, ?MODULE, resolve_local_service, - [{ service, RawService }], - [status, full_service], CompSpec). - -resolve_remote_service(CompSpec, RawService) -> - rvi_common:request(service_discovery, ?MODULE, resolve_remote_service, - [{ service, RawService }], - [status], CompSpec). - -register_remote_services(CompSpec, DataLinkModule, Address, Services) -> - rvi_common:notification(service_discovery, ?MODULE, register_remote_services, - [{ data_link_module, DataLinkModule }, - { address, Address }, - { services, Services }], +register_services(CompSpec, Services, DataLinkModule) -> + rvi_common:notification(service_discovery, ?MODULE, register_services, + [{ services, Services }, + { data_link_module, DataLinkModule }], CompSpec). - -register_local_service(CompSpec, Address, Services) -> - rvi_common:request(service_discovery, ?MODULE, register_local_service, - [{ address, Address }, - { service, Services }], - [status], CompSpec). - - -unregister_remote_services_by_address(CompSpec, DataLinkModule, Address) -> - rvi_common:notification(service_discovery, ?MODULE, unregister_remote_services_by_address, +unregister_services(CompSpec, Services, DataLinkModule) -> + rvi_common:notification(service_discovery, ?MODULE, unregister_service, [{ data_link_module, DataLinkModule}, - { address, Address }], - CompSpec). - -unregister_remote_services_by_name(CompSpec, Service) -> - rvi_common:notification(service_discovery, ?MODULE, unregister_remote_services_by_name, - [{ services, Service }], - CompSpec). + { services, Services }], + CompSpec). -unregister_local_service(CompSpec, Service) -> - rvi_common:notification(service_discovery, ?MODULE, unregister_local_service, - [{ service, Service }], +subscribe(CompSpec, Service, SubscribingMod) -> + rvi_common:notification(service_discovery, ?MODULE, subscribe, + [ { service, Service }], + [ { subscribing_module, SubscribingMod }], CompSpec). - -subscribe_to_service_availability(Service, Module) -> - rvi_common:notification(service_discovery, ?MODULE, subscribe_to_service_availability, - [{ service, Service }], - [{ module, Module }], - CompSpec). - -unsubscribe_from_service_availability(Service, Module) -> +unsubscribe(CompSpec, Service, SubscribingMod) -> rvi_common:notification(service_discovery, ?MODULE, unsubscribe_from_service_availability, - [{ service, Service }], - [{ module, Module }], + [ { service, Service }], + [ { subscribing_module, SubscribingMod }], CompSpec). @@ -157,162 +106,55 @@ unsubscribe_from_service_availability(Service, Module) -> %% Called by local exo http server %% Register remote services -handle_notification("register_remote_services", Args) -> +handle_notification("register_services", Args) -> {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), - gen_server:cast(?SERVER, { rvi, register_remote_services, - [ Services, DataLinkModule, Address ]}), + gen_server:cast(?SERVER, { rvi, register_services, + [ Services, DataLinkModule ]}), ok; - -handle_notification("unregister_remote_services_by_address", Args) -> - {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), - gen_server:cast(?SERVER, { rvi, unregister_remote_services_by_address, - [ DataLinkModule, Address ]}), - ok; - - -handle_notification("unregister_remote_services_by_name", Args) -> +handle_notification("unregister_services", Args) -> {ok, Services} = rvi_common:get_json_element(["services"], Args), - gen_server:cast(?SERVER, { rvi, unregister_remote_service_by_Name, - [ Services ]}), - ok; - - - -handle_notification("unregister_local_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - %% De-register service - gen_server:cast(?SERVER, { rvi, unregister_local_service, - [ Service ]}), + {ok, DataLinkModule } = rvi_common:get_json_element(["data_link_module"], Args), + gen_server:cast(?SERVER, { rvi, unregister_services, + [ Services, DataLinkModule ]}), ok; -handle_notification( Other, _Args) -> - ?info("service_discovery_rpc:handle_notification(~p): unknown", [ Other ]), - ok. - -handle_notification("subscribe_to_service_availability", Args) -> +handle_notification("subscribe", Args) -> {ok, Service } = rvi_common:get_json_element(["service"], Args), - {ok, Module } = rvi_common:get_json_element(["module"], Args), + {ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args), %% De-register service - gen_server:cast(?SERVER, { rvi, subscribe_to_service_availability, + gen_server:cast(?SERVER, { rvi, subscribe, [ Service, Module ]}), ok; handle_notification("unsubscribe_from_service", Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, Module } = rvi_common:get_json_element(["module"], Args), + {ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args), %% De-register service gen_server:cast(?SERVER, { rvi, unsubscribe_from_service, [ Service, Module ]}), ok; +handle_notification( Other, _Args) -> + ?info("service_discovery_rpc:handle_notification(~p): unknown", [ Other ]), + ok. -%% Register local services -handle_rpc("register_local_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), - - [ok, FullSvcName] = gen_server:call(?SERVER, { rvi, register_local_service, - [ Service, Address ]}), - - {ok, [ {status, rvi_common:json_rpc_status(ok) }, { full_service_name, FullSvcName }]}; - - -%% -%% Get remote services -%% -handle_rpc("get_remote_services", _Args) -> - [ok, Services ] = gen_server:call(?SERVER, { rvi, get_remote_services, - [ ]}), - JSONServices = convert_json_services(Services), - - {ok, [ {status, rvi_common:json_rpc_status(ok)} , - { services, { array, JSONServices } }]}; - %% %% Get all services %% -handle_rpc("get_all_services", _Args) -> - ?debug("service_discovery_rpc:get_all_services(json-rpc)"), - [ok, Services ] = gen_server:call(?SERVER, { rvi, get_all_services, - []}), - ?debug("service_discovery_rpc:Done"), +handle_rpc("get_services", _Args) -> + ?debug("service_discovery_rpc:get_services(json-rpc)"), + [ok, Services ] = gen_server:call(?SERVER, { rvi, get_services, []}), {ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, { array, Services } }]}; -%% -%% Get remote network addresses -%% -handle_rpc("get_remote_addresses", _Args) -> - [ok, Addresses ] = gen_server:call(?SERVER, { rvi, get_remote_addresses, - []}), - {ok, [ {status, rvi_common:json_rpc_status(ok)}, { addresses, { array, Addresses }}]}; - -%% -%% Resolve remote service -%% -handle_rpc("resolve_remote_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - - case gen_server:call(?SERVER, { rvi, resolve_remote_service, - [Service]}) of - [ok, Addresses ] -> - {ok, [ {status, rvi_common:json_rpc_status(ok)}, { addresses, { array, Addresses }}]}; - - [ Other ] -> {ok, [ {status, rvi_common:json_rpc_status(Other)} ]} - end; - - - -%% -%% Resolve local service -%% -handle_rpc("resolve_local_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - - case gen_server:call(?SERVER, { rvi, resolve_local_service, - [Service]}) of - [ok, Addresses ] -> - {ok, [ {status, rvi_common:json_rpc_status(ok)}, { addresses, { array, Addresses }}]}; - - [ Other ] -> {ok, [ {status, rvi_common:json_rpc_status(Other)} ]} - end; - - - - -%% -%% Get local services -%% -handle_rpc("get_local_services", _Args) -> - [ok, LocalServices ] = - gen_server:call(?SERVER, { rvi, get_local_services, []}), - - JSONServices = convert_json_services(LocalServices), - - {ok, [ {status, rvi_common:json_rpc_status(ok)} , - { services, { array, JSONServices }}]}; - - - -%% -%% Get local network addresses -%% -handle_rpc("get_local_addresses", _Args) -> - [ok, LocalAddresses ] = - gen_server:call(?SERVER, { rvi, get_local_addresses, []}), - - {ok, [ {status, rvi_common:json_rpc_status(ok)} , { addresses, { array, LocalAddresses }}]}; - %% @@ -323,99 +165,14 @@ handle_rpc( Other, _Args) -> {ok, [ { status, invalid_command } ]}. -%% Handle calls received through regular gen_server calls, routed by -%% rvi_common:request() - -handle_call({rvi, register_local_service, [Service, Address] }, _From, St) -> - ?info("service_discovery_rpc:register_local_service(): ~p -> ~p", - [Service, Address]), - - FullSvcName = rvi_common:local_service_to_string(Service), - - ets:insert(?LOCAL_SERVICE_TABLE, - #service_entry { - service = FullSvcName, - address = Address - }), - - notify_subscribers(available, Service, local, Address), - {reply, [ ok, FullSvcName ], St }; - - -handle_call({rvi, resolve_remote_service, [RawService]}, _From, St) -> - Service = rvi_common:sanitize_service_string(RawService), - ?debug("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]), - ?debug("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]), - case resolve_service_(?REMOTE_SERVICE_TABLE, Service) of - {ok, Address } -> - {reply, [ok, Address ], St}; - - not_found -> - ?debug("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. " - "Trying static nodes", - [Service]), - - - %% Check if this is a service residing on the backend server - case rvi_common:get_static_node(Service) of - not_found -> %% Not found - ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found.", - [Service]), - - {reply, [not_found], St }; - - Address -> %% Found - ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p", - [Service, Address]), - - {reply, [ok, Address ], St} - end - end; - -handle_call({rvi, get_remote_services, _Args}, _From, St) -> - Services = get_services_(?REMOTE_SERVICE_TABLE), - {reply, [ok, Services ], St }; -handle_call({rvi, get_all_services, _Args}, _From, St) -> - RemoteSvc = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> - [ ServiceName | Acc ] end, - [], ?REMOTE_SERVICE_TABLE), +handle_call({rvi, get_services, _Args}, _From, St) -> + Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> + [ ServiceName | Acc ] end, + [], ?SERVICE_TABLE), - LocalSvc = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> - [ ServiceName | Acc ] end, - [], ?LOCAL_SERVICE_TABLE), - - Services = RemoteSvc++LocalSvc, - - {reply, [ok, Services], St }; - - -handle_call({rvi, get_remote_addresses, _Args}, _From, St) -> - Addresses = get_addresses_(?REMOTE_ADDRESS_TABLE), - {reply, [ ok, Addresses ], St }; - - -handle_call({rvi, resolve_local_service, [RawService]}, _From, St) -> - Service = rvi_common:sanitize_service_string(RawService), - ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]), - ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]), - - case resolve_service_(?LOCAL_SERVICE_TABLE, Service) of - not_found -> - { reply, [not_found], St }; - - {ok, Address } -> - { reply, [ok, Address], St} - end; - -handle_call({rvi, get_local_services, _Args}, _From, St) -> - Services = get_services_(?LOCAL_SERVICE_TABLE), - {reply, [ok, Services ], St }; - -handle_call({rvi, get_local_addresses, _Args}, _From, St) -> - Addresses = get_addresses_(?LOCAL_SERVICE_TABLE), - {reply, [ ok, Addresses ], St }; + {reply, [ok, Svcs], St }; handle_call(Other, _From, St) -> @@ -423,195 +180,79 @@ handle_call(Other, _From, St) -> { reply, [unknown_command] , St}. -handle_cast({rvi, subscribe_to_service_availability, [Service, Module] }, St) -> - case ets:lookup(?NOTIFICATION_SUBS_TABLE, Service) of + +handle_cast({rvi, subscribe, [Service, SubsMod] }, St) -> + case ets:lookup(?NOTIFICATION_TABLE, Service) of '$end_of_table' -> %% Insert new entry. - ets:insert(?NOTIFICATION_SUBS_TABLE, + ets:insert(?NOTIFICATION_TABLE, #notification_subs { service = Service, - modules = [Module] }); + modules = [SubsMod] }); - #notification_subs { modules = Modules } -> + #notification_subs { modules = SubsMods } -> %% Replace existing entry - ets:insert(?NOTIFICATION_SUBS_TABLE, + ets:insert(?NOTIFICATION_TABLE, #notification_subs { service = Service, - modules = lists:usort([ Module | Modules ]) }), + modules = lists:usort([ SubsMod | SubsMods ]) }) end, - notify_on_existing_service(Service, Module) + notify_on_existing_service(St#st.cs, Service, SubsMod), { noreply, St}; -handle_cast({rvi, unsubscribe_to_service_availability, [Service, Module] }, St) -> - case ets:lookup(?NOTIFICATION_SUBS_TABLE, Service) of + +handle_cast({rvi, unsubscribe, [Service, SubsMod] }, St) -> + case ets:lookup(?NOTIFICATION_TABLE, Service) of '$end_of_table' -> %% No match. ok; - #notification_subs { modules = Modules } -> + #notification_subs { modules = SubsMods } -> %% Replace existing entry - ets:insert(?NOTIFICATION_SUBS_TABLE, + ets:insert(?NOTIFICATION_TABLE, #notification_subs { service = Service, - modules = Modules -- Module }) + modules = SubsMods -- SubsMod }) end, { noreply, St}; -handle_cast({rvi, register_remote_services, [Services, DataLinkModule, Address] }, St) -> - case Services of - %% We have zero services associated with address. - %% Just register the address. - [] -> - ?info("service_discovery_rpc:register_remote_service_(): service(n/a) -> ~p", - [Address]), - - ets:insert(?REMOTE_ADDRESS_TABLE, - #service_entry { - service = "", - address = { Address, DataLinkModule } - }), - - dump_table(?REMOTE_ADDRESS_TABLE), - ok; - %% Loop through the services and register them. - _ -> - lists:map(fun(Svc) -> - register_remote_service_(Svc, - DataLinkModule, - Address) - end, Services), - - notify_subscribers(available, Service, DataLinkModule, Address), - - %% Forward to scheduler now that we have updated our own state - schedule_rpc:register_remote_services(St#st.cs, Address, Services), - - %% Forward to service edge so that it can inform its locally - %% connected services. - %% Build a list of all our local services' addresses to provide - %% to service edge so that it knows where to send the, - LocalSvcAddresses = - ets:foldl(fun(#service_entry { address = LocalAddress }, Acc) -> - [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE), - - %% Call service edge with local addresses (sorted and de-duped) and - %% the services to register. - service_edge_rpc: - register_remote_services(St#st.cs, - Services, - lists:usort(LocalSvcAddresses)) - end, - {noreply, St }; +%% Handle calls received through regular gen_server calls, routed by +%% rvi_common:request() +handle_cast({rvi, register_services, [Services, DataLinkModule] }, St) -> + ?info("service_discovery_rpc:register_service(): ~p:~p", + [DataLinkModule, Services]), + [ register_single_service_(SvcName, DataLinkModule) || SvcName <- Services], -%% -%% Delete all services registered under the given address -%% -handle_cast({rvi, unregister_remote_services_by_address, [DataLinkModule, Address]}, St) -> - - %% Retrieve all services associated with the remote address - Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, { Address, DataLinkModule }), - - %% We now have a list of service records, convert them to a list of service - %% names and send them of to schedule for deregistration - AllSvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) -> - [SvcName | Acc] - end, [], Svcs), - - ?info("service_discovery_rpc:unregister_remote_services_by_address(): ~p -> ~p", - [Address, AllSvcNames]), - - %% We need to filter AllSvcNames to remove all service entries that have - %% been registered under another name. - %% We do this by creating a list of all matching entries associated - %% with a network address not matching the disconnected Address - %% - %% See issue https://github.com/PDXostc/rvi/issues/14 for details - FilterSvc = - lists:foldr( - fun(Service, Acc) -> - %% Lookup the service in the service table. - case ets:lookup(?REMOTE_SERVICE_TABLE, Service) of - - %% Not found. Do not filter out. - [] -> - Acc; - - %% We found or own entry, tiet to the disconnected address. - %% Do not add to addresses to be removed. - [ #service_entry { address = { Address, DataLinkModule} } ] -> - Acc; - - %% We found an entry that does not the disconnected - %% network address. This one should be filtered out - [ _ ] -> - [ Service | Acc ] - - end - end, [], AllSvcNames), - - SvcNames = AllSvcNames -- FilterSvc, + %% Notify all subscribers + notify_subscribers(St#st.cs, + available, + Services, + DataLinkModule), - ?info("service_discovery_rpc:unregister_remote_services_by_address(): " - "Resurrected services: ~p", - [FilterSvc]), - - %% Delete any addresses stored with an empty service name, - %% installed with register_remote_service/1, since we now have at - %% least one service name. - ets:match_delete(?REMOTE_ADDRESS_TABLE, - #service_entry { - service = [], - address = { Address, DataLinkModule} - }), + {noreply, St }; - ets:delete(?REMOTE_ADDRESS_TABLE, {Address, DataLinkModule}), - %% Go through all service names and unregister them with schedulew - %% and service edge - case SvcNames of - %% Nothing to do. - [] -> - true; - _ -> - notify_subscribers(unavailable, SvcNames, DataLinkModule, Address), - - %% Tell scheduler to kill off services - schedule_rpc:unregister_remote_services(St#st.cs, SvcNames), - - %% Delete all services from remote service table. - [ ets:delete(?REMOTE_SERVICE_TABLE, Svc#service_entry.service) || Svc <- Svcs ], - - %% Forward to service edge so that it can inform its locally - %% connected services. - %% Build a list of all our local services' addresses to provide - %% to service edge so that it knows where to send the, - LocalSvcAddresses = - ets:foldl(fun(#service_entry { address = LocalAddress }, Acc) -> - [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE), - - %% Call service edge with local addresses (sorted and - %% de-duped) and the services to register. - service_edge_rpc:unregister_remote_services(St#st.cs, - SvcNames, - lists:usort(LocalSvcAddresses)) +%% Handle calls received through regular gen_server calls, routed by +%% rvi_common:request() +handle_cast({rvi, unregister_services, [Services, DataLinkModule] }, St) -> - end, - {noreply, St }; + ?info("service_discovery_rpc:unregister_service(): ~p:~p", + [DataLinkModule, Services]), -handle_cast({rvi, unregister_remote_services_by_name, [Services]}, St) -> - unregister_remote_services_by_name_(St#st.cs, Services), - {noreply, St }; + [ unregister_single_service_(SvcName, DataLinkModule) || SvcName <- Services], + + %% Notify all subscribers + notify_subscribers(St#st.cs, + unavailable, + Services, + DataLinkModule), -handle_cast({rvi, unregister_local_service, [Service]}, St) -> - ?info("service_discovery_rpc:unregister_local_service(): ~p", - [Service]), - ets:delete(?LOCAL_SERVICE_TABLE, Service), {noreply, St }; + + handle_cast(Other, St) -> ?warning("service_discovery_rpc:handle_cast(~p): unknown", [ Other ]), {noreply, St}. @@ -628,151 +269,110 @@ code_change(_OldVsn, St, _Extra) -> %% %% INTERNAL SUPPORT FUNCTIONS %% -dump_table(_Table, '$end_of_table') -> - true; +%% dump_table(_Table, '$end_of_table') -> +%% true; -dump_table(Table, Key) -> - Val = ets:lookup(Table, Key), - ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]), - dump_table(Table, ets:next(Table, Key)). +%% dump_table(Table, Key) -> +%% Val = ets:lookup(Table, Key), +%% ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]), +%% dump_table(Table, ets:next(Table, Key)). -dump_table(Table) -> - dump_table(Table, ets:first(Table)). +%% dump_table(Table) -> +%% dump_table(Table, ets:first(Table)). -register_remote_service_(Service, DataLinkModule, Address) -> - ?info("service_discovery_rpc:register_remote_service_(): service(~p) -> ~p", - [Service, Address]), +register_single_service_(Service, DataLinkModule) -> + ?info("service_discovery_rpc:register_remote_service_(~p:~p)", + [DataLinkModule,Service]), - FullSvcName = rvi_common:remote_service_to_string(Service), - - ets:insert(?REMOTE_SERVICE_TABLE, - #service_entry { - service = FullSvcName, - address = { Address, DataLinkModule } - }), - - %% Delete any addresses stored with an empty service name, - %% installed with register_remote_service/1, since we now have at - %% least one service name. - ets:match_delete(?REMOTE_ADDRESS_TABLE, + %% Delete any previous instances of the given entry, in case + %% the service registers multiple times + ets:match_delete(?MODULE_TABLE, #service_entry { - service = [], - address = { Address, DataLinkModule} + service = Service, + data_link_mod = DataLinkModule }), - %% Delete any previous instances of the given entry, in case - %% the service registers multiple times - ets:match_delete(?REMOTE_ADDRESS_TABLE, + ets:match_delete(?SERVICE_TABLE, #service_entry { - service = FullSvcName, - address = { Address , DataLinkModule } + service = Service, + data_link_mod = DataLinkModule }), - %% Insert new element - ets:insert(?REMOTE_ADDRESS_TABLE, + ets:insert(?SERVICE_TABLE, + #service_entry { + service = Service, + data_link_mod = DataLinkModule + }), + + ets:insert(?MODULE_TABLE, #service_entry { - service = FullSvcName, - address = { Address, DataLinkModule} + service = Service, + data_link_mod = DataLinkModule }), - {ok, FullSvcName}. + + ok. -unregister_single_remote_service_by_name_(CompSpec, Service) -> - ?info("service_discovery_rpc:unregister_single_remote_service_by_name_(): ~p", - [Service]), +unregister_single_service_(Service, DataLinkModule) -> + ?info("service_discovery_rpc:unregister_single_service_(): ~p:~p", + [DataLinkModule, Service]), + %% Delete any service table entries with a matching Service. + ets:match_delete(?SERVICE_TABLE, + #service_entry { + service = Service, + data_link_mod = DataLinkModule + }), %% Delete any remote address table entries with a matching Service. - ets:match_delete(?REMOTE_ADDRESS_TABLE, + ets:match_delete(?MODULE_TABLE, #service_entry { service = Service, - address = '_' + data_link_mod = DataLinkModule }), - ets:delete(?REMOTE_SERVICE_TABLE, Service), - After = ets:foldl(fun(#service_entry { service = Svc }, Acc) -> - [ Svc | Acc ] end, - [], ?REMOTE_SERVICE_TABLE), - - ?debug("Ater removing ~p: ~p", [ Service, After ]), - - %% Forward to service edge so that it can inform its locally - %% connected services. - %% Build a list of all our local services' addresses to provide - %% to service edge so that it knows where to send the, - LocalSvcAddresses = - ets:foldl(fun(#service_entry { address = LocalAddress }, Acc) -> - [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE), - - %% Call service edge with local addresses (sorted and de-duped) and - %% the services to register. - notify_subscribers(unavailable, [Service], fixme, fixme), - - service_edge_rpc:unregister_remote_services(CompSpec, - [[Service]], - lists:usort(LocalSvcAddresses)), - - ok. +%% +%% Return all modules that can currently route the provided service. +%% +get_modules_by_service_(Service) -> -%% Loop through multiple services and remove them one by one -unregister_remote_services_by_name_(CompSpec, Services) -> - [ unregister_single_remote_service_by_name_(CompSpec, Svc) || Svc <- Services], - ok. - - - -get_services_(Table) -> - Services = ets:foldl(fun(#service_entry {service = ServiceName, - address = ServiceAddr}, Acc) -> - [ { ServiceName, ServiceAddr } | Acc ] end, - [], Table), - - ?debug("service_discovery_rpc:get_services_(): ~p", [ Services ]), - Services. + ModMatch = ets:lookup(?SERVICE_TABLE, Service), + + ModNames = lists:foldl(fun(#service_entry { + data_link_mod = Mod + }, Acc) -> + [ Mod | Acc ] + end, [], ModMatch), + + -get_addresses_(Table) -> - AddrList = ets:foldl(fun(#service_entry {address = Addr}, Acc) - when Addr =:= unavailable -> - Acc; %% Don't report if service is not active + ?debug("service_discovery_rpc:get_modules_by_service_(): ~p -> ~p", [ Service, ModNames ]), + ModNames. - %% We have an active network address - (#service_entry {address = Addr}, Acc) -> - %% Avoid duplicates - case lists:keyfind(Addr, 1, Acc) of - false ->[ Addr | Acc ]; - _ -> Acc - end - end, [], Table), - %% Return a dup-scrubbed list. - Addresses = sets:to_list(sets:from_list(AddrList)), - ?debug("service_discovery_rpc:get_addresses(~p): ~p", [ Table, Addresses ]), - Addresses. +get_services_by_module_(Module) -> + SvcMatch = ets:lookup(?MODULE_TABLE, Module), + + SvcNames = lists:foldl(fun(#service_entry { + service = Svc + }, Acc) -> + [ Svc | Acc ] + end, [], SvcMatch), + + -resolve_service_(Table, Service) -> - case ets:lookup(Table, Service) of - %% We found a service entry, report it back - [#service_entry { address = Address }] -> - ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> ~p", - [ Table, Service, Address ]), - {ok, Address }; + ?debug("service_discovery_rpc:get_services_by_module_(): ~p -> ~p", [ Module, SvcNames ]), + SvcNames. - %% We did not find a service entry, check statically configured nodes. - [] -> - ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> Not Found", - [ Table, Service ]), - not_found - end. @@ -790,17 +390,12 @@ convert_json_services(Services) -> JSONServices. -get_subscribers([], St) -> - []; - - %% jlr.com/abc -notify_subscribers(_Available, [], _DataLinkModule, _Address) -> +notify_subscribers(_CompSpec, _Available, [], _DataLinkModule) -> ok; - -notify_subscribers(Available, [ Service | Rem], DataLinkModule, Address) -> - %% ?NOTIFICATION_SUBS_TABLE contains services that +notify_subscribers(CompSpec, Available, [ Service | Rem], DataLinkModule) -> + %% ?NOTIFICATION_TABLE contains services that %% should be matched against the given service. %% If there is a match, we notify the associated module @@ -810,31 +405,21 @@ notify_subscribers(Available, [ Service | Rem], DataLinkModule, Address) -> end, %% Retrieve all modules subscribing to a specific service. - case ets:lookup(?NOTIFICATION_SUBS_TABLE, Service) of - '$end_of_table' -> + case ets:lookup(?NOTIFICATION_TABLE, Service) of + [] -> ok; - #notify_subscribers { modules = Modules } -> + [#notification_subs { modules = Modules }] -> %% Notify each subscriber of the given service. - [ Module:Fun(Service, DataLinkModule, Address) || Module <- Modules], + [ Module:Fun(CompSpec, Service, DataLinkModule) || Module <- Modules], ok end, %% Move on to remaining subscribers. - notify_subscribers(Address, Rem, Address). - + notify_subscribers(CompSpec, Available, Rem, DataLinkModule). -notify_on_existing_service(Service, Module) -> - case resolve_service_(?LOCAL_SERVICE_TABLE, Service) of - not_found -> - case resolve_service(?REMOTE_SERVICE_TABLE, Service) of - not_found -> not_found; - { ok, { DataLinkModule, Address }} -> - Module:service_available(Service, DataLinkModule, Address), - ok - end; - - {ok, Address } -> - Module:service_available(Service, local, Address) - end +notify_on_existing_service(CompSpec, Service, SubsMod) -> + Modules = ets:lookup(?SERVICE_TABLE, Service), + [ SubsMod:service_available(CompSpec, Service, Mod) || Mod <- Modules], + ok. diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index b101921..bdd32ac 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -27,14 +27,17 @@ terminate/2, code_change/3]). --export([register_remote_services/3, - unregister_remote_services/3, - handle_remote_message/7, +-export([handle_remote_message/7, handle_local_timeout/3]). -export([start_json_server/0, start_websocket/0]). +%% Invoked by service discovery +%% FIXME: Should be rvi_service_discovery behavior +-export([service_available/3, + service_unavailable/3]). + %%-include_lib("lhttpc/include/lhttpc.hrl"). -include_lib("lager/include/log.hrl"). @@ -49,6 +52,14 @@ cs = #component_spec{} }). +-define(SERVICE_TABLE, rvi_local_services). + +-record(service_entry, { + service = "", %% Servie handled by this entry. + url = undefined %% URL where the service can be reached. + }). + + start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -65,6 +76,9 @@ init([]) -> ?notice("---- Service Edge URL: ~p", [ URL ]), ?notice("---- Node Service Prefix: ~s", [ rvi_common:local_service_prefix()]), + ets:new(?SERVICE_TABLE, [ set, public, named_table, + { keypos, #service_entry.service }]), + {ok, #st { cs = CompSpec }}. @@ -85,9 +99,6 @@ start_json_server() -> end. - - - start_websocket() -> %% %% Fire up the websocket subsystem, if configured @@ -114,223 +125,82 @@ start_websocket() -> end. -%% Invoked by schedule_rpc. -%% A message originated from a locally connected service -%% has timed out -handle_local_timeout(CompSpec, Service, TransID) -> - rvi_common:notification(service_edge, ?SERVER, handle_local_timeout, - [ { service, Service}, - { transaction_id, TransID} ], - CompSpec). - - -register_remote_services(CompSpec, Service, LocalServiceAddresses) -> - rvi_common:notification(service_edge, ?SERVER, register_remote_services, - [ { service, Service } , - { local_service_addresses, LocalServiceAddresses } ], - CompSpec). - - -unregister_remote_services(CompSpec, Services, LocalServiceAddresses) -> - rvi_common:notification(service_edge, ?SERVER, unregister_remote_services, - [ { services, Services }, - { local_service_addresses, LocalServiceAddresses }], - CompSpec). - -%% -%% Handle a message, delivered from a remote node through protocol, that is -%% to be forwarded to a locally connected service. -%% -handle_remote_message(CompSpec, ServiceName, Timeout, NetworkAddress, - Parameters, Signature, Certificate) -> - rvi_common:notification(service_edge, ?SERVER, handle_remote_message, - [ { service, ServiceName }, - { timeout, Timeout }, - { network_address, NetworkAddress }, - { parameters, Parameters }, - { signature, Signature }, - { certificate, Certificate } ], - CompSpec). - - - - - - -%% Announces the services listed in Services with all -%% local services listed under LocalServices -%% SkipAddress is a single address that, if found in LocalServiceAddresses, -%% will not receive an announcement. -%% This is to avoid that a local service registering itself will get a callback -%% about its own availability. - -announce_service_availability(Cmd, LocalServiceAddresses, Services) -> - announce_service_availability(Cmd, LocalServiceAddresses, Services, undefined). - -announce_service_availability(Cmd, LocalServiceAddresses, Services, SkipAddress) -> - ?info("service_edge_rpc:announce_service_availability(~p, ~p, ~p): Called.", - [ Cmd, LocalServiceAddresses, Services ]), - - lists:map(fun(LocalServiceAddress) when LocalServiceAddress =:= SkipAddress -> - ok; - - (LocalServiceAddress) -> - dispatch_to_local_service(LocalServiceAddress, Cmd, - {struct, [ { services, { array, [Services ]}}]}) - end, LocalServiceAddresses), - { ok, [ { status, rvi_common:json_rpc_status(ok)} ] }. - - - -%% -%% Depending on the format of NetworkAddress -%% Dispatch to websocket or JSON-RPC server -%% FIXME: Should be a pluggable setup where -%% different dispatchers are triggered depending -%% on prefix in NetworkAddress -%% -flatten_ws_args([{ struct, List} | T], Acc ) when is_list(List) -> - flatten_ws_args( List ++ T, Acc); +%% Invoked by service_discovery to announce service availability +%% Must be handled either as a JSON-RPC call or a gen_server call. +service_available(CompSpec, Service, DataLinkModule) -> + rvi_common:notify(service_edge, ?MODULE, + service_available, + [{ service, Service }, + { data_link_module, DataLinkModule }], CompSpec). -flatten_ws_args([{ Key, Val}| T], Acc ) -> - NKey = case is_atom(Key) of - true -> atom_to_list(Key); - false -> Key - end, - NVal = flatten_ws_args(Val), +service_unavailable(CompSpec, Service, DataLinkModule) -> + rvi_common:notify(service_edge, ?MODULE, + service_unavailable, + [{ service, Service }, + { data_link_module, DataLinkModule }], CompSpec). - flatten_ws_args(T, [ NKey, NVal] ++ Acc); -flatten_ws_args([], Acc) -> - Acc; - +%% Websocket interface +wse_register_service(Ws, Service ) -> + ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, Service ]), + gen_server:call(?SERVER, { rvi, register_local_service, [ Service, "ws:" ++ pid_to_list(Ws)]}), + { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}. -flatten_ws_args(Other, []) -> - Other; +wse_unregister_service(Ws, Service ) -> + ?debug("service_edge_rpc:wse_unregister_service(~p) service: ~p", [ Ws, Service ]), + gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service ]}), + { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}. -flatten_ws_args(Other, Acc) -> - [ Other | Acc ]. +wse_get_available_services(_Ws ) -> + ?debug("service_edge_rpc:wse_get_available_services()"), + [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), + { ok, [ { status, rvi_common:json_rpc_status(ok)}, + { services, Services}] }. -flatten_ws_args(Args) -> - flatten_ws_args(Args, []). +wse_message(Ws, ServiceName, Timeout, JSONParameters) -> + %% Parameters are delivered as JSON. Decode into tuple + { ok, Parameters } = exo_json:decode_string(JSONParameters), + ?debug("service_edge_rpc:wse_message(~p) ServiceName: ~p", [ Ws, ServiceName ]), + ?debug("service_edge_rpc:wse_message(~p) Timeout: ~p", [ Ws, Timeout]), + ?debug("service_edge_rpc:wse_message(~p) Parameters: ~p", [ Ws, Parameters ]), -dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, - [{ services, Services}] ) -> - ?info("service_edge:dispatch_to_local_service(service_available, websock): ~p", [ Services]), - wse:call(list_to_pid(WSPidStr), wse:window(), - "services_available", - [ "services", Services ]), - ok; + [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, + [ ServiceName, Timeout, Parameters]}), -dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, - [{ services, Services}] ) -> - ?info("service_edge:dispatch_to_local_service(service_unavailable, websock): ~p", [ Services]), - wse:call(list_to_pid(WSPidStr), wse:window(), - "services_unavailable", - [ "services", Services ]), - ok; + { ok, [ { status, rvi_common:json_rpc_status(Res) }, + { transaction_id, TID} ] }. -dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, - [{ service_name, SvcName}, { parameters, Args}] ) -> - ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]), - wse:call(list_to_pid(WSPidStr), wse:window(), - "message", - [ "service_name", SvcName ] ++ flatten_ws_args(Args)), - ok; +%% Deprecated +wse_message(Ws, ServiceName, Timeout, JSONParameters, _CallingService) -> + wse_message(Ws, ServiceName, Timeout, JSONParameters). -%% Dispatch to regular JSON-RPC over HTTP. -dispatch_to_local_service(NetworkAddress, Command, Args) -> - CmdStr = atom_to_list(Command), - Res = rvi_common:send_json_request(NetworkAddress, CmdStr, Args), - ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]), - ?debug("dispatch_to_local_service(): Args: ~p",[ Args]), - ?debug("dispatch_to_local_service(): Result: ~p",[ Res]), - Res. -%% Forward a message to a specific locally connected service. -%% Called by forward_message_to_local_service/2. +%% Invoked by locally connected services. +%% Will always be routed as JSON-RPC since that, and websocket, +%% are the only access paths in. %% -forward_message_to_local_service(ServiceName, NetworkAddress, Parameters, _CompSpec) -> - ?debug("service_edge:forward_to_local(): URL: ~p", [NetworkAddress]), - ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]), - - %% - %% Strip our node prefix from service_name so that - %% the service receiving the JSON rpc call will have - %% a service_name that is identical to the service name - %% it registered with. - %% - SvcName = string:substr(ServiceName, - length(rvi_common:local_service_prefix())), - - %% Deliver the message to the local service, which can - %% be either a wse websocket, or a regular HTTP JSON-RPC call - case rvi_common:get_request_result( - dispatch_to_local_service(NetworkAddress, - message, - {struct, [ { service_name, SvcName }, - { parameters, Parameters }]})) of - - %% Request delivered. - %% -1 is transaction ID. - { ok, _Result } -> - [ ok, -1 ]; - - %% status returned was an error code. - { Other, _Result } -> - ?warning("service_edge:forward_to_local(): ~p:~p Failed: ~p.", - [NetworkAddress, ServiceName, Other]), - [not_found, -1]; - - Other -> - ?warning("service_edge:forward_to_local(): ~p:~p Unknown error: ~p.", - [NetworkAddress, ServiceName, Other]), - [internal, -1] - end. - -%% A message is to targeting a service that is connected to the local RVI -%% node. We can just bounce the messsage straight over to the target service. -forward_message_to_local_service(ServiceName, Parameters, CompSpec) -> - %% - %% Resolve the local service name to an URL that we can send the - %% request to - %% - ?debug("service_edge:forward_to_local(): service_name: ~p", [ServiceName]), - - case service_discovery_rpc:resolve_local_service(CompSpec, ServiceName) of - [ ok, NetworkAddress] -> - forward_message_to_local_service(ServiceName, NetworkAddress, Parameters, CompSpec); - - %% Local service could not be resolved to an URL - [ not_found ] -> - ?info("service_edge_rpc:forward_message_to_local() Not found: ~p", - [ ServiceName ]), - [not_found, -1]; - - Err -> - ?debug("service_edge_rpc:local_msg() Failed at service discovery: ~p", - [ Err ]), - [internal, -1] - end. - - - - handle_rpc("register_service", Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, Address} = rvi_common:get_json_element(["network_address"], Args), + {ok, URL} = rvi_common:get_json_element(["network_address"], Args), [ok, FullSvcName ] = gen_server:call(?SERVER, { rvi, register_local_service, - [ Service, Address]}), + [ Service, URL]}), {ok, [ {status, rvi_common:json_rpc_status(ok) }, { service, FullSvcName }]}; + +handle_rpc("unregister_service", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service]}), + {ok, [ { status, rvi_common:json_rpc_status(ok) }]}; + + handle_rpc("get_available_services", _Args) -> [ Status, Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), ?debug("get_available_services(): ~p ~p", [ Status, Services ]), @@ -344,25 +214,39 @@ handle_rpc("message", Args) -> {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, [ ServiceName, Timeout, Parameters]}), - io:format("MESSGE: ~p, ~p", [ Res, TID]), + {ok, [ { status, rvi_common:json_rpc_status(Res) }, { transaction_id, TID } ]}; - -handle_rpc("unregister_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service]}), - {ok, [ { status, rvi_common:json_rpc_status(ok) }]}; - handle_rpc(Other, _Args) -> ?warning("service_edge_rpc:handle_rpc(~p): unknown command", [ Other ]), {ok,[ { status, rvi_common:json_rpc_status(invalid_command)} ]}. +handle_notification("service_available", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + ?debug("service_edge:service_available(): service: ~p", [ Service]), + ?debug("service_edge:service_available(): data_link: ~p", [ DataLinkModule]), + + gen_server:cast(?SERVER, { rvi, service_available, + [ Service, DataLinkModule ]}), + + ok; +handle_notification("service_unavailable", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + ?debug("service_edge:service_unavailable(): service: ~p", [ Service]), + ?debug("service_edge:service_unavailable(): data_link: ~p", [ DataLinkModule]), + + gen_server:cast(?SERVER, { rvi, service_unavailable, + [ Service, DataLinkModule ]}), + + ok; + handle_notification("handle_remote_message", Args) -> { ok, ServiceName } = rvi_common:get_json_element(["service_name"], Args), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Args), - { ok, NetworkAddress } = rvi_common:get_json_element(["network_address"], Args), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Args), { ok, Certificate } = rvi_common:get_json_element(["certificate"], Args), { ok, Signature } = rvi_common:get_json_element(["signature"], Args), @@ -370,7 +254,6 @@ handle_notification("handle_remote_message", Args) -> [ ServiceName, Timeout, - NetworkAddress, Parameters, Certificate, Signature @@ -381,23 +264,6 @@ handle_notification("handle_remote_message", Args) -> -handle_notification("register_remote_services", Args) -> - {ok, Services} = rvi_common:get_json_element(["services"], Args), - {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args), - - gen_server:cast(?SERVER, { rvi, register_remote_services, - [ Services, LocalServiceAddresses]}), - - ok; - -handle_notification("unregister_remote_services", Args) -> - {ok, Services} = rvi_common:get_json_element(["services"], Args), - {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args), - - gen_server:cast(?SERVER, { rvi, unregister_remote_services, - [ Services, LocalServiceAddresses]}), - ok; - %% JSON-RPC entry point %% Called by local exo http server handle_notification("handle_local_timeout", Args) -> @@ -413,42 +279,6 @@ handle_notification(Other, _Args) -> ok. -%% Websocket iface -wse_register_service(Ws, Service ) -> - ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, Service ]), - gen_server:cast(?SERVER, { rvi, register_local_service, [ Service, "ws:" ++ pid_to_list(Ws)]}), - { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}. - -wse_unregister_service(Ws, Service ) -> - ?debug("service_edge_rpc:wse_unregister_service(~p) service: ~p", [ Ws, Service ]), - gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service ]}), - { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}. - - -wse_get_available_services(_Ws ) -> - ?debug("service_edge_rpc:wse_get_available_services()"), - [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), - { ok, [ { status, rvi_common:json_rpc_status(ok)}, - { services, Services}] }. - - -wse_message(Ws, ServiceName, Timeout, JSONParameters) -> - %% Parameters are delivered as JSON. Decode into tuple - { ok, Parameters } = exo_json:decode_string(JSONParameters), - ?debug("service_edge_rpc:wse_message(~p) ServiceName: ~p", [ Ws, ServiceName ]), - ?debug("service_edge_rpc:wse_message(~p) Timeout: ~p", [ Ws, Timeout]), - ?debug("service_edge_rpc:wse_message(~p) Parameters: ~p", [ Ws, Parameters ]), - - [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, - [ ServiceName, Timeout, Parameters]}), - - { ok, [ { status, rvi_common:json_rpc_status(Res) }, - { transaction_id, TID} ] }. - -%% Deprecated -wse_message(Ws, ServiceName, Timeout, JSONParameters, _CallingService) -> - wse_message(Ws, ServiceName, Timeout, JSONParameters). - @@ -458,28 +288,20 @@ wse_message(Ws, ServiceName, Timeout, JSONParameters, _CallingService) -> %% the only calls invoked by other components, and not the locally %% connected services that uses the same HTTP port to transmit their %% register_service, and message calls. -handle_call({ rvi, register_local_service, [Service, ServiceAddress] }, _From, St) -> +handle_call({ rvi, register_local_service, [Service, URL] }, _From, St) -> ?debug("service_edge_rpc:register_local_service(): service: ~p ", [Service]), - ?debug("service_edge_rpc:register_local_service(): address: ~p ", [ServiceAddress]), + ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]), - [ok, FullSvcName ] = - service_discovery_rpc:register_local_service(St#st.cs, - Service, - ServiceAddress), - - SvcString = rvi_common:local_service_to_string(Service), + FullSvcName = rvi_common:local_service_to_string(Service), - %% Announce the new service to all connected nodes - data_link_bert_rpc_rpc: - announce_available_local_service(St#st.cs, SvcString), - - %% Retrieve addresses of all locally registered services. - [ ok, AnnounceAddresses ] = - service_discovery_rpc:get_local_network_addresses(St#st.cs), - - announce_service_availability(services_available, AnnounceAddresses, - [FullSvcName], ServiceAddress), + ets:insert(?SERVICE_TABLE, #service_entry { + service = FullSvcName, + url = URL }), + %% Register with service discovery, will trigger callback to service_available() + %% that forwards the registration to other connected services. + service_discovery_rpc:register_services(St#st.cs, [FullSvcName], local), + %% Return ok. { reply, [ ok, FullSvcName ], St }; @@ -487,16 +309,12 @@ handle_call({ rvi, register_local_service, [Service, ServiceAddress] }, _From, S handle_call({ rvi, unregister_local_service, [Service] }, _From, St) -> ?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [Service]), - service_discovery_rpc:unregister_local_service(St#st.cs, Service), - data_link_bert_rpc_rpc: - announce_unavailable_local_service(St#st.cs, Service), + ets:delete(?SERVICE_TABLE, Service), - [ ok, AnnounceAddresses ] = service_discovery_rpc: - get_local_network_addresses(St#st.cs), - %% Send out an announcement to all locally connected services, but skip - %% the one that made the registration call - announce_service_availability(services_unavailable, AnnounceAddresses, Service), + %% Register with service discovery, will trigger callback to service_available() + %% that forwards the registration to other connected services. + service_discovery_rpc:unregister_services(St#st.cs, [Service], local), %% Return ok. { reply, [ ok ], St }; @@ -505,8 +323,7 @@ handle_call({ rvi, unregister_local_service, [Service] }, _From, St) -> handle_call({rvi, get_available_services, []}, _From, St) -> ?debug("service_edge_rpc:get_available_services()"), - {reply, service_discovery_rpc:get_all_services(St#st.cs), St}; - + {reply, service_discovery_rpc:get_services(St#st.cs), St}; handle_call({ rvi, handle_local_message, [ServiceName, Timeout, Parameters] }, _From, St) -> @@ -527,11 +344,11 @@ handle_call({ rvi, handle_local_message, %% Check if this is a local service by trying to resolve its service name. %% If successful, just forward it to its service_name. %% - case service_discovery_rpc:resolve_local_service(St#st.cs, ServiceName) of - [ ok, NetworkAddress] -> %% ServiceName is local. Forward message + case ets:lookup(?SERVICE_TABLE, ServiceName) of + [ #service_entry { url = URL } ] -> %% ServiceName is local. Forward message ?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."), - Res = forward_message_to_local_service(ServiceName, - NetworkAddress, + Res = forward_message_to_local_service(URL, + ServiceName, Parameters, St#st.cs), { reply, Res , St}; @@ -554,11 +371,20 @@ handle_call(Other, _From, St) -> { reply, [ invalid_command ], St}. +handle_cast({rvi, service_available, [Service, _DataLinkModule] }, St) -> + announce_service_availability(available, Service), + { noreply, St }; + + +handle_cast({rvi, service_unavailable, [Service, _DataLinkModule] }, St) -> + announce_service_availability(unavailable, Service), + { noreply, St }; + + handle_cast({rvi, handle_remote_message, [ ServiceName, Timeout, - NetworkAddress, Parameters, Certificate, Signature @@ -567,36 +393,33 @@ handle_cast({rvi, handle_remote_message, ?debug("service_edge:remote_msg(): service_name: ~p", [ServiceName]), ?debug("service_edge:remote_msg(): timeout: ~p", [Timeout]), ?debug("service_edge:remote_msg(): parameters: ~p", [Parameters]), - ?debug("service_edge:remote_msg(): parameters: ~p", [NetworkAddress]), ?debug("service_edge:remote_msg(): signature: ~p", [Signature]), ?debug("service_edge:remote_msg(): certificate: ~p", [Certificate]), - case - authorize_rpc:authorize_remote_message(St#st.cs, - ServiceName, - Certificate, - Signature) of - [ ok ] -> - forward_message_to_local_service(ServiceName, Parameters, St#st.cs), - { noreply, St}; - - - %% Authorization failed. - [ Err ] -> - ?warning(" service_edge:remote_msg(): Authorization failed: ~p. DROPPED", [Err]), - {noreply, St} - end; - -handle_cast({ rvi, register_remote_services, - [ Services, LocalServiceAddresses ]}, State) -> - announce_service_availability(services_available, LocalServiceAddresses, Services), - { noreply, State }; - -handle_cast({ rvi, unregister_remote_services, - [Services, LocalServiceAddresses]}, State) -> + %% Check if this is a local message. + case ets:lookup(?SERVICE_TABLE, ServiceName) of + [ #service_entry { url = URL }] -> %% This is a local message + case authorize_rpc:authorize_remote_message(St#st.cs, + ServiceName, + Certificate, + Signature) of + [ ok ] -> + forward_message_to_local_service(URL, ServiceName, + Parameters, St#st.cs), + { noreply, St}; + + [ _ ] -> + ?warning("service_entry:remote_msg(): Failed to authenticate ~p", + [ServiceName]), + { noreply, St} + end; + [] -> + ?notice("service_entry:remote_msg(): Service Disappeared ~p", + [ServiceName]), + { noreply, St} + + end; - announce_service_availability(services_unavailable, LocalServiceAddresses, Services), - { noreply, State }; handle_cast({ rvi, handle_local_timeout, [Service, TransactionID] }, St) -> %% FIXME: Should be forwarded to service. @@ -617,3 +440,170 @@ terminate(_Reason, _St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. + + + +%% Invoked by schedule_rpc. +%% A message originated from a locally connected service +%% has timed out +handle_local_timeout(CompSpec, Service, TransID) -> + rvi_common:notification(service_edge, ?SERVER, handle_local_timeout, + [ { service, Service}, + { transaction_id, TransID} ], + CompSpec). + +%% +%% Handle a message, delivered from a remote node through protocol, that is +%% to be forwarded to a locally connected service. +%% +handle_remote_message(CompSpec, ServiceName, Timeout, NetworkAddress, + Parameters, Signature, Certificate) -> + rvi_common:notification(service_edge, ?SERVER, handle_remote_message, + [ { service, ServiceName }, + { timeout, Timeout }, + { network_address, NetworkAddress }, + { parameters, Parameters }, + { signature, Signature }, + { certificate, Certificate } ], + CompSpec). + + + + + +%% +%% Depending on the format of NetworkAddress +%% Dispatch to websocket or JSON-RPC server +%% FIXME: Should be a pluggable setup where +%% different dispatchers are triggered depending +%% on prefix in NetworkAddress +%% + +flatten_ws_args([{ struct, List} | T], Acc ) when is_list(List) -> + flatten_ws_args( List ++ T, Acc); + + +flatten_ws_args([{ Key, Val}| T], Acc ) -> + NKey = case is_atom(Key) of + true -> atom_to_list(Key); + false -> Key + end, + + NVal = flatten_ws_args(Val), + + flatten_ws_args(T, [ NKey, NVal] ++ Acc); + +flatten_ws_args([], Acc) -> + Acc; + + +flatten_ws_args(Other, []) -> + Other; + +flatten_ws_args(Other, Acc) -> + [ Other | Acc ]. + + +flatten_ws_args(Args) -> + flatten_ws_args(Args, []). + + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, + [{ services, Services}] ) -> + ?info("service_edge:dispatch_to_local_service(service_available, websock): ~p", [ Services]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "services_available", + [ "services", Services ]), + ok; + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, + [{ services, Services}] ) -> + ?info("service_edge:dispatch_to_local_service(service_unavailable, websock): ~p", [ Services]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "services_unavailable", + [ "services", Services ]), + ok; + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, + [{ service_name, SvcName}, { parameters, Args}] ) -> + ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "message", + [ "service_name", SvcName ] ++ flatten_ws_args(Args)), + ok; + +%% Dispatch to regular JSON-RPC over HTTP. +dispatch_to_local_service(URL, Command, Args) -> + CmdStr = atom_to_list(Command), + Res = rvi_common:send_json_request(URL, CmdStr, Args), + ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]), + ?debug("dispatch_to_local_service(): Args: ~p",[ Args]), + ?debug("dispatch_to_local_service(): URL: ~p",[ URL]), + ?debug("dispatch_to_local_service(): Result: ~p",[ Res]), + Res. + + +%% Forward a message to a specific locally connected service. +%% Called by forward_message_to_local_service/2. +%% +forward_message_to_local_service(URL,ServiceName, Parameters, _CompSpec) -> + ?debug("service_edge:forward_to_local(): URL: ~p", [URL]), + ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]), + + %% + %% Strip our node prefix from service_name so that + %% the service receiving the JSON rpc call will have + %% a service_name that is identical to the service name + %% it registered with. + %% + SvcName = string:substr(ServiceName, + length(rvi_common:local_service_prefix())), + + %% Deliver the message to the local service, which can + %% be either a wse websocket, or a regular HTTP JSON-RPC call + case rvi_common:get_request_result( + dispatch_to_local_service(URL, + message, + {struct, [ { service_name, SvcName }, + { parameters, Parameters }]})) of + + %% Request delivered. + %% -1 is transaction ID. + { ok, _Result } -> + [ ok, -1 ]; + + %% status returned was an error code. + { Other, _Result } -> + ?warning("service_edge:forward_to_local(): ~p:~p Failed: ~p.", + [URL, ServiceName, Other]), + [not_found, -1]; + + Other -> + ?warning("service_edge:forward_to_local(): ~p:~p Unknown error: ~p.", + [URL, ServiceName, Other]), + [internal, -1] + end. + + +announce_service_availability(Available, Service) -> + Cmd = case Available of + available -> services_available; + unavailable -> services_unavailable + end, + + ets:foldl( + %% Notify if this is not the originating service. + fun(#service_entry { + service = ServiceEntry, + url = URL }, _Acc) when + ServiceEntry =/= Service -> + + dispatch_to_local_service(URL, Cmd, + {struct, [ { services, + { array, [Service]}}]}), + []; + + %% This is the originating service regsitering itself. Ignore. + (_, _) -> [] + + end, [], ?SERVICE_TABLE). diff --git a/python/rvi_call.py b/python/rvi_call.py index 9c8d1c2..efe54e6 100755 --- a/python/rvi_call.py +++ b/python/rvi_call.py @@ -15,10 +15,11 @@ import sys from rvilib import RVI import threading import time - +import getopt def usage(): - print "Usage:", sys.argv[0], " RVI-node service key=val ..." - print " RVI-node DNS name or IP of host running RVI" + print "Usage:", sys.argv[0], "[-n RVI-node] service key=val ..." + print " RVI-node DNS name or IP of host running RVI. " + print " default: http://localhost:8801" print " service Service to invoke in RVI." print " key=val Named arguments to provide to service." print @@ -32,21 +33,28 @@ def usage(): # # Check that we have the correct arguments # -if len(sys.argv) <3: - usage() +opts, args= getopt.getopt(sys.argv[1:], "n:") + -progname = sys.argv[0] -rvi_node = sys.argv[1] -service = sys.argv[2] -args = [] -i=3 +rvi_node = "http://localhost:8801" +for o, a in opts: + if o == "-n": + rvi_node = a + else: + usage() + +if len(args) < 1: + usage() # Construct a dictionary from the provided paths. -while i < len(sys.argv): - print sys.argv[i] - [k, v] = sys.argv[i].split('=') - args = args + [{ k: v}] - i = i + 1 +i = 0 +service = args[0] +rvi_args = [] +for i in args[1:]: + print i + [k, v] = i.split('=') + rvi_args = rvi_args + [{ k: v}] + @@ -56,17 +64,11 @@ while i < len(sys.argv): # rvi = RVI(rvi_node) - print "RVI Node: ", rvi_node print "Service: ", service -print "args: ", args +print "args: ", rvi_args # # Send the messge. # -rvi.message(service, args) - - - - - +rvi.message(service, rvi_args) diff --git a/python/rvi_get_services.py b/python/rvi_get_services.py index 70d32de..eeadcdc 100755 --- a/python/rvi_get_services.py +++ b/python/rvi_get_services.py @@ -20,7 +20,8 @@ def usage(): print "Return the name of all available services that can be reached" print "through an RVI node." print - print "Usage:", sys.argv[0], " RVI-node" + print "Usage:", sys.argv[0], " [RVI-node]" + print "Default RVI node is http://127.0.0.1:8801" print print "Example: ./callrvi.py http://rvi1.nginfotpdx.net:8801" print @@ -30,12 +31,15 @@ def usage(): # # Check that we have the correct arguments # -if len(sys.argv) <2: +if len(sys.argv) != 1 and len(sys.argv) != 2: usage() progname = sys.argv[0] -rvi_node = sys.argv[1] +if len(sys.argv) == 2: + rvi_node = sys.argv[1] +else: + rvi_node = "http://localhost:8801" # # Setup an outbound JSON-RPC connection to the backend RVI node diff --git a/python/rvi_service.py b/python/rvi_service.py index e1b705c..2c3271f 100755 --- a/python/rvi_service.py +++ b/python/rvi_service.py @@ -13,10 +13,12 @@ # import sys from rvilib import RVI +import getopt def usage(): - print "Usage:", sys.argv[0], "<rvi_url> <service_name>" - print " <rvi_url> URL of Service Edge on a local RVI node" + print "Usage:", sys.argv[0], "[-n <rvi_url>] <service_name>" + print " <rvi_url> URL of Service Edge on a local RVI node." + print " Default: http://localhost:8801" print " <service_name> URL of Service to register" print print "The RVI Service Edge URL can be found in" @@ -26,7 +28,7 @@ def usage(): print "The Service Edge URL is also logged as a notice when the" print "RVI node is started." print - print "Example: ./rvi_service.py http://rvi1.nginfotpdx.net:8801 /test/some_service" + print "Example: ./rvi_service.py /test/some_service http://rvi1.nginfotpdx.net:8801" sys.exit(255) @@ -68,11 +70,23 @@ def services_unavailable(**args): return ['ok'] -if len(sys.argv) != 3: +# +# Check that we have the correct arguments +# +opts, args= getopt.getopt(sys.argv[1:], "n:") + +rvi_node_url = "http://localhost:8801" +for o, a in opts: + if o == "-n": + rvi_node_url = a + else: + usage() + +if len(args) != 1: usage() -# Grab the URL to use -[ progname, rvi_node_url, service_name ] = sys.argv +service_name = args[0] + # Setup a connection to the local RVI node rvi = RVI(rvi_node_url) diff --git a/python/rvilib.py b/python/rvilib.py index f676298..647e333 100644 --- a/python/rvilib.py +++ b/python/rvilib.py @@ -73,6 +73,10 @@ class RVI(SimpleJSONRPCServer): # in an RVI network to access your service. # def register_service(self, service_name, function): + # Add a prefixing slash if necessary + if service_name[0] != '/': + service_name = '/' + service_name + # Register service_name within SimpleJSONRPCServer so that # when it gets invoked with the given URL suffic, it will call 'function'. # diff --git a/rebar.config b/rebar.config index ccdb50f..a67e822 100644 --- a/rebar.config +++ b/rebar.config @@ -8,8 +8,8 @@ {sub_dirs, ["rel", "components/rvi_common", "components/authorize", - "components/data_link_bert_rpc", - "components/protocol", + "components/dlink_tcp", + "components/proto_bert", "components/schedule", "components/service_discovery/", "components/service_edge" diff --git a/rvi_sample.config b/rvi_sample.config index b5738d3..9815697 100644 --- a/rvi_sample.config +++ b/rvi_sample.config @@ -47,8 +47,8 @@ service_discovery, authorize, schedule, - data_link_bert_rpc, - protocol ]}, + dlink_tcp, + proto_bert ]}, %% %% Custom environment settings @@ -150,7 +150,7 @@ %% node_address should be set to "0.0.0.0:0" to inform %% the remote node that it should not attempt to %% connect back to self. - { node_address, "127.0.0.1:8817" }, + { node_address, "127.0.0.1:8807" }, %% Specify the prefix of all services that this rvi node is hosting. %% @@ -189,11 +189,10 @@ { static_nodes, [ %% rvi1.nginfotpdx.net is the JLR hosted RVI server. - { "jlr.com/backend/", "38.129.64.13:8807" } + %% { "jlr.com/backend/", } ] }, - { routing_rules, [ %% Service name prefix that rules are specified for @@ -201,7 +200,7 @@ %% Which protocol and data link pair to use when transmitting the message %% to the targeted service. If a pair reports a failure, the next pair is tried. [ - { bert_rpc, wifi }, + { proto_bert_rpc, { dlink_tcp_rpc, [ { target, "38.129.64.13:8807" } ]}}, { bert_rpc, device_3g }, { bert_rpc, device_sms }, { joynr, wifi }, @@ -211,15 +210,17 @@ %% Used to communicate with vehicles { "jlr.com/vin/", - { bert_rpc, wifi }, - %% server_3g is augmented with hinting, provided to - { bert_rpc, { server_3g, [ initiate_outbound ]} }, - - %% Protocols can have hinting as well. - %% In this case bert_rpc should only be used if the - %% resulting message size can fit in an SMS (140 bytes). + [ + { proto_bert_rpc, { dlink_tcp_rpc, [ broadcast, { interface, "wlan0" } ] } }, + %% server_3g is augmented with hinting, provided to + { bert_rpc, { server_3g, [ initiate_outbound ]} }, + + %% Protocols can have hinting as well. + %% In this case bert_rpc should only be used if the + %% resulting message size can fit in an SMS (140 bytes). - { { bert_rpc, [ { max_msg_size, 140 } ] } , server_sms } + { { bert_rpc, [ { max_msg_size, 140 } ] } , server_sms } + ] } ] }, @@ -269,19 +270,19 @@ [ %% JSON-RPC address will be translated to %% an URL looking like this: - %% http://127.0.0.1:8811 + %% http://127.0.0.1:8801 %% %% This URL is used both for communication with %% locally connected services and for intra-component %% communication in case the access method for %% service_edge_rpc is specified as json_rpc. - { json_rpc_address, { "127.0.0.1", 8811 } }, + { json_rpc_address, { "127.0.0.1", 8801 } }, %% Websocket is used for websocket access, preferably %% through the rvi.js package available for Javascript %% apps in browsers and crosswalk who wants to interface %% RVI. - { websocket, [ { port, 8818}]} + { websocket, [ { port, 8808}]} ] } ] @@ -289,7 +290,7 @@ { service_discovery, [ { service_discovery_rpc, gen_server, [ - { json_rpc_address, { "127.0.0.1", 8812 }} + { json_rpc_address, { "127.0.0.1", 8802 }} ] } ] @@ -297,7 +298,7 @@ { schedule, [ { schedule_rpc, json_rpc, [ - { json_rpc_address, { "127.0.0.1", 8813 }} + { json_rpc_address, { "127.0.0.1", 8803 }} ] } ] @@ -305,29 +306,30 @@ { authorize, [ { authorize_rpc, gen_server, [ - { json_rpc_address, { "127.0.0.1", 8814 } } + { json_rpc_address, { "127.0.0.1", 8804 } } ] } ] }, { protocol, - [ { protocol_rpc, gen_server, + [ { proto_bert_rpc, gen_server, [ - { json_rpc_address, { "127.0.0.1", 8815 } } + { json_rpc_address, { "127.0.0.1", 8805 } } ] } ] }, { data_link, - [ { data_link_bert_rpc_rpc, gen_server, + [ { dlink_tcp_rpc, gen_server, [ - { json_rpc_address, { "127.0.0.1", 8816 } }, + { json_rpc_address, { "127.0.0.1", 8806 } }, %% Bert_rpc server specifies the port we should %% listen to for incoming connections %% from other rvi nodes. %% A specific NIC address can also be specified %% through the {ip, "192.168.0.1" } tuple. - { bert_rpc_server, [ { port, 8817 }]} + { bert_rpc_server, [ { port, 8807 }]}, + { persistent_connections, [ "38.129.64.13:8807" ]} ] } ] diff --git a/src/rvi.app.src b/src/rvi.app.src index 319d1e5..b79fe80 100644 --- a/src/rvi.app.src +++ b/src/rvi.app.src @@ -22,8 +22,8 @@ service_edge, service_discovery, authorize, - data_link_bert_rpc, - protocol + dlink_tcp, + proto_bert ]}, {mod, { rvi_app, []}} ]}. |