summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--components/dlink_tcp/priv/setup.config42
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl389
-rw-r--r--components/dlink_tcp/src/listener.erl2
-rw-r--r--components/proto_bert/src/proto_bert.app.src2
-rw-r--r--components/proto_bert/src/proto_bert_app.erl6
-rw-r--r--components/proto_bert/src/proto_bert_rpc.erl30
-rw-r--r--components/proto_bert/src/proto_bert_sup.erl2
-rw-r--r--components/rvi_common/src/rvi_common.erl63
-rw-r--r--components/schedule/src/rvi_routing.erl17
-rw-r--r--components/schedule/src/rvi_schedule.erl1
-rw-r--r--components/schedule/src/schedule_rpc.erl623
-rw-r--r--components/schedule/src/schedule_sup.erl4
-rw-r--r--components/service_discovery/curl_scripts/register_service.sh31
-rw-r--r--components/service_discovery/curl_scripts/resolve_service.sh26
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl775
-rw-r--r--components/service_edge/src/service_edge_rpc.erl628
-rwxr-xr-xpython/rvi_call.py48
-rwxr-xr-xpython/rvi_get_services.py10
-rwxr-xr-xpython/rvi_service.py26
-rw-r--r--python/rvilib.py4
-rw-r--r--rebar.config4
-rw-r--r--rvi_sample.config52
-rw-r--r--src/rvi.app.src4
23 files changed, 1128 insertions, 1661 deletions
diff --git a/components/dlink_tcp/priv/setup.config b/components/dlink_tcp/priv/setup.config
deleted file mode 100644
index 9fe04fc..0000000
--- a/components/dlink_tcp/priv/setup.config
+++ /dev/null
@@ -1,42 +0,0 @@
-%% -*- erlang -*-
-[
- %% Put include first, making it possible to override any defaults below
- {include_lib, "exoport/priv/setup.config"},
- %%
- %% Add our own app(s)
- {add_apps, [asn1,
- ssl,
- { rvi_common, load},
- data_link_device]},
- %%
- %% Custom environment settings
- {env,
- [
- {setup, [{data_dir, "db"}]},
- %% Tell exoport where to find our config file
- {exoport,
- [
- {config, filename:join(CWD, "exoport.config")},
- {access,
- [{redirect, [{data_link, data_link_device_rpc}]},
- {accept, data_link_device_rpc}
- ]},
- {kvdb_databases,
- [{kvdb_conf,
- [{file,"$DATA_DIR/kvdb_conf.db"},
- {backend,ets},
- {log_dir, "$DATA_DIR/kvdb_conf.log"},
- {log_threshold, [{writes, 1000}]},
- {save_mode, [on_switch, on_close]},
- {tables,[data]},
- {encoding,{raw,term,term}},
- {schema,kvdb_schema_events}]}
- ]}
- ]}
- %% %% We run with a logging ETS backend in the database
- %% {kvdb,
- %% [
- %% ]}
- %% ]}
- ]}
-].
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 3cb27a5..9ff68f0 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -22,9 +22,12 @@
-export([start_json_server/0]).
-export([start_connection_manager/0]).
--export([announce_available_local_service/2,
- announce_unavailable_local_service/2,
- setup_data_link/2,
+%% Invoked by service discovery
+%% FIXME: Should be rvi_service_discovery behavior
+-export([service_available/4,
+ service_unavailable/4]).
+
+-export([setup_data_link/3,
disconnect_data_link/2,
send_data/3]).
@@ -32,11 +35,21 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
+-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BERT_RPC_PORT, 9999).
-define(DEFAULT_RECONNECT_INTERVAL, 5000).
-define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
+
+-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
+-define(SERVICE_TABLE, rvi_dlink_tcp_services).
+
+-record(service_entry, {
+ service = [], %% Name of service
+ connection = undefined %% PID of connection that can reach this service
+ }).
+
-record(st, {
cs = #component_spec{}
}).
@@ -46,9 +59,15 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- ?info("data_link_bert:init(): Called"),
+ ?info("dlink_tcp:init(): Called"),
%% Dig out the bert rpc server setup
+ ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table,
+ { keypos, #service_entry.service }]),
+
+ ets:new(?CONNECTION_TABLE, [ duplicate_bag, public, named_table,
+ { keypos, #service_entry.connection }]),
+
{ok, #st {
cs = rvi_common:get_component_specification()
}
@@ -68,12 +87,12 @@ start_connection_manager() ->
IP = proplists:get_value(ip, BertOpts, ?DEFAULT_BERT_RPC_ADDRESS),
Port = proplists:get_value(port, BertOpts, ?DEFAULT_BERT_RPC_PORT),
- ?info("data_link_bert:init_rvi_component(~p): Starting listener.", [self()]),
+ ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]),
%% Fire up listener
connection_manager:start_link(),
{ok,Pid} = listener:start_link(),
- ?info("data_link_bert:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
+ ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
%% Add listener port.
case listener:add_listener(Pid, IP, Port, CompSpec) of
@@ -82,39 +101,54 @@ start_connection_manager() ->
[ application:get_env(rvi, node_address, undefined)]);
Err ->
- ?error("data_link_bert:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
+ ?error("dlink_tcp:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
ok
end,
- ?info("data_link_bert:init_rvi_component(): Setting up static nodes."),
- setup_static_node_data_links_(rvi_common:static_nodes(), CompSpec),
+ ?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."),
+
+ {ok, PersistentConnections } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ ?PERSISTENT_CONNECTIONS,
+ [],
+ CompSpec),
+
+
+ setup_persistent_connections_(PersistentConnections, CompSpec),
ok.
-setup_static_node_data_links_([ ], _CompSpec) ->
- ok;
+setup_persistent_connections_([ ], _CompSpec) ->
+ ok;
-setup_static_node_data_links_([ { Prefix, NetworkAddress} | T], CompSpec) ->
- ?debug("~p: Will connect static node ~p -> ~p", [self(), Prefix, NetworkAddress]),
+setup_persistent_connections_([ NetworkAddress | T], CompSpec) ->
+ ?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]),
[ IP, Port] = string:tokens(NetworkAddress, ":"),
- connect_and_retry_remote(Prefix, IP, Port, CompSpec),
- setup_static_node_data_links_(T, CompSpec),
+ connect_and_retry_remote(IP, Port, CompSpec),
+ setup_persistent_connections_(T, CompSpec),
ok.
-%% Behavior implementation
-announce_available_local_service(CompSpec, Service) ->
- rvi_common:notification(data_link, ?MODULE, announce_available_local_service,
- [ {service, Service }], CompSpec).
+service_available(CompSpec, SvcName, DataLinkModule, Address) ->
+ rvi_common:notification(data_link, ?MODULE,
+ service_available,
+ [{ service, SvcName },
+ { data_link_module, DataLinkModule },
+ { address, Address }],
+ CompSpec).
+service_unavailable(CompSpec, SvcName, DataLinkModule, Address) ->
+ rvi_common:notification(data_link, ?MODULE,
+ service_unavailable,
+ [{ service, SvcName },
+ { data_link_module, DataLinkModule },
+ { address, Address }],
+ CompSpec).
-announce_unavailable_local_service(CompSpec, Service) ->
- rvi_common:notification(data_link, ?MODULE, announce_unavailable_local_service,
- [ {service, Service }], CompSpec).
-
-setup_data_link(CompSpec, NetworkAddress) ->
+setup_data_link(CompSpec, Service, Opts) ->
rvi_common:request(data_link, ?MODULE, setup_data_link,
- [ { network_address, NetworkAddress }],
- [status], CompSpec).
+ [ { service, Service },
+ { opts, Opts }],
+ [status, timeout], CompSpec).
disconnect_data_link(CompSpec, NetworkAddress) ->
rvi_common:request(data_link, ?MODULE, disconnect_data_link,
@@ -122,12 +156,13 @@ disconnect_data_link(CompSpec, NetworkAddress) ->
[status], CompSpec).
-send_data(CompSpec, NetworkAddress, Data) ->
+send_data(CompSpec, Service, Data) ->
rvi_common:request(data_link, ?MODULE, send_data,
- [ { network_address, NetworkAddress },
+ [ { service, Service },
{ data, Data } ],
[status], CompSpec).
+
%% End of behavior
%%
@@ -140,12 +175,12 @@ connect_remote(IP, Port, CompSpec) ->
not_found ->
%% Setup a new outbound connection
- ?info("data_link_bert:connect_remote(): Connecting ~p:~p",
+ ?info("dlink_tcp:connect_remote(): Connecting ~p:~p",
[IP, Port]),
case gen_tcp:connect(IP, Port, [binary, {packet, 4}]) of
{ ok, Sock } ->
- ?info("data_link_bert:connect_remote(): Connected ~p:~p",
+ ?info("dlink_tcp:connect_remote(): Connected ~p:~p",
[IP, Port]),
%% Setup a genserver around the new connection.
@@ -161,36 +196,35 @@ connect_remote(IP, Port, CompSpec) ->
ok;
{error, Err } ->
- ?info("data_link_bert:connect_remote(): Failed ~p:~p: ~p",
+ ?info("dlink_tcp:connect_remote(): Failed ~p:~p: ~p",
[IP, Port, Err]),
not_available
end
end.
-connect_and_retry_remote(Prefix, IP, Port, CompSpec) ->
- ?info("data_link_bert:setup_static(): Connecting ~p -> ~p:~p",
- [Prefix, IP, Port]),
+connect_and_retry_remote( IP, Port, CompSpec) ->
+ ?info("dlink_tcp:connect_and_retry_remote(): ~p:~p",
+ [ IP, Port]),
case connect_remote(IP, list_to_integer(Port), CompSpec) of
ok -> ok;
Err -> %% Failed to connect. Sleep and try again
- ?notice("data_link_bert:setup_static_node_data_link(~p:~p): Failed: ~p",
+ ?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Failed: ~p",
[IP, Port, Err]),
- ?notice("data_link_bert:setup_static_node_data_link(~p:~p): Will try again in ~p sec",
+ ?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
[IP, Port, ?DEFAULT_RECONNECT_INTERVAL]),
- setup_static_node_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL,
- Prefix, IP, Port, CompSpec),
+ setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec),
not_available
end.
announce_local_service_(CompSpec, Service, Availability) ->
- ?debug("data_link_bert:announce_local_service(~p): Service: ~p", [Availability, Service]),
+ ?debug("dlink_tcp:announce_local_service(~p): Service: ~p", [Availability, Service]),
%% Grab our local address.
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
@@ -205,7 +239,7 @@ announce_local_service_(CompSpec, Service, Availability) ->
%% Loop over all returned addresses
lists:map(
fun(Address) ->
- ?info("data_link_bert:announce_local_service(~p): Announcing ~p to ~p",
+ ?info("dlink_tcp:announce_local_service(~p): Announcing ~p to ~p",
[ Availability, Service, Address]),
%% Split the address into host and port
@@ -216,13 +250,13 @@ announce_local_service_(CompSpec, Service, Availability) ->
Res = connection:send(RemoteAddress, list_to_integer(RemotePort),
{service_announce, 3, Availability,
[Service], { signature, {}}}),
- ?debug("data_link_bert:announce_local_service(~p): Res ~p",
+ ?debug("dlink_tcp:announce_local_service(~p): Res ~p",
[ Availability, Res])
end, Addresses),
ok.
handle_socket(_FromPid, PeerIP, PeerPort, data, ping, [_CompSpec]) ->
- ?info("data_link_bert:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]),
+ ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]),
ok;
handle_socket(FromPid, PeerIP, PeerPort, data,
@@ -234,12 +268,12 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
Certificate,
Signature}, [CompSpec]) ->
- ?info("data_link_bert:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
- ?info("data_link_bert:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
- ?info("data_link_bert:authorize(): Protocol: ~p", [ Protocol ]),
- ?debug("data_link_bert:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("data_link_bert:authorize(): Certificate: ~p", [ Certificate ]),
- ?debug("data_link_bert:authorize(): Signature: ~p", [ Signature ]),
+ ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
+ ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
+ ?info("dlink_tcp:authorize(): Protocol: ~p", [ Protocol ]),
+ ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
+ ?debug("dlink_tcp:authorize(): Certificate: ~p", [ Certificate ]),
+ ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]),
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
@@ -252,7 +286,7 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
- ?info("data_link_bert:authorize(): Remote is behind firewall. Will use ~p:~p",
+ ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
[ PeerIP, PeerPort]),
{ PeerIP, PeerPort };
@@ -267,14 +301,14 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
%% FIXME: Validate certificate and signature before continuing.
case connection_manager:find_connection_by_pid(FromPid) of
not_found ->
- ?info("data_link_bert:authorize(): New connection!"),
+ ?info("dlink_tcp:authorize(): New connection!"),
connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid),
- ?debug("data_link_bert:authorize(): Sending authorize."),
+ ?debug("dlink_tcp:authorize(): Sending authorize."),
Res = connection:send(FromPid,
{ authorize,
1, LocalAddress, LocalPort, rvi_binary,
{certificate, {}}, { signature, {}}}),
- ?debug("data_link_bert:authorize(): Sending authorize: ~p", [ Res]),
+ ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]),
ok;
_ -> ok
end,
@@ -299,7 +333,7 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
%% Send an authorize back to the remote node
- ?info("data_link_bert:authorize(): Announcing local services: ~p to remote ~p:~p",
+ ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
[LocalServices, NRemoteAddress, NRemotePort]),
connection:send(FromPid,
@@ -310,145 +344,216 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }),
ok;
-handle_socket(_FromPid, RemoteIP, RemotePort, data,
+handle_socket(FromPid, RemoteIP, RemotePort, data,
{ service_announce,
- TransactionID,
+ TransactionID,
available,
- Services,
- Signature}, [CompSpec]) ->
- ?debug("data_link_bert:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]),
- ?debug("data_link_bert:service_announce(available): Remote Port: ~p", [ RemotePort ]),
- ?debug("data_link_bert:service_announce(available): TransactionID: ~p", [ TransactionID ]),
- ?debug("data_link_bert:service_announce(available): Signature: ~p", [ Signature ]),
- ?debug("data_link_bert:service_announce(available): Service: ~p", [ Services ]),
-
+ Services,
+ Signature }, [CompSpec]) ->
+ ?debug("dlink_tcp:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]),
+ ?debug("dlink_tcp:service_announce(available): Remote Port: ~p", [ RemotePort ]),
+ ?debug("dlink_tcp:service_announce(available): TransactionID: ~p", [ TransactionID ]),
+ ?debug("dlink_tcp:service_announce(available): Signature: ~p", [ Signature ]),
+ ?debug("dlink_tcp:service_announce(available): Service: ~p", [ Services ]),
- %% Register the received services with all relevant components
- NetworkAddress = RemoteIP ++ ":" ++ integer_to_list(RemotePort),
- service_discovery_rpc:register_remote_services(CompSpec, Services, NetworkAddress),
+ %% Insert into our own tables
+ [ ets:insert(?SERVICE_TABLE,
+ #service_entry {
+ service = SvcName,
+ connection = FromPid }) || SvcName <- Services ],
+
+ [ ets:insert(?CONNECTION_TABLE,
+ #service_entry {
+ service = SvcName,
+ connection = FromPid }) || SvcName <- Services ],
+
+ service_discovery_rpc:register_services(CompSpec, Services, ?MODULE),
ok;
-handle_socket(_FromPid, RemoteIP, RemotePort, data,
+handle_socket(FromPid, RemoteIP, RemotePort, data,
{ service_announce,
TransactionID,
unavailable,
Services,
Signature}, [CompSpec]) ->
- ?debug("data_link_bert:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]),
- ?debug("data_link_bert:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]),
- ?debug("data_link_bert:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]),
- ?debug("data_link_bert:service_announce(unavailable): Signature: ~p", [ Signature ]),
- ?debug("data_link_bert:service_announce(unavailable): Service: ~p", [ Services ]),
+ ?debug("dlink_tcp:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]),
+ ?debug("dlink_tcp:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]),
+ ?debug("dlink_tcp:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]),
+ ?debug("dlink_tcp:service_announce(unavailable): Signature: ~p", [ Signature ]),
+ ?debug("dlink_tcp:service_announce(unavailable): Service: ~p", [ Services ]),
%% Register the received services with all relevant components
- service_discovery_rpc:unregister_remote_services_by_name(CompSpec, Services),
+
+ %% Delete from our own tables.
+
+ [ ets:delete(?SERVICE_TABLE, SvcName ) || SvcName <- Services ],
+
+ [ ets:match_delete(?CONNECTION_TABLE,
+ #service_entry {
+ service = SvcName,
+ connection = FromPid }) || SvcName <- Services ],
+
+ service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE),
ok;
handle_socket(_FromPid, SetupIP, SetupPort, data,
{ receive_data, Data}, [CompSpec]) ->
-%% ?info("data_link_bert:receive_data(): ~p", [ Data ]),
- ?debug("data_link_bert:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+%% ?info("dlink_tcp:receive_data(): ~p", [ Data ]),
+ ?debug("dlink_tcp:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
protocol_rpc:receive_message(CompSpec, Data),
ok;
handle_socket(_FromPid, SetupIP, SetupPort, data, Data, [_CompSpec]) ->
- ?warning("data_link_bert:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
- ?warning("data_link_bert:unknown_data(): Unknown data: ~p", [ Data]),
+ ?warning("dlink_tcp:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?warning("dlink_tcp:unknown_data(): Unknown data: ~p", [ Data]),
ok.
+
%% We lost the socket connection.
%% Unregister all services that were routed to the remote end that just died.
-handle_socket(_FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
- ?info("data_link_bert:socket_closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+ ?info("dlink_tcp:socket_closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
- service_discovery_rpc:
- unregister_remote_services_by_address(CompSpec, NetworkAddress),
- %% Check if this is a static node. If so, setup a timer for a reconnect
- case lists:keyfind(NetworkAddress, 2, rvi_common:static_nodes()) of
- false ->
- true;
+ %% Get all service records associated with the given connection and
+ %% extract the service name from them
+ %
- { StaticPrefix, StaticNetworkAddress } ->
- ?info("data_link_bert:socket_closed(): Reconnect service: ~p", [ StaticPrefix ]),
- ?info("data_link_bert:socket_closed(): Reconnect address: ~p", [ StaticNetworkAddress ]),
- ?info("data_link_bert:socket_closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]),
- [ IP, Port] = string:tokens(StaticNetworkAddress, ":"),
- setup_static_node_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL,
- StaticPrefix,
- IP, Port, CompSpec)
-
+ Services = [ SvcName || #service_entry { service = SvcName } <-
+ ets:lookup(?CONNECTION_TABLE, FromPid) ],
+ ?debug("dlink_tcp:close(): Deleting ~p", [ Services]),
+
+ %% Step through all services and delete their corresponding record
+ %% from the service table.
+ %% We do this instead of match_delete because it is much faster since
+ %% service is the key in ?SERVICE_TABLE. No linear search and delete
+ %% needed.
+ [ ets:delete(?SERVICE_TABLE, SvcName) || SvcName <- Services ],
+
+ %% Delete all entries in the connection table that matches the closed
+ %% connection.
+ ets:delete(?CONNECTION_TABLE, FromPid),
+
+
+ {ok, PersistentConnections } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ persistent_connections,
+ [],
+ CompSpec),
+ %% Check if this is a static node. If so, setup a timer for a reconnect
+ case lists:member(NetworkAddress, PersistentConnections) of
+ true ->
+ ?info("dlink_tcp:socket_closed(): Reconnect address: ~p", [ NetworkAddress ]),
+ ?info("dlink_tcp:socket_closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]),
+ [ IP, Port] = string:tokens(NetworkAddress, ":"),
+
+ setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL,
+ IP, Port, CompSpec);
+ false -> ok
end,
ok;
handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
- ?info("data_link_bert:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?info("dlink_tcp:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
ok.
%% JSON-RPC entry point
%% CAlled by local exo http server
-handle_notification("announce_available_local_service", Args) ->
- { ok, Service } = rvi_common:get_json_element(["service"], Args),
- gen_server:cast(?SERVER, { rvi, announce_available_local_service, [Service]}),
+handle_notification("service_available", Args) ->
+ {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+ {ok, Address} = rvi_common:get_json_element(["address"], Args),
+
+ gen_server:cast(?SERVER, { rvi, service_available,
+ [ SvcName,
+ DataLinkModule,
+ Address ]}),
+
ok;
+handle_notification("service_unavailable", Args) ->
+ {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+ {ok, Address} = rvi_common:get_json_element(["address"], Args),
-handle_notification("announce_unavailable_local_service", Args) ->
- { ok, Service } = rvi_common:get_json_element(["service"], Args),
+ gen_server:cast(?SERVER, { rvi, service_unavailable,
+ [ SvcName,
+ DataLinkModule,
+ Address ]}),
- gen_server:cast(?SERVER, { rvi, announce_unavailable_local_service, [Service]}),
ok;
handle_notification(Other, _Args) ->
- ?info("data_link_bert:handle_notification(~p): unknown", [ Other ]),
+ ?info("dlink_tcp:handle_notification(~p): unknown", [ Other ]),
ok.
handle_rpc("setup_data_link", Args) ->
- { ok, Address } = rvi_common:get_json_element(["network_address"], Args),
- Res = gen_server:call(?SERVER, { rvi, setup_data_link,
- [ Address]}),
- {ok, [ {status, rvi_common:json_rpc_status(Res)} ]};
+ { ok, Service } = rvi_common:get_json_element(["service"], Args),
+
+ { ok, Opts } = rvi_common:get_json_element(["opts"], Args),
+
+ [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link,
+ [ Service, Opts ] }),
+
+ {ok, [ {status, rvi_common:json_rpc_status(Res)} , { timeout, Timeout }]};
handle_rpc("disconenct_data_link", Args) ->
{ ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
- Res = gen_server:call(?SERVER, { rvi, disconnect_data_link, [NetworkAddress]}),
+ [Res] = gen_server:call(?SERVER, { rvi, disconnect_data_link, [NetworkAddress]}),
{ok, [ {status, rvi_common:json_rpc_status(Res)} ]};
handle_rpc("send_data", Args) ->
- {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
- { ok, Data} = rvi_common:get_json_element(["data"], Args),
- [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [NetworkAddress, Data]}),
+ { ok, Service } = rvi_common:get_json_element(["service"], Args),
+ { ok, Data } = rvi_common:get_json_element(["data"], Args),
+ [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [Service, Data]}),
{ok, [ {status, rvi_common:json_rpc_status(Res)} ]};
-
+
handle_rpc(Other, _Args) ->
- ?info("data_link_bert:handle_rpc(~p): unknown", [ Other ]),
+ ?info("dlink_tcp:handle_rpc(~p): unknown", [ Other ]),
{ ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
-handle_cast({rvi, announce_available_local_service, [Service]}, St) ->
- announce_local_service_(St#st.cs, Service, available),
+handle_cast( {rvi, service_available, [SvcName, local, _Address]}, St) ->
+ announce_local_service_(St#st.cs, SvcName, available),
{noreply, St};
-handle_cast({rvi, announce_unavailable_local_service, [Service]}, St) ->
- announce_local_service_(St#st.cs, Service, unavailable),
+
+handle_cast( {rvi, service_unavailable, [SvcName, local, _Address]}, St) ->
+ announce_local_service_(St#st.cs, SvcName, unavailable),
{noreply, St};
handle_cast(Other, St) ->
- ?warning("data_link_bert:handle_cast(~p): unknown", [ Other ]),
+ ?warning("dlink_tcp:handle_cast(~p): unknown", [ Other ]),
{noreply, St}.
-handle_call({rvi, setup_data_link, [ NetworkAddress ]}, _From, St) ->
- [ RemoteAddress, RemotePort] = string:tokens(NetworkAddress, ":"),
- Res = connect_remote(RemoteAddress, list_to_integer(RemotePort), St#st.cs),
- { reply, [Res], St };
+
+handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
+ case proplists:get_value(target, Opts, undefined) of
+ undefined ->
+ ?info("dlink_tcp:setup_data_link(~p) Failed: no target given in options.",
+ [Service]),
+ { reply, [ no_route, 0 ], St };
+
+ Addr ->
+ [ Address, Port] = string:tokens(Addr, ":"),
+
+ case connect_remote(Address, list_to_integer(Port), St#st.cs) of
+ ok ->
+ { reply, [ok, 2000], St }; %% 2 second timeout
+
+ Err ->
+ { reply, [Err, 0], St }
+ end
+ end;
handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
@@ -457,23 +562,31 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
{ reply, [ Res ], St };
-handle_call({rvi, send_data, [NetworkAddress, Data]}, _From, St) ->
- [ RemoteAddress, RemotePortStr] = string:tokens(NetworkAddress, ":"),
- RemotePort = list_to_integer(RemotePortStr),
- ?info("data_link_bert:send_data(): Remote: ~p:~p", [ RemoteAddress, RemotePort]),
- Res = connection:send(RemoteAddress, RemotePort, {receive_data, Data}),
- { reply, [ Res ], St};
+handle_call({rvi, send_data, [Service, Data]}, _From, St) ->
+
+ %% Resolve connection pid from service
+ case ets:lookup(?SERVICE_TABLE, Service) of
+ [ #service_entry { connection = ConnPid } ] ->
+ ?debug("dlink_tcp:send_data(): ~p -> ~p", [ Service, ConnPid]),
+ Res = connection:send(ConnPid, {receive_data, Data}),
+ { reply, [ Res ], St};
+
+ [] -> %% Service disappeared during send.
+ { reply, [ no_route ], St}
+ end;
+
+
handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) ->
%% Create a timer to handle periodic pings.
{ok, ServerOpts } = rvi_common:get_module_config(data_link,
- dlink_tcp,
+ ?MODULE,
bert_rpc_server, [],
St#st.cs),
Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
- ?info("data_link_bert:setup_ping(): ~p:~p will be pinged every ~p msec",
+ ?info("dlink_tcp:setup_ping(): ~p:~p will be pinged every ~p msec",
[ Address, Port, Timeout] ),
erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }),
@@ -481,7 +594,7 @@ handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) ->
{reply, ok, St};
handle_call(Other, _From, St) ->
- ?warning("data_link_bert:handle_rpc(~p): unknown", [ Other ]),
+ ?warning("dlink_tcp:handle_rpc(~p): unknown", [ Other ]),
{ reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, St}.
@@ -492,7 +605,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
%% Check that connection is up
case connection:is_connection_up(Pid) of
true ->
- ?info("data_link_bert:ping(): Pinging: ~p:~p", [Address, Port]),
+ ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]),
connection:send(Pid, ping),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Port, Timeout });
@@ -503,13 +616,13 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
{noreply, St};
%% Setup static nodes
-handle_info({ rvi_setup_static_node_data_link, Prefix, IP, Port, CompSpec }, St) ->
- connect_and_retry_remote(Prefix, IP, Port, CompSpec),
+handle_info({ rvi_setup_persitent_connection, IP, Port, CompSpec }, St) ->
+ connect_and_retry_remote(IP, Port, CompSpec),
{ noreply, St };
handle_info(Info, St) ->
- ?notice("data_link_bert(): Unkown message: ~p", [ Info]),
+ ?notice("dlink_tcp(): Unkown message: ~p", [ Info]),
{noreply, St}.
terminate(_Reason, _St) ->
@@ -517,9 +630,11 @@ terminate(_Reason, _St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
-setup_static_node_reconnect_timer(MSec, Prefix, IP, Port, CompSpec) ->
+setup_reconnect_timer(MSec, IP, Port, CompSpec) ->
erlang:send_after(MSec, ?MODULE,
- { rvi_setup_static_node_data_link,
- Prefix, IP, Port, CompSpec }),
+ { rvi_setup_persitent_connection,
+ IP, Port, CompSpec }),
ok.
+
+
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index eadec8b..4ec5038 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -74,7 +74,7 @@ new_connection(IP, Port, Sock, State) ->
%% first data.
%% Provide component spec as extra arg.
{ok, _P} = connection:setup(undefined, 0, Sock,
- data_link_bert_rpc_rpc,
+ dlink_tcp_rpc,
handle_socket, [gen_nb_server:get_cb_state(State)]),
{ok, State}.
diff --git a/components/proto_bert/src/proto_bert.app.src b/components/proto_bert/src/proto_bert.app.src
index d3ab76f..0fec506 100644
--- a/components/proto_bert/src/proto_bert.app.src
+++ b/components/proto_bert/src/proto_bert.app.src
@@ -8,7 +8,7 @@
%% -*- erlang -*-
-{application, proto_bert_rpc,
+{application, proto_bert,
[
{description, ""},
{vsn, "0.1"},
diff --git a/components/proto_bert/src/proto_bert_app.erl b/components/proto_bert/src/proto_bert_app.erl
index 101e0c4..68ee09e 100644
--- a/components/proto_bert/src/proto_bert_app.erl
+++ b/components/proto_bert/src/proto_bert_app.erl
@@ -21,14 +21,14 @@
%% ===================================================================
start(_StartType, _StartArgs) ->
- protocol_sup:start_link().
+ proto_bert_sup:start_link().
start_phase(init, _, _) ->
- protocol_rpc:init_rvi_component(),
+ proto_bert_rpc:init_rvi_component(),
ok;
start_phase(json_rpc, _, _) ->
- protocol_rpc:start_json_server(),
+ proto_bert_rpc:start_json_server(),
ok.
stop(_State) ->
diff --git a/components/proto_bert/src/proto_bert_rpc.erl b/components/proto_bert/src/proto_bert_rpc.erl
index 805ba63..3baf356 100644
--- a/components/proto_bert/src/proto_bert_rpc.erl
+++ b/components/proto_bert/src/proto_bert_rpc.erl
@@ -22,7 +22,7 @@
-define(SERVER, ?MODULE).
-export([start_json_server/0]).
--export([send_message/10,
+-export([send_message/9,
receive_message/2]).
-record(st, {
@@ -34,11 +34,11 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- ?debug("protocol_rpc:init(): called."),
+ ?debug("proto_bert_rpc:init(): called."),
{ok, #st { cs = rvi_common:get_component_specification() } }.
start_json_server() ->
- rvi_common:start_json_rpc_server(protocol, ?MODULE, protocol_sup).
+ rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_bert_sup).
@@ -48,7 +48,6 @@ send_message(CompSpec,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- NetworkAddress,
Parameters,
Signature,
Certificate) ->
@@ -58,7 +57,6 @@ send_message(CompSpec,
{ protocol_opts, ProtoOpts },
{ data_link_mod, DataLinkMod },
{ data_link_opts, DataLinkOpts },
- { network_address, NetworkAddress },
{ parameters, Parameters },
{ signature, Signature },
{ certificate, Certificate }],
@@ -78,7 +76,6 @@ handle_rpc("send_message", Args) ->
{ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args),
{ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args),
{ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args),
- {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
{ok, Signature} = rvi_common:get_json_element(["signature"], Args),
{ok, Certificate} = rvi_common:get_json_element(["certificate"], Args),
@@ -88,7 +85,6 @@ handle_rpc("send_message", Args) ->
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- NetworkAddress,
Parameters,
Signature,
Certificate]}),
@@ -97,7 +93,7 @@ handle_rpc("send_message", Args) ->
handle_rpc(Other, _Args) ->
- ?warning("protocol_rpc:handle_rpc(~p): Unknown~n", [ Other ]),
+ ?warning("proto_bert_rpc:handle_rpc(~p): Unknown~n", [ Other ]),
{ ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
@@ -108,7 +104,7 @@ handle_notification("receive_message", Args) ->
ok;
handle_notification(Other, _Args) ->
- ?debug("protocol_rpc:handle_other(~p): unknown", [ Other ]),
+ ?debug("proto_bert_rpc:handle_other(~p): unknown", [ Other ]),
ok.
@@ -118,31 +114,28 @@ handle_call({rvi, send_message,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- NetworkAddress,
Parameters,
Signature,
Certificate]}, _From, St) ->
?debug(" protocol:send(): service name: ~p~n", [ServiceName]),
?debug(" protocol:send(): timeout: ~p~n", [Timeout]),
- ?debug(" protocol:send(): opts: ~p~n", [Opts]),
+ ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]),
?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]),
?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]),
- ?debug(" protocol:send(): network_address: ~p~n", [NetworkAddress]),
%% ?debug(" protocol:send(): parameters: ~p~n", [Parameters]),
?debug(" protocol:send(): signature: ~p~n", [Signature]),
?debug(" protocol:send(): certificate: ~p~n", [Certificate]),
- Data = term_to_binary({ ServiceName, Timeout, NetworkAddress,
- Parameters, Signature, Certificate }),
+ Data = term_to_binary({ ServiceName, Timeout, Parameters, Signature, Certificate }),
- Res = DataLinkMod:send_data(St#st.cs, NetworkAddress, DataLinkOpts, Data),
+ Res = DataLinkMod:send_data(St#st.cs, ServiceName, DataLinkOpts, Data),
{ reply, Res, St };
handle_call(Other, _From, St) ->
- ?warning("protocol_rpc:handle_call(~p): unknown", [ Other ]),
+ ?warning("proto_bert_rpc:handle_call(~p): unknown", [ Other ]),
{ reply, [ invalid_command ], St}.
%% Convert list-based data to binary.
@@ -152,13 +145,11 @@ handle_cast({rvi, receive_message, [Data]}, St) when is_list(Data)->
handle_cast({rvi, receive_message, [Data]}, St) ->
{ ServiceName,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate } = binary_to_term(Data),
?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
- ?debug(" protocol:rcv(): network_address: ~p~n", [NetworkAddress]),
%% ?debug(" protocol:rcv(): parameters: ~p~n", [Parameters]),
?debug(" protocol:rcv(): signature: ~p~n", [Signature]),
?debug(" protocol:rcv(): certificate: ~p~n", [Certificate]),
@@ -166,7 +157,6 @@ handle_cast({rvi, receive_message, [Data]}, St) ->
service_edge_rpc:handle_remote_message(St#st.cs,
ServiceName,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate),
@@ -174,7 +164,7 @@ handle_cast({rvi, receive_message, [Data]}, St) ->
handle_cast(Other, St) ->
- ?warning("protocol_rpc:handle_cast(~p): unknown", [ Other ]),
+ ?warning("proto_bert_rpc:handle_cast(~p): unknown", [ Other ]),
{noreply, St}.
handle_info(_Info, St) ->
diff --git a/components/proto_bert/src/proto_bert_sup.erl b/components/proto_bert/src/proto_bert_sup.erl
index 9c23f9a..449553c 100644
--- a/components/proto_bert/src/proto_bert_sup.erl
+++ b/components/proto_bert/src/proto_bert_sup.erl
@@ -34,6 +34,6 @@ start_link() ->
init([]) ->
{ok, { {one_for_one, 5, 10},
[
- ?CHILD(proto_bert_rpc_rpc, worker)
+ ?CHILD(proto_bert_rpc, worker)
]} }.
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index c28bb8c..e50f9ec 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -26,8 +26,6 @@
-export([remote_service_to_string/1]).
-export([remote_service_to_string/2]).
-export([local_service_prefix/0]).
--export([get_static_node/1]).
--export([static_nodes/0]).
-export([node_address_string/0]).
-export([node_address_tuple/0]).
-export([get_request_result/1]).
@@ -46,7 +44,6 @@
-define(NODE_SERVICE_PREFIX, node_service_prefix).
-define(NODE_ADDRESS, node_address).
--define(STATIC_NODES, static_nodes).
json_rpc_status(0) ->
@@ -85,6 +82,10 @@ json_rpc_status(5) ->
json_rpc_status("5") ->
already_connected;
+
+json_rpc_status(6) ->
+ no_route;
+
json_rpc_status("6") ->
no_route;
@@ -403,62 +404,6 @@ local_service_prefix() ->
_ -> Prefix ++ "/"
end.
-static_nodes() ->
- case application:get_env(rvi, ?STATIC_NODES) of
-
- {ok, NodeList} ->
- NodeList;
-
- undefined ->
- not_found
- end.
-
-
-%% Locate the statically configured node whose service(s) prefix-
-%% matches the provided service.
-%% FIXME: Longest prefix match.
-get_static_node(Service) ->
- case application:get_env(rvi, ?STATIC_NODES) of
-
- {ok, NodeList} when is_list(NodeList) ->
- get_static_node(Service, NodeList);
-
- undefined ->
- ?debug("No ~p configured under rvi.", [?STATIC_NODES]),
- not_found
- end.
-
-get_static_node(_Service, []) ->
- not_found;
-
-%% Validate that argumenst are all lists.
-get_static_node(Service, [{ SvcPrefix, NetworkAddress} | T ]) when
- not is_list(Service); not is_list(SvcPrefix); not is_list(NetworkAddress) ->
- ?warning("rvi_common:get_static_node(): Could not resolve ~p against {~p, ~p}:"
- "One or more elements not strings.", [ Service, SvcPrefix, NetworkAddress]),
- get_static_node(Service, T );
-
-
-%% If the service we are trying to resolve has a shorter name than
-%% the prefix we are comparing with, ignore.
-get_static_node(Service, [{ SvcPrefix, _NetworkAddress } | T ]) when
- length(Service) < length(SvcPrefix) ->
- ?debug("rvi_common:get_static_node(): Service: ~p is shorter than prefix ~p. Ignore.",
- [ Service, SvcPrefix]),
- get_static_node(Service, T );
-
-get_static_node(Service, [{ SvcPrefix, NetworkAddress} | T] ) ->
- case string:str(Service, SvcPrefix) of
- 1 ->
- ?debug("rvi_common:get_static_node(): Service: ~p -> { ~p, ~p}.",
- [ Service, SvcPrefix, NetworkAddress]),
- NetworkAddress;
- _ ->
- ?debug("rvi_common:get_static_node(): Service: ~p != { ~p, ~p}.",
- [ Service, SvcPrefix, NetworkAddress]),
- get_static_node(Service, T )
- end.
-
node_address_string() ->
case application:get_env(rvi, ?NODE_ADDRESS) of
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
index d5b0548..0e6dfbe 100644
--- a/components/schedule/src/rvi_routing.erl
+++ b/components/schedule/src/rvi_routing.erl
@@ -9,6 +9,9 @@
-behaviour(gen_server).
+
+-include_lib("lager/include/log.hrl").
+
%% API
-export([get_service_routes/1]).
-export([start_link/0]).
@@ -88,7 +91,7 @@ init([]) ->
%% @end
%%--------------------------------------------------------------------
handle_call( { rvi_get_service_routes, Service }, _From, St) ->
- {reply, normalize_routes_(find_routes_(Service, St#st.routes), []), t};
+ {reply, normalize_routes_(find_routes_( St#st.routes, Service), []), t};
handle_call(_Request, _From, St) ->
Reply = ok,
@@ -179,8 +182,13 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen )
false ->
%% Continue with the old routes and matching len installed
find_routes_(T, Service, CurRoutes, CurMatchLen)
- end.
+ end;
+find_routes_(Rt, Svc, CurRoutes, CurMatchLen) ->
+ ?info("----------------Failed on ~p", [Rt]),
+ { x, y}.
+
+
find_routes_(Routes, Service) ->
case find_routes_(Routes, Service, undefined, 0) of
{ undefined, 0 } ->
@@ -203,6 +211,5 @@ normalize_routes_({ServicePrefix, [ { Pr, { DL, DLOp }} | Rem ]}, Acc) ->
normalize_routes_({ServicePrefix, [ {{ Pr, PrOp}, DL } | Rem ]}, Acc) ->
normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
-normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, DL} | Rem ]}, Acc) ->
- normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]).
-
+normalize_routes_({ServicePrefix, [ {Pr, DL} | Rem ]}, Acc) ->
+ normalize_routes_({ServicePrefix, Rem}, [ { {Pr, []}, { DL, [] } } | Acc]).
diff --git a/components/schedule/src/rvi_schedule.erl b/components/schedule/src/rvi_schedule.erl
index 00a142a..2b4606e 100644
--- a/components/schedule/src/rvi_schedule.erl
+++ b/components/schedule/src/rvi_schedule.erl
@@ -7,6 +7,7 @@
%%
-module(rvi_schedule).
+
-include_lib("rvi_common/include/rvi_common.hrl").
-callback schedule_message(CompSpec :: #component_spec{},
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index 310823c..7a1b23a 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -8,20 +8,17 @@
-module(schedule_rpc).
-behaviour(gen_server).
--behaviour(rvi_schedule).
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
%% API
-export([start_link/0]).
--export([schedule_message/6,
- register_remote_services/3,
- unregister_remote_services/2]).
+-export([schedule_message/6]).
%% Invoked by service discovery
%% FIXME: Should be rvi_service_discovery behavior
-export([service_available/3,
- service_unavailable/3]
+ service_unavailable/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -40,12 +37,10 @@
%%
%% Service -> ETS -> Messages
-record(service, {
- name = "", %% Fully qualified service name.
- %% Set to { IP, Port} when service is available, else unknown_network_address.
- address = unknown,
+ key = { "", unknown }, %% Service name and data link modu
- %% Targeted data link module
- data_link_module = unknown
+ %% Is this service currently available on the data link module
+ available = false,
%% Table containing #message records,
%% indexed by their transaction ID (and sequence of delivery)
@@ -117,7 +112,7 @@ init([]) ->
cs = rvi_common:get_component_specification(),
services_tid = ets:new(rvi_schedule_services,
[ set, private,
- { keypos, #service.name } ])}}.
+ { keypos, #service.key } ])}}.
start_json_server() ->
rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup).
@@ -139,35 +134,19 @@ schedule_message(CompSpec,
[status, transaction_id], CompSpec).
-register_remote_services(CompSpec, NetworkAddress, Services) ->
- rvi_common:notification(schedule, ?MODULE,
- register_remote_services,
- [{ network_address, NetworkAddress} ,
- { services, Services }],
- CompSpec).
-
-
-unregister_remote_services(CompSpec, ServiceNames) ->
- rvi_common:notification(schedule, ?MODULE,
- unregister_remote_services,
- [{ services, ServiceNames }],
- CompSpec).
-
-service_available(Service, DataLinkModule, Address) ->
+service_available(CompSpec, SvcName, DataLinkModule) ->
rvi_common:notification(schedule, ?MODULE,
service_available,
- [{ service, Service },
- { data_link_module, DataLinkModule },
- { address, Address }],
+ [{ service, SvcName },
+ { data_link_mod, DataLinkModule } ],
CompSpec).
-service_unavailable(Service, DataLinkModule, Address) ->
+service_unavailable(CompSpec, SvcName, DataLinkModule) ->
rvi_common:notification(schedule, ?MODULE,
service_unavailable,
- [{ service, Service },
- { data_link_module, DataLinkModule },
- { address, Address }],
+ [{ service, SvcName },
+ { data_link_mod, DataLinkModule } ],
CompSpec).
%% JSON-RPC entry point
@@ -197,54 +176,28 @@ handle_rpc("schedule_message", Args) ->
{ transaction_id, TransID } ] };
-
-
handle_rpc(Other, _Args) ->
?debug("schedule_rpc:handle_rpc(~p): unknown", [ Other ]),
{ok, [ {status, rvi_common:json_rpc_status(invalid_command)}]}.
-handle_notification("register_remote_services", Args) ->
- {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
- ?debug("schedule_notification:register_remote_services(): network_address: ~p", [ NetworkAddress]),
- ?debug("schedule_notification:register_remote_services(): services: ~p", [ Services]),
-
- gen_server:cast(?SERVER, { rvi, register_remote_services,
- [ NetworkAddress,
- Services ]}),
-
- ok;
-
-handle_notification("unregister_remote_services", Args) ->
- {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args),
- ?debug("schedule_notification:unregister_remote_services(): services ~p",
- [ DiscountinuedServices]),
-
- gen_server:cast(?SERVER, { rvi, unregister_remote_services,
- [ DiscountinuedServices ]}),
- ok;
handle_notification("service_available", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
+ {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
gen_server:cast(?SERVER, { rvi, service_available,
- [ Service,
- DataLinkModule,
- Address ]}),
+ [ SvcName,
+ DataLinkModule ]}),
ok;
handle_notification("service_unavailable", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
+ {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
gen_server:cast(?SERVER, { rvi, service_unavailable,
- [ Service,
- DataLinkModule,
- Address ]}),
+ [ SvcName,
+ DataLinkModule ]}),
ok;
@@ -264,20 +217,28 @@ handle_call( { rvi, schedule_message,
?debug("schedule:sched_msg(): parameters: ~p", [Parameters]),
?debug("schedule:sched_msg(): signature: ~p", [Signature]),
?debug("schedule:sched_msg(): certificate ~p", [Certificate]),
- ?debug("schedule:sched_msg(): St: ~p", [St]),
+ %%?debug("schedule:sched_msg(): St: ~p", [St]),
%%
%% Retrieve the routes that we should try for this message
%%
- case rvi_routing:get_service_routing(SvcName) of
+ case rvi_routing:get_service_routes(SvcName) of
not_found ->
{reply, [ no_route ], St };
Routes ->
- { NewTransID, NSt1} = create_transaction_id(St),
- NTS2 = queue_message(SvcName, Routes, Timeout, Parameter, Signature);
-
- { reply, [ok, TransID], NS2 }
+ { TransID, NSt1} = create_transaction_id(St),
+ NSt2 = queue_message(SvcName,
+ TransID,
+ Routes,
+ Timeout,
+ calc_relative_tout(Timeout),
+ Parameters,
+ Signature,
+ Certificate,
+ NSt1),
+
+ { reply, [ok, TransID], NSt2 }
end;
@@ -297,71 +258,40 @@ handle_call(Other, _From, St) ->
%% @end
%%--------------------------------------------------------------------
-handle_cast( {rvi, register_remote_services,
- [ NetworkAddress, Services]}, St) ->
-
- ?info("schedule:register_remote_services(): services(~p) -> ~p",
- [Services, NetworkAddress]),
-
- {ok, NSt} = multiple_services_available(Services, NetworkAddress, St),
- {noreply, NSt};
+handle_cast( {rvi, service_available, [SvcName, DataLinkModule]}, St) ->
+ %% Find or create the service.
+ ?debug("schedule:service_available(): ~p:~p", [ DataLinkModule, SvcName ]),
-handle_cast( {rvi, unregister_remote_services, [ServiceNames]}, St) ->
- ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]),
- {ok, NSt} = multiple_services_unavailable(ServiceNames, St),
- {noreply, NSt };
+ SvcRec = update_service(SvcName, DataLinkModule, available, St),
-handle_cast( {rvi, service_available, [Service, DataLinkModule, Address]}, St) ->
- %% Find the service
- case ets:lookup(SvcTid, {Service, DataLinkModule}) of
- [] -> %% No service found - Just unsubscribe. Shouldn't really happen
- service_discovery_rpc:
- unsubscribe_to_service_availability({Service, DataLinkModule},
- ?MODULE);
-
- { noreply, St };
+ %% Try to send any pending messages.
+ { _, NSt} = try_sending_messages(SvcRec, St),
- [ Svc ] ->
- %% Update the network address, if it differs, and return
- %% the new service / State as {ok, NSvc, false, NSt}
- ?debug("schedule:service_unavailable(): Service ~p:~p now has address~p.",
- [ DataLinkModule, Service, Address ]),
- update_service_network_address(Svc, Address, St),
- { _, NSt2 } = try_sending_messages(Svc, NSt1),
- { noreply, NSt2}
- end.
+ { noreply, NSt};
-handle_cast( {rvi, service_unavailable, [Service, DataLinkModule, Address]},
+handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]},
#st { services_tid = SvcTid } = St) ->
- %% Find the service
- case ets:lookup(SvcTid, {Service, DataLinkModule}) of
+ %% Grab the service
+ case ets:lookup(SvcTid, {SvcName, DataLinkModule}) of
[] -> %% No service found - no op.
{noreply, St};
- [ Svc ] ->
- %% Delete service if it is unused.
- case delete_unused_service(SvcTid, Svc) of
+ [ SvcRec ] ->
+ %% Delete service if it does not have any pending messages.
+ case delete_unused_service(SvcTid, SvcRec) of
true -> %% service was deleted
-
- %% Unsubscribe from service availablility notifications
- service_discovery_rpc:
- unsubscribe_to_service_availability({ Service, DataLinkModule},
- ?MODULE);
-
{ noreply, St};
- false -> %% Service was not deleted, update its network address to unknown
- { _, _, _, NSt} =
- update_service_network_address(Svc, unknown_network_address, St),
-
- { noreply, NSt }
+ false -> %% SvcName was not deleted, set it to not available
+ update_service(SvcName, DataLinkModule, unavailable, St),
+
+ { noreply, St }
end
- end.
- { noreply, St};
+ end;
handle_cast(Other, St) ->
@@ -381,24 +311,41 @@ handle_cast(Other, St) ->
%% Handle timeouts
handle_info({ rvi_message_timeout, SvcName, DLMod, TransID},
- #st { cs = CompSpec, services_tid = SvcTid } = St) ->
+ #st { services_tid = SvcTid } = St) ->
+ %% Look up the service / DataLink mod
case ets:lookup(SvcTid, {SvcName, DLMod}) of
- [ Svc ] ->
- %% Delete from ets.
- case ets:lookup(Svc#service.messages_tid, TransID) of
+ [ SvcRec ] -> %% Found service for specific data link
+
+ %% Delete message from service queue
+ case ets:lookup(SvcRec#service.messages_tid, TransID) of
[ Msg ] ->
?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]),
- ets:delete(Svc#service.messages_tid, TransID),
- queue_message(
+ ets:delete(SvcRec#service.messages_tid, TransID),
+
+ %% Try to requeue message
+ { _Res, NSt } =
+ queue_message(SvcName,
+ Msg#message.transaction_id,
+ Msg#message.routes,
+ Msg#message.timeout,
+ calc_relative_tout(Msg#message.timeout),
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St),
+ {noreply, NSt};
+
_ ->
?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing",
[ TransID, SvcName]),
- ok
+
+ {noreply, St}
+
end;
- _-> ok
- end,
- {noreply, St};
+ _ -> {noreply, St}
+
+ end;
handle_info(_Info, St) ->
@@ -435,289 +382,234 @@ code_change(_OldVsn, St, _Extra) ->
+%%
%% No more routes to try
-queue_message(_SvcName,
+%%
+queue_message(SvcName,
_TransID,
- _MessageTimeout
[ ],
+ _MessageTimeout,
+ _RelativeTimeout,
_Parameters,
_Signature,
_Certificate, St) ->
%% FIXME: Handle route failure
+ ?notice("schedule:queue_message(): Ran out of routes to try for ~p", [SvcName]),
{ route_failed, St };
+%%
+%% The message has timed out
+%%
+queue_message(SvcName,
+ TransID,
+ _Routes,
+ _MessageTimeout,
+ -1, %% We are timed out
+ _Parameters,
+ _Signature,
+ _Certificate,
+ St) ->
+ do_timeout_callback(St#st.cs, SvcName, TransID),
+ { timeout, St };
+
+
+%% Try to queue message
queue_message(SvcName,
TransID,
- [ { { Pr, PrOp }, { DL, DLOp } } | RemainingRoutes ],
- MessageTimeout,
+ [ { { _Pr, _PrOp }, { DLMod, DLOp } } | RemainingRoutes ],
+ Timeout,
+ RelativeTimeout,
Parameters,
Signature,
Certificate,
St) ->
+
%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]),
%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]),
%% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]),
%% ?info("schedule:sched_msg(): signature: ~p", [Signature]),
%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]),
- %% Calculate how many msec we have until message times out
+ SvcRec = find_or_create_service(SvcName, DLMod, St),
- RelativeTimeout = calc_relative_tout(MessageTimeout),
-
- %% Did we run out of time for the message?
- case RelativeTimeout > 1 of
- %% Nope
- true ->
- {ok, Service, IsNewService, NSt1 } = find_or_create_service({SvcName, DL}, St),
-
- Msg = #message {
- service = SvcName,
- transaction_id = TransID,
- data_link = { DLMod, DLOp },
- routes = RemaingRoutes,
- message_timeout = MessageTimeout,
- route_timeout_ref = TRef,
- parameters = Parameters,
- signature = Signature,
- certificate = Certificate
- },
-
-
- %% Add to ets table
- TransID = ets:insert(Service#service.messages_tid, Msg),
-
- %% If this is a new service, subscribe to updates on this service's availablity.
- %% If service is already available, we will get service_available/3 notification
- %% immediately.
- case IsNewService of
- true ->
- service_discovery_rpc:
- subscribe_to_service_availability({SvcName, DLMod},
- ?MODULE);
- false ->
- ok
- end,
+ %% The service may already be available, give it a shot.
+ case try_sending_messages(SvcRec, St) of
+ { ok, NSt } -> %% We managed to send the message. Done.
+ { ok, NSt };
+ { _ , NSt } -> %% Failed to send message. Setup data link
+
%%
%% Bring up the relevant data link for the given route.
%% Once up, the data link will invoke service_availble()
%% to indicate that the service is available for the given DL.
%%
- case DLMod:setup_data_link(CompSpec, SvcName, DLOp) of
+ case DLMod:setup_data_link(NSt#st.cs, SvcName, DLOp) of
{ ok, DLTimeout } ->
- %% Setup a timeout to be
- TRef = erlang:send_after(min(RelativeMessageTimeout, DLTimeout),
+
+ %% Setup a timeout that triggers on whatever
+ %% comes first of the message's general timeout
+ %% or the timeout of the data link bringup
+ TRef = erlang:send_after(min(RelativeTimeout, DLTimeout),
self(),
- { rvi_message_timeout, SvcName, DLMod, TransID });
-
+ { rvi_message_timeout, SvcName, DLMod, TransID }),
+
+ %% Setup a message to be added to the service / dl table.
+ Msg = #message {
+ transaction_id = TransID,
+ data_link = { DLMod, DLOp },
+ routes = RemainingRoutes,
+ timeout = Timeout,
+ timeout_tref = TRef,
+ parameters = Parameters,
+ signature = Signature,
+ certificate = Certificate
+ },
+
+
+ %% Add to ets table
+ TransID = ets:insert(SvcRec#service.messages_tid, Msg),
+ {ok, NSt};
+
%% We failed with this route. Try the next one
{ error, _Reason} ->
queue_message(SvcName,
TransID,
- RelativeTimeout,
+ Timeout,
+ calc_relative_tout(Timeout),
RemainingRoutes,
Parameters,
Signature,
Certificate,
- St)
- end;
-
- %% We are out of time
- false ->
- do_timeout_callback(CompSpec, Svc, Msg),
- { timeout, St}
+ NSt)
+ end
end.
-forward_to_protocol(Svc, Msg, St) ->
- { DataLinkMod, DataLinkOpts } = Msg#message.data_link,
- { ProtoMod, ProtoOpts } = Msg#messge.protocol,
-
- case ProtoMod:send_message(
- St#st.cs,
- SvcName,
- Msg#message.timeout,
- ProtoOpts,
- DataLinkMod,
- DataLinkOpts,
- NetworkAddress,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate) of
-
- %% Success
- [ok] ->
- %% Send the rest.
- try_sending_messages(Service, St);
-
- %% Failed
- [Err] ->
- ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p",
- [ProtocolMod, DataLinkMod, SvcName, NetworkAddress, Err]),
-
- %% Requeue this message with the next route
- { _, St1} = queue_message(SvcName,
- TransID,
- Msg#message.timeout,
- Msg#message.routes,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate,
- St),
-
- %% Send the rest of the messgages targeting
- %% the same service through the same data link
- try_sending_messages(Svc, St)
- end.
-
-
-
-%% Check if we can send messages queued up under the given service.
+%% The service is not available
try_sending_messages(#service {
- name = SvcName,
- network_address = unknown_network_address,
- messages_tid = _Tid } = _Service, St) ->
+ key = { SvcName, _ },
+ available = false,
+ messages_tid = _Tid } = _SvcRec, St) ->
+
?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]),
- {not_available, St};
+ { not_available, St };
try_sending_messages(#service {
- name = SvcName,
- data_link_module = DataLinkModule,
- network_address = NetworkAddress,
- messages_tid = Tid } = Service, St) ->
+ key = { SvcName, DataLinkMod },
+ available = true,
+ messages_tid = Tid } = SvcRec, St) ->
- ?debug("schedule:try_send(): SvcName: ~p", [SvcName]),
- ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]),
+ ?debug("schedule:try_send(): Service: ~p:~p", [DataLinkMod, SvcName]),
%% Extract the first message of the queue.
- case ets:first(Tid) of
- %% No more messages to send.
- '$end_of_table' ->
+ case first_service_message(SvcRec) of
+ empty ->
?debug("schedule:try_send(): Nothing to send"),
{ ok, St };
- Key ->
- St1 = forward_to_protocol
- ?debug("schedule:try_send(): Sending: ~p", [Key]),
- %% Extract first message and try to send it
- case ets:lookup(Tid, Key) of
- [ Msg ] ->
- %% Wipe from ets table and cancel timer
- ets:delete(Tid, Key),
- erlang:cancel_timer(Msg#message.timeout_tref),
-
- %% Forward to protocol and resend
- forward_to_protocol(Service, Msg, St);
-
- _ ->
- ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]),
- { ok, St}
-
+ yanked ->
+ ?info("schedule:try_send(): Message was yanked while trying to send: ~p",
+ [SvcRec#service.key]),
+ { ok, St};
+
+ Msg ->
+ %% Wipe from ets table and cancel timer
+ ets:delete(Tid, Msg#message.transaction_id),
+
+ erlang:cancel_timer(Msg#message.timeout_tref),
+
+ %% Forward to protocol.
+ { _, DataLinkOpts } = Msg#message.data_link,
+ { ProtoMod, ProtoOpts } = Msg#message.protocol,
+
+ %% Send off message to the correct protocol module
+ case ProtoMod:send_message(
+ St#st.cs,
+ SvcName,
+ Msg#message.timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate) of
+
+ %% Success
+ [ok] ->
+ %% Send the rest of the messages associated with this service/dl
+ try_sending_messages(SvcRec, St);
+
+ %% Failed
+ [Err] ->
+ ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p",
+ [ProtoMod, DataLinkMod, SvcName, Err]),
+
+ %% Requeue this message with the next route
+ { _, St1} = queue_message(SvcName,
+ Msg#message.transaction_id,
+ Msg#message.timeout,
+ calc_relative_tout(Msg#message.timeout),
+ Msg#message.routes,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St),
+
+ %% Send the rest of the messgages targeting
+ %% the same service through the same data link
+ %% If they all fail due to some data link error,
+ %% they will all be requeued to the next route.
+ try_sending_messages(SvcRec, St1)
end
end.
-%%
-%% data_link_up has reported that multiple services are now
-%% available at NetworkAddress.
-%% Iterate through all services and mark them as available,
-%% possibly sending any pending message targeting the newly activated
-%% service.
-%%
-multiple_services_available([], _NetworkAddress, St) ->
- {ok, St};
-
-multiple_services_available([ Svc | T], NetworkAddress, St) ->
- {ok, NSt} = service_available(Svc, NetworkAddress, St),
- multiple_services_available(T, NetworkAddress, NSt).
-
-multiple_services_unavailable([], St) ->
- {ok, St};
-
-multiple_services_unavailable([ SvcName | T], St) ->
- {ok, NSt} = service_unavailable(SvcName, St),
- multiple_services_unavailable(T, NSt).
+find_or_create_service(SvcName, DataLinkMod, #st { services_tid = SvcTid } = St) ->
+ ?debug("schedule:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]),
-
-find_or_create_service(ServiceName, St) ->
- %% Invoke with retain to keep any exising network addresses already
- %% in place in an existing service.
- find_or_create_service(ServiceName, retain_existing_address, St).
-
-find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid } = St) ->
- ?debug("schedule:find_or_create_service(): SvcName: ~p", [ ServiceName]),
-
- case ets:lookup(SvcTid, ServiceName) of
+ case ets:lookup(SvcTid, { SvcName, DataLinkMod }) of
[] -> %% The given service does not exist, create it.
- ?debug("schedule:find_or_create_service(): Creating new ~p", [ ServiceName]),
- create_service(ServiceName, NetworkAddress, St);
-
- [ Svc1 ] when NetworkAddress =:= retain_existing_address ->
- %% We found a service, and are instructed not to touch
- %% its network address. Return it.
- { ok, Svc1, false, St};
+ ?debug("schedule:find_or_create_service(): Creating new ~p", [ SvcName]),
+ update_service(SvcName, DataLinkMod, unavailable, St);
- [ Svc2 ] ->
+ [ SvcRec ] ->
%% Update the network address, if it differs, and return
- %% the new service / State as {ok, NSvc, false, NSt}
- ?debug("schedule:find_or_create_service(): Updating existing ~p", [ ServiceName]),
- update_service_network_address(Svc2, NetworkAddress, St)
+ %% the new service / State as {ok, NSvcRec, false, NSt}
+ ?debug("schedule:find_or_create_service(): Updating existing ~p", [ SvcName]),
+ SvcRec
end.
-%%
-%% Catch where no network address update is necessary.
-%%
-update_service_network_address(#service { network_address = NetworkAddress } = Service,
- NetworkAddress, St) ->
- { ok, Service, false, St}; %% False indicates that the service exists.
-
-
-%%
-%% Update the service in the ets table with a new network address.
-%%
-update_service_network_address(#service {} = Service, NetworkAddress, St) ->
- %% Create a new service.
- NewService = Service#service { network_address = NetworkAddress},
- %% Replace existing serviceo in the ets table of services.
- ets:insert(St#st.services_tid, [ NewService ]),
- { ok, NewService, false, St}. %% False indicates that the service exists.
-
-
-
-
-%% If we are creating a new service, we need to remap the
-%% retain_existing_address to unknown_network_address in order to get the correct
-%% initial state, which is a created service with an unknown network address.
-create_service(ServiceName, retain_existing_address, St) ->
- create_service(ServiceName, unknown_network_address, St);
-
-
-%% Create a new service and return the new state (not currently modified)
-%% and the newly initialized service revord.
+%% Create a new service, or update an existing one, and return the new
+%% state (not currently modified) and the newly initialized service
+%% revord.
%%
-create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) ->
- Svc = #service {
- name = ServiceName,
- network_address = NetworkAddress,
- messages_tid = ets:new(rvi_messages,
- [ ordered_set, private,
- { keypos, #message.transaction_id } ])
- },
+update_service(SvcName, DataLinkMod, Available,
+ #st { services_tid = SvcsTid, cs = CS }) ->
+ %% Return new service and existing state.
+ ?debug("schedule:create_service(): ~p:~p ", [ DataLinkMod, SvcName]),
+ SvcRec = #service {
+ key = { SvcName, DataLinkMod },
+ available = Available,
+ messages_tid = ets:new(rvi_messages,
+ [ ordered_set, private,
+ { keypos, #message.transaction_id } ])
+ },
%% Insert new service to ets table.
- ets:insert(SvcsTid, Svc),
-
- %% Return new service and existing state.
- ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]),
- ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]),
- { ok, Svc, true, St}. %% True indicates that the service is newly created.
+ ets:insert(SvcsTid, SvcRec),
+
+ %% Subscribe to updates on the availbility of this service.
+ service_discovery_rpc:subscribe(CS, SvcName, DataLinkMod),
+
+ SvcRec.
%% Create a new and unique transaction id
@@ -748,7 +640,7 @@ calc_relative_tout(UnixTime) ->
case TOut =< 0 of
true ->
- 1; %% One millisec is the smallest value we will time out on
+ -1; %% We have timed out
false ->
TOut * 1000
@@ -756,30 +648,45 @@ calc_relative_tout(UnixTime) ->
%% Handle a callback for a timed out message.
-do_timeout_callback(CompSpec, Service,
- #message {transaction_id = TransID}) ->
- service_edge_rpc:handle_local_timeout(CompSpec, Service, TransID),
- ok;
+do_timeout_callback(CompSpec, SvcName, TransID) ->
+ service_edge_rpc:handle_local_timeout(CompSpec, SvcName, TransID),
+ ok.
-%% callback element of #message is not an {M,F,A} format, ignore.
-do_timeout_callback(_,_,_) ->
- ok.
%% Kill off a service that is no longer used.
%%
-delete_unused_service(SvcTid, Svc) ->
+delete_unused_service(SvcTid, SvcRec) ->
%% Do we have messages waiting for this service?
- case ets:first(Svc#service.messages_tid) of
+ case ets:first(SvcRec#service.messages_tid) of
%% Nope.
'$end_of_table' ->
- ets:delete(Svc#messages_tid),
- ets:delete(SvcTid, { Service, DataLinkModule})
+ ets:delete(SvcRec#service.messages_tid),
+ ets:delete(SvcTid, SvcRec#service.key),
+
+ { SvcName, DataLinkMod} = SvcRec#service.key,
+ %% Unsubscribe from service availablility notifications
+ service_discovery_rpc:
+ unsubscribe_to_service_availability(SvcName, DataLinkMod, ?MODULE),
%% Update the network address, if it differs, and return
- %% the new service / State as {ok, NSvc, false, NSt}
+ %% the new service / State as {ok, NSvcRec, false, NSt}
?debug("schedule:service_unavailable(): Service ~p:~p now has no address.",
- [ DataLinkModule, Service ]),
+ [ SvcRec#service.key ]),
true;
_ -> false
end.
+
+first_service_message(#service { messages_tid = Tid }) ->
+ case ets:first(Tid) of
+ '$end_of_table' ->
+ empty;
+
+ Key ->
+ case ets:lookup(Tid, Key) of
+ [ Msg ] -> Msg;
+ [] -> yanked
+ end
+ end.
+
+
diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl
index 8589df0..76c403e 100644
--- a/components/schedule/src/schedule_sup.erl
+++ b/components/schedule/src/schedule_sup.erl
@@ -34,7 +34,7 @@ start_link() ->
init([]) ->
{ok, { {one_for_one, 5, 10},
[
- ?CHILD(schedule_rpc, worker)
- ?CHILD(router, worker)
+ ?CHILD(schedule_rpc, worker),
+ ?CHILD(rvi_routing, worker)
]} }.
diff --git a/components/service_discovery/curl_scripts/register_service.sh b/components/service_discovery/curl_scripts/register_service.sh
deleted file mode 100644
index 7441eb4..0000000
--- a/components/service_discovery/curl_scripts/register_service.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/sh
-# Create new accounts, like the ga account
-# But other accounts may be create
-. $HOME/.exodmrc
-
- #if [ $# != 2 ]
-#then
-# echo "Usage: $0 temperature"
-# exit 255
-#fi
-# the password (actually erlang node cookie) must be 100% hidden
-# so this is only for testing!!!!!
-
-URL=http://localhost:8801/exodm/rpc
-curl -u $USER_AUTH -k -X POST $URL -d @- << EOF
-{
- "jsonrpc": "2.0",
-
- "method": "service_discovery:register_service",
- "id": "1",
- "params":
- {
- "service": "hvac",
- "address": "http://localhost:8901",
- "methods": [
- { "access_type": "rpc", "method": "set_temperature" },
- { "access_type": "rpc", "method": "set_fan_speed" }
- ]
- }
-}
-EOF
diff --git a/components/service_discovery/curl_scripts/resolve_service.sh b/components/service_discovery/curl_scripts/resolve_service.sh
deleted file mode 100644
index d480a7d..0000000
--- a/components/service_discovery/curl_scripts/resolve_service.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/sh
-# Create new accounts, like the ga account
-# But other accounts may be create
-. $HOME/.exodmrc
-
- #if [ $# != 2 ]
-#then
-# echo "Usage: $0 temperature"
-# exit 255
-#fi
-# the password (actually erlang node cookie) must be 100% hidden
-# so this is only for testing!!!!!
-
-URL=http://localhost:8801/exodm/rpc
-curl -u $USER_AUTH -k -X POST $URL -d @- << EOF
-{
- "jsonrpc": "2.0",
-
- "method": "service_discovery:resolve_remote_service",
- "id": "1",
- "params":
- {
- "service": "rpc://jaguarlandrover.com/vin/1234/services/hvac/set_speed"
- }
-}
-EOF
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index 0819cb1..8b1622a 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -18,42 +18,35 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([get_all_services/1,
- get_local_addresses/1,
- get_remote_addresses/1,
- get_local_services/1,
- get_remote_services/1,
- subscribe_to_service_availability/2,
- unsubscribe_from_service_availability/2,
- resolve_local_service/2,
- resolve_remote_service/2,
- register_remote_services/3,
- register_local_service/3,
- unregister_remote_services_by_address/2,
- unregister_remote_services_by_name/2,
- unregister_local_service/2]).
+-export([get_services/1,
+ subscribe/3,
+ unsubscribe/3,
+ register_services/3,
+ unregister_services/3]).
-export([start_json_server/0]).
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--define(LOCAL_SERVICE_TABLE, rvi_local_services).
--define(REMOTE_SERVICE_TABLE, rvi_remote_services).
--define(REMOTE_ADDRESS_TABLE, rvi_address_services).
--define(NOTIFICATION_SUBS_TABLE, rvi_notification_subs).
+-define(SERVICE_TABLE, rvi_services).
+-define(MODULE_TABLE, rvi_modules).
+-define(NOTIFICATION_TABLE, rvi_notification_subs).
-record(service_entry, {
- service = [],
- address = {undefined,undefined} %% Module and Address where service can be found
+ service = [], %% Servie handled by this entry.
+ data_link_mod = undefined %% Module handling service, 'local' if local service
}).
+
-record(notification_subs, {
service = [],
modules = [] %% List of modules subscribing to this service
}).
+
-define(SERVER, ?MODULE).
+
-record(st, {
%% Component specification
cs = #component_spec{}
@@ -64,11 +57,14 @@ start_link() ->
init([]) ->
?debug("service_discovery_rpc:init(): called."),
- ets:new(?REMOTE_SERVICE_TABLE, [set, public, named_table, { keypos, #service_entry.service }]),
- ets:new(?REMOTE_ADDRESS_TABLE, [duplicate_bag, public, named_table,
- {keypos, #service_entry.address}]),
+ ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table,
+ { keypos, #service_entry.service }]),
+
+ ets:new(?MODULE_TABLE, [ duplicate_bag, public, named_table,
+ { keypos, #service_entry.data_link_mod }]),
- ets:new(?NOTIFICATION_SUBS_TABLE, [set, public, named_table, { keypos, #notification_subs.service }]),
+ ets:new(?NOTIFICATION_TABLE, [set, public, named_table,
+ { keypos, #notification_subs.service }]),
{ok, #st { cs = rvi_common:get_component_specification() } }.
@@ -76,80 +72,33 @@ init([]) ->
start_json_server() ->
rvi_common:start_json_rpc_server(service_discovery, ?MODULE, service_discovery_sup).
-get_all_services(CompSpec) ->
- rvi_common:request(service_discovery, ?MODULE,
- get_all_services, [], [status, services], CompSpec).
-
-
-get_local_services(CompSpec) ->
- rvi_common:request(service_discovery, ?MODULE,
- get_local_services, [], [status, services], CompSpec).
-
-get_remote_services(CompSpec) ->
- rvi_common:request(service_discovery, ?MODULE,
- get_remote_services, [], [status, services], CompSpec).
-
-get_local_addresses(CompSpec) ->
- rvi_common:request(service_discovery, ?MODULE,
- get_local_addresses, [], [status], CompSpec).
-
-get_remote_addresses(CompSpec) ->
+get_services(CompSpec) ->
rvi_common:request(service_discovery, ?MODULE,
- get_remote_addresses, [], [status], CompSpec).
+ get_services, [], [status, services], CompSpec).
-resolve_local_service(CompSpec, RawService) ->
- rvi_common:request(service_discovery, ?MODULE, resolve_local_service,
- [{ service, RawService }],
- [status, full_service], CompSpec).
-
-resolve_remote_service(CompSpec, RawService) ->
- rvi_common:request(service_discovery, ?MODULE, resolve_remote_service,
- [{ service, RawService }],
- [status], CompSpec).
-
-register_remote_services(CompSpec, DataLinkModule, Address, Services) ->
- rvi_common:notification(service_discovery, ?MODULE, register_remote_services,
- [{ data_link_module, DataLinkModule },
- { address, Address },
- { services, Services }],
+register_services(CompSpec, Services, DataLinkModule) ->
+ rvi_common:notification(service_discovery, ?MODULE, register_services,
+ [{ services, Services },
+ { data_link_module, DataLinkModule }],
CompSpec).
-
-register_local_service(CompSpec, Address, Services) ->
- rvi_common:request(service_discovery, ?MODULE, register_local_service,
- [{ address, Address },
- { service, Services }],
- [status], CompSpec).
-
-
-unregister_remote_services_by_address(CompSpec, DataLinkModule, Address) ->
- rvi_common:notification(service_discovery, ?MODULE, unregister_remote_services_by_address,
+unregister_services(CompSpec, Services, DataLinkModule) ->
+ rvi_common:notification(service_discovery, ?MODULE, unregister_service,
[{ data_link_module, DataLinkModule},
- { address, Address }],
- CompSpec).
-
-unregister_remote_services_by_name(CompSpec, Service) ->
- rvi_common:notification(service_discovery, ?MODULE, unregister_remote_services_by_name,
- [{ services, Service }],
- CompSpec).
+ { services, Services }],
+ CompSpec).
-unregister_local_service(CompSpec, Service) ->
- rvi_common:notification(service_discovery, ?MODULE, unregister_local_service,
- [{ service, Service }],
+subscribe(CompSpec, Service, SubscribingMod) ->
+ rvi_common:notification(service_discovery, ?MODULE, subscribe,
+ [ { service, Service }],
+ [ { subscribing_module, SubscribingMod }],
CompSpec).
-
-subscribe_to_service_availability(Service, Module) ->
- rvi_common:notification(service_discovery, ?MODULE, subscribe_to_service_availability,
- [{ service, Service }],
- [{ module, Module }],
- CompSpec).
-
-unsubscribe_from_service_availability(Service, Module) ->
+unsubscribe(CompSpec, Service, SubscribingMod) ->
rvi_common:notification(service_discovery, ?MODULE, unsubscribe_from_service_availability,
- [{ service, Service }],
- [{ module, Module }],
+ [ { service, Service }],
+ [ { subscribing_module, SubscribingMod }],
CompSpec).
@@ -157,162 +106,55 @@ unsubscribe_from_service_availability(Service, Module) ->
%% Called by local exo http server
%% Register remote services
-handle_notification("register_remote_services", Args) ->
+handle_notification("register_services", Args) ->
{ok, Services} = rvi_common:get_json_element(["services"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
- gen_server:cast(?SERVER, { rvi, register_remote_services,
- [ Services, DataLinkModule, Address ]}),
+ gen_server:cast(?SERVER, { rvi, register_services,
+ [ Services, DataLinkModule ]}),
ok;
-
-handle_notification("unregister_remote_services_by_address", Args) ->
- {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
- gen_server:cast(?SERVER, { rvi, unregister_remote_services_by_address,
- [ DataLinkModule, Address ]}),
- ok;
-
-
-handle_notification("unregister_remote_services_by_name", Args) ->
+handle_notification("unregister_services", Args) ->
{ok, Services} = rvi_common:get_json_element(["services"], Args),
- gen_server:cast(?SERVER, { rvi, unregister_remote_service_by_Name,
- [ Services ]}),
- ok;
-
-
-
-handle_notification("unregister_local_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- %% De-register service
- gen_server:cast(?SERVER, { rvi, unregister_local_service,
- [ Service ]}),
+ {ok, DataLinkModule } = rvi_common:get_json_element(["data_link_module"], Args),
+ gen_server:cast(?SERVER, { rvi, unregister_services,
+ [ Services, DataLinkModule ]}),
ok;
-handle_notification( Other, _Args) ->
- ?info("service_discovery_rpc:handle_notification(~p): unknown", [ Other ]),
- ok.
-
-handle_notification("subscribe_to_service_availability", Args) ->
+handle_notification("subscribe", Args) ->
{ok, Service } = rvi_common:get_json_element(["service"], Args),
- {ok, Module } = rvi_common:get_json_element(["module"], Args),
+ {ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args),
%% De-register service
- gen_server:cast(?SERVER, { rvi, subscribe_to_service_availability,
+ gen_server:cast(?SERVER, { rvi, subscribe,
[ Service, Module ]}),
ok;
handle_notification("unsubscribe_from_service", Args) ->
{ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, Module } = rvi_common:get_json_element(["module"], Args),
+ {ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args),
%% De-register service
gen_server:cast(?SERVER, { rvi, unsubscribe_from_service,
[ Service, Module ]}),
ok;
+handle_notification( Other, _Args) ->
+ ?info("service_discovery_rpc:handle_notification(~p): unknown", [ Other ]),
+ ok.
-%% Register local services
-handle_rpc("register_local_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
-
- [ok, FullSvcName] = gen_server:call(?SERVER, { rvi, register_local_service,
- [ Service, Address ]}),
-
- {ok, [ {status, rvi_common:json_rpc_status(ok) }, { full_service_name, FullSvcName }]};
-
-
-%%
-%% Get remote services
-%%
-handle_rpc("get_remote_services", _Args) ->
- [ok, Services ] = gen_server:call(?SERVER, { rvi, get_remote_services,
- [ ]}),
- JSONServices = convert_json_services(Services),
-
- {ok, [ {status, rvi_common:json_rpc_status(ok)} ,
- { services, { array, JSONServices } }]};
-
%%
%% Get all services
%%
-handle_rpc("get_all_services", _Args) ->
- ?debug("service_discovery_rpc:get_all_services(json-rpc)"),
- [ok, Services ] = gen_server:call(?SERVER, { rvi, get_all_services,
- []}),
- ?debug("service_discovery_rpc:Done"),
+handle_rpc("get_services", _Args) ->
+ ?debug("service_discovery_rpc:get_services(json-rpc)"),
+ [ok, Services ] = gen_server:call(?SERVER, { rvi, get_services, []}),
{ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, { array, Services } }]};
-%%
-%% Get remote network addresses
-%%
-handle_rpc("get_remote_addresses", _Args) ->
- [ok, Addresses ] = gen_server:call(?SERVER, { rvi, get_remote_addresses,
- []}),
- {ok, [ {status, rvi_common:json_rpc_status(ok)}, { addresses, { array, Addresses }}]};
-
-%%
-%% Resolve remote service
-%%
-handle_rpc("resolve_remote_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
-
- case gen_server:call(?SERVER, { rvi, resolve_remote_service,
- [Service]}) of
- [ok, Addresses ] ->
- {ok, [ {status, rvi_common:json_rpc_status(ok)}, { addresses, { array, Addresses }}]};
-
- [ Other ] -> {ok, [ {status, rvi_common:json_rpc_status(Other)} ]}
- end;
-
-
-
-%%
-%% Resolve local service
-%%
-handle_rpc("resolve_local_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
-
- case gen_server:call(?SERVER, { rvi, resolve_local_service,
- [Service]}) of
- [ok, Addresses ] ->
- {ok, [ {status, rvi_common:json_rpc_status(ok)}, { addresses, { array, Addresses }}]};
-
- [ Other ] -> {ok, [ {status, rvi_common:json_rpc_status(Other)} ]}
- end;
-
-
-
-
-%%
-%% Get local services
-%%
-handle_rpc("get_local_services", _Args) ->
- [ok, LocalServices ] =
- gen_server:call(?SERVER, { rvi, get_local_services, []}),
-
- JSONServices = convert_json_services(LocalServices),
-
- {ok, [ {status, rvi_common:json_rpc_status(ok)} ,
- { services, { array, JSONServices }}]};
-
-
-
-%%
-%% Get local network addresses
-%%
-handle_rpc("get_local_addresses", _Args) ->
- [ok, LocalAddresses ] =
- gen_server:call(?SERVER, { rvi, get_local_addresses, []}),
-
- {ok, [ {status, rvi_common:json_rpc_status(ok)} , { addresses, { array, LocalAddresses }}]};
-
%%
@@ -323,99 +165,14 @@ handle_rpc( Other, _Args) ->
{ok, [ { status, invalid_command } ]}.
-%% Handle calls received through regular gen_server calls, routed by
-%% rvi_common:request()
-
-handle_call({rvi, register_local_service, [Service, Address] }, _From, St) ->
- ?info("service_discovery_rpc:register_local_service(): ~p -> ~p",
- [Service, Address]),
-
- FullSvcName = rvi_common:local_service_to_string(Service),
-
- ets:insert(?LOCAL_SERVICE_TABLE,
- #service_entry {
- service = FullSvcName,
- address = Address
- }),
-
- notify_subscribers(available, Service, local, Address),
- {reply, [ ok, FullSvcName ], St };
-
-
-handle_call({rvi, resolve_remote_service, [RawService]}, _From, St) ->
- Service = rvi_common:sanitize_service_string(RawService),
- ?debug("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]),
- ?debug("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]),
- case resolve_service_(?REMOTE_SERVICE_TABLE, Service) of
- {ok, Address } ->
- {reply, [ok, Address ], St};
-
- not_found ->
- ?debug("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. "
- "Trying static nodes",
- [Service]),
-
-
- %% Check if this is a service residing on the backend server
- case rvi_common:get_static_node(Service) of
- not_found -> %% Not found
- ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found.",
- [Service]),
-
- {reply, [not_found], St };
-
- Address -> %% Found
- ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p",
- [Service, Address]),
-
- {reply, [ok, Address ], St}
- end
- end;
-
-handle_call({rvi, get_remote_services, _Args}, _From, St) ->
- Services = get_services_(?REMOTE_SERVICE_TABLE),
- {reply, [ok, Services ], St };
-handle_call({rvi, get_all_services, _Args}, _From, St) ->
- RemoteSvc = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
- [ ServiceName | Acc ] end,
- [], ?REMOTE_SERVICE_TABLE),
+handle_call({rvi, get_services, _Args}, _From, St) ->
+ Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
+ [ ServiceName | Acc ] end,
+ [], ?SERVICE_TABLE),
- LocalSvc = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
- [ ServiceName | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE),
-
- Services = RemoteSvc++LocalSvc,
-
- {reply, [ok, Services], St };
-
-
-handle_call({rvi, get_remote_addresses, _Args}, _From, St) ->
- Addresses = get_addresses_(?REMOTE_ADDRESS_TABLE),
- {reply, [ ok, Addresses ], St };
-
-
-handle_call({rvi, resolve_local_service, [RawService]}, _From, St) ->
- Service = rvi_common:sanitize_service_string(RawService),
- ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]),
- ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]),
-
- case resolve_service_(?LOCAL_SERVICE_TABLE, Service) of
- not_found ->
- { reply, [not_found], St };
-
- {ok, Address } ->
- { reply, [ok, Address], St}
- end;
-
-handle_call({rvi, get_local_services, _Args}, _From, St) ->
- Services = get_services_(?LOCAL_SERVICE_TABLE),
- {reply, [ok, Services ], St };
-
-handle_call({rvi, get_local_addresses, _Args}, _From, St) ->
- Addresses = get_addresses_(?LOCAL_SERVICE_TABLE),
- {reply, [ ok, Addresses ], St };
+ {reply, [ok, Svcs], St };
handle_call(Other, _From, St) ->
@@ -423,195 +180,79 @@ handle_call(Other, _From, St) ->
{ reply, [unknown_command] , St}.
-handle_cast({rvi, subscribe_to_service_availability, [Service, Module] }, St) ->
- case ets:lookup(?NOTIFICATION_SUBS_TABLE, Service) of
+
+handle_cast({rvi, subscribe, [Service, SubsMod] }, St) ->
+ case ets:lookup(?NOTIFICATION_TABLE, Service) of
'$end_of_table' ->
%% Insert new entry.
- ets:insert(?NOTIFICATION_SUBS_TABLE,
+ ets:insert(?NOTIFICATION_TABLE,
#notification_subs { service = Service,
- modules = [Module] });
+ modules = [SubsMod] });
- #notification_subs { modules = Modules } ->
+ #notification_subs { modules = SubsMods } ->
%% Replace existing entry
- ets:insert(?NOTIFICATION_SUBS_TABLE,
+ ets:insert(?NOTIFICATION_TABLE,
#notification_subs { service = Service,
- modules = lists:usort([ Module | Modules ]) }),
+ modules = lists:usort([ SubsMod | SubsMods ]) })
end,
- notify_on_existing_service(Service, Module)
+ notify_on_existing_service(St#st.cs, Service, SubsMod),
{ noreply, St};
-handle_cast({rvi, unsubscribe_to_service_availability, [Service, Module] }, St) ->
- case ets:lookup(?NOTIFICATION_SUBS_TABLE, Service) of
+
+handle_cast({rvi, unsubscribe, [Service, SubsMod] }, St) ->
+ case ets:lookup(?NOTIFICATION_TABLE, Service) of
'$end_of_table' ->
%% No match.
ok;
- #notification_subs { modules = Modules } ->
+ #notification_subs { modules = SubsMods } ->
%% Replace existing entry
- ets:insert(?NOTIFICATION_SUBS_TABLE,
+ ets:insert(?NOTIFICATION_TABLE,
#notification_subs { service = Service,
- modules = Modules -- Module })
+ modules = SubsMods -- SubsMod })
end,
{ noreply, St};
-handle_cast({rvi, register_remote_services, [Services, DataLinkModule, Address] }, St) ->
- case Services of
- %% We have zero services associated with address.
- %% Just register the address.
- [] ->
- ?info("service_discovery_rpc:register_remote_service_(): service(n/a) -> ~p",
- [Address]),
-
- ets:insert(?REMOTE_ADDRESS_TABLE,
- #service_entry {
- service = "",
- address = { Address, DataLinkModule }
- }),
-
- dump_table(?REMOTE_ADDRESS_TABLE),
- ok;
- %% Loop through the services and register them.
- _ ->
- lists:map(fun(Svc) ->
- register_remote_service_(Svc,
- DataLinkModule,
- Address)
- end, Services),
-
- notify_subscribers(available, Service, DataLinkModule, Address),
-
- %% Forward to scheduler now that we have updated our own state
- schedule_rpc:register_remote_services(St#st.cs, Address, Services),
-
- %% Forward to service edge so that it can inform its locally
- %% connected services.
- %% Build a list of all our local services' addresses to provide
- %% to service edge so that it knows where to send the,
- LocalSvcAddresses =
- ets:foldl(fun(#service_entry { address = LocalAddress }, Acc) ->
- [ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE),
-
- %% Call service edge with local addresses (sorted and de-duped) and
- %% the services to register.
- service_edge_rpc:
- register_remote_services(St#st.cs,
- Services,
- lists:usort(LocalSvcAddresses))
- end,
- {noreply, St };
+%% Handle calls received through regular gen_server calls, routed by
+%% rvi_common:request()
+handle_cast({rvi, register_services, [Services, DataLinkModule] }, St) ->
+ ?info("service_discovery_rpc:register_service(): ~p:~p",
+ [DataLinkModule, Services]),
+ [ register_single_service_(SvcName, DataLinkModule) || SvcName <- Services],
-%%
-%% Delete all services registered under the given address
-%%
-handle_cast({rvi, unregister_remote_services_by_address, [DataLinkModule, Address]}, St) ->
-
- %% Retrieve all services associated with the remote address
- Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, { Address, DataLinkModule }),
-
- %% We now have a list of service records, convert them to a list of service
- %% names and send them of to schedule for deregistration
- AllSvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) ->
- [SvcName | Acc]
- end, [], Svcs),
-
- ?info("service_discovery_rpc:unregister_remote_services_by_address(): ~p -> ~p",
- [Address, AllSvcNames]),
-
- %% We need to filter AllSvcNames to remove all service entries that have
- %% been registered under another name.
- %% We do this by creating a list of all matching entries associated
- %% with a network address not matching the disconnected Address
- %%
- %% See issue https://github.com/PDXostc/rvi/issues/14 for details
- FilterSvc =
- lists:foldr(
- fun(Service, Acc) ->
- %% Lookup the service in the service table.
- case ets:lookup(?REMOTE_SERVICE_TABLE, Service) of
-
- %% Not found. Do not filter out.
- [] ->
- Acc;
-
- %% We found or own entry, tiet to the disconnected address.
- %% Do not add to addresses to be removed.
- [ #service_entry { address = { Address, DataLinkModule} } ] ->
- Acc;
-
- %% We found an entry that does not the disconnected
- %% network address. This one should be filtered out
- [ _ ] ->
- [ Service | Acc ]
-
- end
- end, [], AllSvcNames),
-
- SvcNames = AllSvcNames -- FilterSvc,
+ %% Notify all subscribers
+ notify_subscribers(St#st.cs,
+ available,
+ Services,
+ DataLinkModule),
- ?info("service_discovery_rpc:unregister_remote_services_by_address(): "
- "Resurrected services: ~p",
- [FilterSvc]),
-
- %% Delete any addresses stored with an empty service name,
- %% installed with register_remote_service/1, since we now have at
- %% least one service name.
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
- #service_entry {
- service = [],
- address = { Address, DataLinkModule}
- }),
+ {noreply, St };
- ets:delete(?REMOTE_ADDRESS_TABLE, {Address, DataLinkModule}),
- %% Go through all service names and unregister them with schedulew
- %% and service edge
- case SvcNames of
- %% Nothing to do.
- [] ->
- true;
- _ ->
- notify_subscribers(unavailable, SvcNames, DataLinkModule, Address),
-
- %% Tell scheduler to kill off services
- schedule_rpc:unregister_remote_services(St#st.cs, SvcNames),
-
- %% Delete all services from remote service table.
- [ ets:delete(?REMOTE_SERVICE_TABLE, Svc#service_entry.service) || Svc <- Svcs ],
-
- %% Forward to service edge so that it can inform its locally
- %% connected services.
- %% Build a list of all our local services' addresses to provide
- %% to service edge so that it knows where to send the,
- LocalSvcAddresses =
- ets:foldl(fun(#service_entry { address = LocalAddress }, Acc) ->
- [ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE),
-
- %% Call service edge with local addresses (sorted and
- %% de-duped) and the services to register.
- service_edge_rpc:unregister_remote_services(St#st.cs,
- SvcNames,
- lists:usort(LocalSvcAddresses))
+%% Handle calls received through regular gen_server calls, routed by
+%% rvi_common:request()
+handle_cast({rvi, unregister_services, [Services, DataLinkModule] }, St) ->
- end,
- {noreply, St };
+ ?info("service_discovery_rpc:unregister_service(): ~p:~p",
+ [DataLinkModule, Services]),
-handle_cast({rvi, unregister_remote_services_by_name, [Services]}, St) ->
- unregister_remote_services_by_name_(St#st.cs, Services),
- {noreply, St };
+ [ unregister_single_service_(SvcName, DataLinkModule) || SvcName <- Services],
+
+ %% Notify all subscribers
+ notify_subscribers(St#st.cs,
+ unavailable,
+ Services,
+ DataLinkModule),
-handle_cast({rvi, unregister_local_service, [Service]}, St) ->
- ?info("service_discovery_rpc:unregister_local_service(): ~p",
- [Service]),
- ets:delete(?LOCAL_SERVICE_TABLE, Service),
{noreply, St };
+
+
handle_cast(Other, St) ->
?warning("service_discovery_rpc:handle_cast(~p): unknown", [ Other ]),
{noreply, St}.
@@ -628,151 +269,110 @@ code_change(_OldVsn, St, _Extra) ->
%%
%% INTERNAL SUPPORT FUNCTIONS
%%
-dump_table(_Table, '$end_of_table') ->
- true;
+%% dump_table(_Table, '$end_of_table') ->
+%% true;
-dump_table(Table, Key) ->
- Val = ets:lookup(Table, Key),
- ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]),
- dump_table(Table, ets:next(Table, Key)).
+%% dump_table(Table, Key) ->
+%% Val = ets:lookup(Table, Key),
+%% ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]),
+%% dump_table(Table, ets:next(Table, Key)).
-dump_table(Table) ->
- dump_table(Table, ets:first(Table)).
+%% dump_table(Table) ->
+%% dump_table(Table, ets:first(Table)).
-register_remote_service_(Service, DataLinkModule, Address) ->
- ?info("service_discovery_rpc:register_remote_service_(): service(~p) -> ~p",
- [Service, Address]),
+register_single_service_(Service, DataLinkModule) ->
+ ?info("service_discovery_rpc:register_remote_service_(~p:~p)",
+ [DataLinkModule,Service]),
- FullSvcName = rvi_common:remote_service_to_string(Service),
-
- ets:insert(?REMOTE_SERVICE_TABLE,
- #service_entry {
- service = FullSvcName,
- address = { Address, DataLinkModule }
- }),
-
- %% Delete any addresses stored with an empty service name,
- %% installed with register_remote_service/1, since we now have at
- %% least one service name.
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
+ %% Delete any previous instances of the given entry, in case
+ %% the service registers multiple times
+ ets:match_delete(?MODULE_TABLE,
#service_entry {
- service = [],
- address = { Address, DataLinkModule}
+ service = Service,
+ data_link_mod = DataLinkModule
}),
- %% Delete any previous instances of the given entry, in case
- %% the service registers multiple times
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
+ ets:match_delete(?SERVICE_TABLE,
#service_entry {
- service = FullSvcName,
- address = { Address , DataLinkModule }
+ service = Service,
+ data_link_mod = DataLinkModule
}),
- %% Insert new element
- ets:insert(?REMOTE_ADDRESS_TABLE,
+ ets:insert(?SERVICE_TABLE,
+ #service_entry {
+ service = Service,
+ data_link_mod = DataLinkModule
+ }),
+
+ ets:insert(?MODULE_TABLE,
#service_entry {
- service = FullSvcName,
- address = { Address, DataLinkModule}
+ service = Service,
+ data_link_mod = DataLinkModule
}),
- {ok, FullSvcName}.
+
+ ok.
-unregister_single_remote_service_by_name_(CompSpec, Service) ->
- ?info("service_discovery_rpc:unregister_single_remote_service_by_name_(): ~p",
- [Service]),
+unregister_single_service_(Service, DataLinkModule) ->
+ ?info("service_discovery_rpc:unregister_single_service_(): ~p:~p",
+ [DataLinkModule, Service]),
+ %% Delete any service table entries with a matching Service.
+ ets:match_delete(?SERVICE_TABLE,
+ #service_entry {
+ service = Service,
+ data_link_mod = DataLinkModule
+ }),
%% Delete any remote address table entries with a matching Service.
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
+ ets:match_delete(?MODULE_TABLE,
#service_entry {
service = Service,
- address = '_'
+ data_link_mod = DataLinkModule
}),
- ets:delete(?REMOTE_SERVICE_TABLE, Service),
- After = ets:foldl(fun(#service_entry { service = Svc }, Acc) ->
- [ Svc | Acc ] end,
- [], ?REMOTE_SERVICE_TABLE),
-
- ?debug("Ater removing ~p: ~p", [ Service, After ]),
-
- %% Forward to service edge so that it can inform its locally
- %% connected services.
- %% Build a list of all our local services' addresses to provide
- %% to service edge so that it knows where to send the,
- LocalSvcAddresses =
- ets:foldl(fun(#service_entry { address = LocalAddress }, Acc) ->
- [ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE),
-
- %% Call service edge with local addresses (sorted and de-duped) and
- %% the services to register.
- notify_subscribers(unavailable, [Service], fixme, fixme),
-
- service_edge_rpc:unregister_remote_services(CompSpec,
- [[Service]],
- lists:usort(LocalSvcAddresses)),
-
-
ok.
+%%
+%% Return all modules that can currently route the provided service.
+%%
+get_modules_by_service_(Service) ->
-%% Loop through multiple services and remove them one by one
-unregister_remote_services_by_name_(CompSpec, Services) ->
- [ unregister_single_remote_service_by_name_(CompSpec, Svc) || Svc <- Services],
- ok.
-
-
-
-get_services_(Table) ->
- Services = ets:foldl(fun(#service_entry {service = ServiceName,
- address = ServiceAddr}, Acc) ->
- [ { ServiceName, ServiceAddr } | Acc ] end,
- [], Table),
-
- ?debug("service_discovery_rpc:get_services_(): ~p", [ Services ]),
- Services.
+ ModMatch = ets:lookup(?SERVICE_TABLE, Service),
+
+ ModNames = lists:foldl(fun(#service_entry {
+ data_link_mod = Mod
+ }, Acc) ->
+ [ Mod | Acc ]
+ end, [], ModMatch),
+
+
-get_addresses_(Table) ->
- AddrList = ets:foldl(fun(#service_entry {address = Addr}, Acc)
- when Addr =:= unavailable ->
- Acc; %% Don't report if service is not active
+ ?debug("service_discovery_rpc:get_modules_by_service_(): ~p -> ~p", [ Service, ModNames ]),
+ ModNames.
- %% We have an active network address
- (#service_entry {address = Addr}, Acc) ->
- %% Avoid duplicates
- case lists:keyfind(Addr, 1, Acc) of
- false ->[ Addr | Acc ];
- _ -> Acc
- end
- end, [], Table),
- %% Return a dup-scrubbed list.
- Addresses = sets:to_list(sets:from_list(AddrList)),
- ?debug("service_discovery_rpc:get_addresses(~p): ~p", [ Table, Addresses ]),
- Addresses.
+get_services_by_module_(Module) ->
+ SvcMatch = ets:lookup(?MODULE_TABLE, Module),
+
+ SvcNames = lists:foldl(fun(#service_entry {
+ service = Svc
+ }, Acc) ->
+ [ Svc | Acc ]
+ end, [], SvcMatch),
+
+
-resolve_service_(Table, Service) ->
- case ets:lookup(Table, Service) of
- %% We found a service entry, report it back
- [#service_entry { address = Address }] ->
- ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> ~p",
- [ Table, Service, Address ]),
- {ok, Address };
+ ?debug("service_discovery_rpc:get_services_by_module_(): ~p -> ~p", [ Module, SvcNames ]),
+ SvcNames.
- %% We did not find a service entry, check statically configured nodes.
- [] ->
- ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> Not Found",
- [ Table, Service ]),
- not_found
- end.
@@ -790,17 +390,12 @@ convert_json_services(Services) ->
JSONServices.
-get_subscribers([], St) ->
- [];
-
-
%% jlr.com/abc
-notify_subscribers(_Available, [], _DataLinkModule, _Address) ->
+notify_subscribers(_CompSpec, _Available, [], _DataLinkModule) ->
ok;
-
-notify_subscribers(Available, [ Service | Rem], DataLinkModule, Address) ->
- %% ?NOTIFICATION_SUBS_TABLE contains services that
+notify_subscribers(CompSpec, Available, [ Service | Rem], DataLinkModule) ->
+ %% ?NOTIFICATION_TABLE contains services that
%% should be matched against the given service.
%% If there is a match, we notify the associated module
@@ -810,31 +405,21 @@ notify_subscribers(Available, [ Service | Rem], DataLinkModule, Address) ->
end,
%% Retrieve all modules subscribing to a specific service.
- case ets:lookup(?NOTIFICATION_SUBS_TABLE, Service) of
- '$end_of_table' ->
+ case ets:lookup(?NOTIFICATION_TABLE, Service) of
+ [] ->
ok;
- #notify_subscribers { modules = Modules } ->
+ [#notification_subs { modules = Modules }] ->
%% Notify each subscriber of the given service.
- [ Module:Fun(Service, DataLinkModule, Address) || Module <- Modules],
+ [ Module:Fun(CompSpec, Service, DataLinkModule) || Module <- Modules],
ok
end,
%% Move on to remaining subscribers.
- notify_subscribers(Address, Rem, Address).
-
+ notify_subscribers(CompSpec, Available, Rem, DataLinkModule).
-notify_on_existing_service(Service, Module) ->
- case resolve_service_(?LOCAL_SERVICE_TABLE, Service) of
- not_found ->
- case resolve_service(?REMOTE_SERVICE_TABLE, Service) of
- not_found -> not_found;
- { ok, { DataLinkModule, Address }} ->
- Module:service_available(Service, DataLinkModule, Address),
- ok
- end;
-
- {ok, Address } ->
- Module:service_available(Service, local, Address)
- end
+notify_on_existing_service(CompSpec, Service, SubsMod) ->
+ Modules = ets:lookup(?SERVICE_TABLE, Service),
+ [ SubsMod:service_available(CompSpec, Service, Mod) || Mod <- Modules],
+ ok.
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index b101921..bdd32ac 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -27,14 +27,17 @@
terminate/2,
code_change/3]).
--export([register_remote_services/3,
- unregister_remote_services/3,
- handle_remote_message/7,
+-export([handle_remote_message/7,
handle_local_timeout/3]).
-export([start_json_server/0,
start_websocket/0]).
+%% Invoked by service discovery
+%% FIXME: Should be rvi_service_discovery behavior
+-export([service_available/3,
+ service_unavailable/3]).
+
%%-include_lib("lhttpc/include/lhttpc.hrl").
-include_lib("lager/include/log.hrl").
@@ -49,6 +52,14 @@
cs = #component_spec{}
}).
+-define(SERVICE_TABLE, rvi_local_services).
+
+-record(service_entry, {
+ service = "", %% Servie handled by this entry.
+ url = undefined %% URL where the service can be reached.
+ }).
+
+
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -65,6 +76,9 @@ init([]) ->
?notice("---- Service Edge URL: ~p", [ URL ]),
?notice("---- Node Service Prefix: ~s", [ rvi_common:local_service_prefix()]),
+ ets:new(?SERVICE_TABLE, [ set, public, named_table,
+ { keypos, #service_entry.service }]),
+
{ok, #st {
cs = CompSpec
}}.
@@ -85,9 +99,6 @@ start_json_server() ->
end.
-
-
-
start_websocket() ->
%%
%% Fire up the websocket subsystem, if configured
@@ -114,223 +125,82 @@ start_websocket() ->
end.
-%% Invoked by schedule_rpc.
-%% A message originated from a locally connected service
-%% has timed out
-handle_local_timeout(CompSpec, Service, TransID) ->
- rvi_common:notification(service_edge, ?SERVER, handle_local_timeout,
- [ { service, Service},
- { transaction_id, TransID} ],
- CompSpec).
-
-
-register_remote_services(CompSpec, Service, LocalServiceAddresses) ->
- rvi_common:notification(service_edge, ?SERVER, register_remote_services,
- [ { service, Service } ,
- { local_service_addresses, LocalServiceAddresses } ],
- CompSpec).
-
-
-unregister_remote_services(CompSpec, Services, LocalServiceAddresses) ->
- rvi_common:notification(service_edge, ?SERVER, unregister_remote_services,
- [ { services, Services },
- { local_service_addresses, LocalServiceAddresses }],
- CompSpec).
-
-%%
-%% Handle a message, delivered from a remote node through protocol, that is
-%% to be forwarded to a locally connected service.
-%%
-handle_remote_message(CompSpec, ServiceName, Timeout, NetworkAddress,
- Parameters, Signature, Certificate) ->
- rvi_common:notification(service_edge, ?SERVER, handle_remote_message,
- [ { service, ServiceName },
- { timeout, Timeout },
- { network_address, NetworkAddress },
- { parameters, Parameters },
- { signature, Signature },
- { certificate, Certificate } ],
- CompSpec).
-
-
-
-
-
-
-%% Announces the services listed in Services with all
-%% local services listed under LocalServices
-%% SkipAddress is a single address that, if found in LocalServiceAddresses,
-%% will not receive an announcement.
-%% This is to avoid that a local service registering itself will get a callback
-%% about its own availability.
-
-announce_service_availability(Cmd, LocalServiceAddresses, Services) ->
- announce_service_availability(Cmd, LocalServiceAddresses, Services, undefined).
-
-announce_service_availability(Cmd, LocalServiceAddresses, Services, SkipAddress) ->
- ?info("service_edge_rpc:announce_service_availability(~p, ~p, ~p): Called.",
- [ Cmd, LocalServiceAddresses, Services ]),
-
- lists:map(fun(LocalServiceAddress) when LocalServiceAddress =:= SkipAddress ->
- ok;
-
- (LocalServiceAddress) ->
- dispatch_to_local_service(LocalServiceAddress, Cmd,
- {struct, [ { services, { array, [Services ]}}]})
- end, LocalServiceAddresses),
- { ok, [ { status, rvi_common:json_rpc_status(ok)} ] }.
-
-
-
-%%
-%% Depending on the format of NetworkAddress
-%% Dispatch to websocket or JSON-RPC server
-%% FIXME: Should be a pluggable setup where
-%% different dispatchers are triggered depending
-%% on prefix in NetworkAddress
-%%
-flatten_ws_args([{ struct, List} | T], Acc ) when is_list(List) ->
- flatten_ws_args( List ++ T, Acc);
+%% Invoked by service_discovery to announce service availability
+%% Must be handled either as a JSON-RPC call or a gen_server call.
+service_available(CompSpec, Service, DataLinkModule) ->
+ rvi_common:notify(service_edge, ?MODULE,
+ service_available,
+ [{ service, Service },
+ { data_link_module, DataLinkModule }], CompSpec).
-flatten_ws_args([{ Key, Val}| T], Acc ) ->
- NKey = case is_atom(Key) of
- true -> atom_to_list(Key);
- false -> Key
- end,
- NVal = flatten_ws_args(Val),
+service_unavailable(CompSpec, Service, DataLinkModule) ->
+ rvi_common:notify(service_edge, ?MODULE,
+ service_unavailable,
+ [{ service, Service },
+ { data_link_module, DataLinkModule }], CompSpec).
- flatten_ws_args(T, [ NKey, NVal] ++ Acc);
-flatten_ws_args([], Acc) ->
- Acc;
-
+%% Websocket interface
+wse_register_service(Ws, Service ) ->
+ ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, Service ]),
+ gen_server:call(?SERVER, { rvi, register_local_service, [ Service, "ws:" ++ pid_to_list(Ws)]}),
+ { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}.
-flatten_ws_args(Other, []) ->
- Other;
+wse_unregister_service(Ws, Service ) ->
+ ?debug("service_edge_rpc:wse_unregister_service(~p) service: ~p", [ Ws, Service ]),
+ gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service ]}),
+ { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}.
-flatten_ws_args(Other, Acc) ->
- [ Other | Acc ].
+wse_get_available_services(_Ws ) ->
+ ?debug("service_edge_rpc:wse_get_available_services()"),
+ [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
+ { ok, [ { status, rvi_common:json_rpc_status(ok)},
+ { services, Services}] }.
-flatten_ws_args(Args) ->
- flatten_ws_args(Args, []).
+wse_message(Ws, ServiceName, Timeout, JSONParameters) ->
+ %% Parameters are delivered as JSON. Decode into tuple
+ { ok, Parameters } = exo_json:decode_string(JSONParameters),
+ ?debug("service_edge_rpc:wse_message(~p) ServiceName: ~p", [ Ws, ServiceName ]),
+ ?debug("service_edge_rpc:wse_message(~p) Timeout: ~p", [ Ws, Timeout]),
+ ?debug("service_edge_rpc:wse_message(~p) Parameters: ~p", [ Ws, Parameters ]),
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
- [{ services, Services}] ) ->
- ?info("service_edge:dispatch_to_local_service(service_available, websock): ~p", [ Services]),
- wse:call(list_to_pid(WSPidStr), wse:window(),
- "services_available",
- [ "services", Services ]),
- ok;
+ [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
+ [ ServiceName, Timeout, Parameters]}),
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
- [{ services, Services}] ) ->
- ?info("service_edge:dispatch_to_local_service(service_unavailable, websock): ~p", [ Services]),
- wse:call(list_to_pid(WSPidStr), wse:window(),
- "services_unavailable",
- [ "services", Services ]),
- ok;
+ { ok, [ { status, rvi_common:json_rpc_status(Res) },
+ { transaction_id, TID} ] }.
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
- [{ service_name, SvcName}, { parameters, Args}] ) ->
- ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]),
- wse:call(list_to_pid(WSPidStr), wse:window(),
- "message",
- [ "service_name", SvcName ] ++ flatten_ws_args(Args)),
- ok;
+%% Deprecated
+wse_message(Ws, ServiceName, Timeout, JSONParameters, _CallingService) ->
+ wse_message(Ws, ServiceName, Timeout, JSONParameters).
-%% Dispatch to regular JSON-RPC over HTTP.
-dispatch_to_local_service(NetworkAddress, Command, Args) ->
- CmdStr = atom_to_list(Command),
- Res = rvi_common:send_json_request(NetworkAddress, CmdStr, Args),
- ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]),
- ?debug("dispatch_to_local_service(): Args: ~p",[ Args]),
- ?debug("dispatch_to_local_service(): Result: ~p",[ Res]),
- Res.
-%% Forward a message to a specific locally connected service.
-%% Called by forward_message_to_local_service/2.
+%% Invoked by locally connected services.
+%% Will always be routed as JSON-RPC since that, and websocket,
+%% are the only access paths in.
%%
-forward_message_to_local_service(ServiceName, NetworkAddress, Parameters, _CompSpec) ->
- ?debug("service_edge:forward_to_local(): URL: ~p", [NetworkAddress]),
- ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]),
-
- %%
- %% Strip our node prefix from service_name so that
- %% the service receiving the JSON rpc call will have
- %% a service_name that is identical to the service name
- %% it registered with.
- %%
- SvcName = string:substr(ServiceName,
- length(rvi_common:local_service_prefix())),
-
- %% Deliver the message to the local service, which can
- %% be either a wse websocket, or a regular HTTP JSON-RPC call
- case rvi_common:get_request_result(
- dispatch_to_local_service(NetworkAddress,
- message,
- {struct, [ { service_name, SvcName },
- { parameters, Parameters }]})) of
-
- %% Request delivered.
- %% -1 is transaction ID.
- { ok, _Result } ->
- [ ok, -1 ];
-
- %% status returned was an error code.
- { Other, _Result } ->
- ?warning("service_edge:forward_to_local(): ~p:~p Failed: ~p.",
- [NetworkAddress, ServiceName, Other]),
- [not_found, -1];
-
- Other ->
- ?warning("service_edge:forward_to_local(): ~p:~p Unknown error: ~p.",
- [NetworkAddress, ServiceName, Other]),
- [internal, -1]
- end.
-
-%% A message is to targeting a service that is connected to the local RVI
-%% node. We can just bounce the messsage straight over to the target service.
-forward_message_to_local_service(ServiceName, Parameters, CompSpec) ->
- %%
- %% Resolve the local service name to an URL that we can send the
- %% request to
- %%
- ?debug("service_edge:forward_to_local(): service_name: ~p", [ServiceName]),
-
- case service_discovery_rpc:resolve_local_service(CompSpec, ServiceName) of
- [ ok, NetworkAddress] ->
- forward_message_to_local_service(ServiceName, NetworkAddress, Parameters, CompSpec);
-
- %% Local service could not be resolved to an URL
- [ not_found ] ->
- ?info("service_edge_rpc:forward_message_to_local() Not found: ~p",
- [ ServiceName ]),
- [not_found, -1];
-
- Err ->
- ?debug("service_edge_rpc:local_msg() Failed at service discovery: ~p",
- [ Err ]),
- [internal, -1]
- end.
-
-
-
-
handle_rpc("register_service", Args) ->
{ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
+ {ok, URL} = rvi_common:get_json_element(["network_address"], Args),
[ok, FullSvcName ] = gen_server:call(?SERVER,
{ rvi, register_local_service,
- [ Service, Address]}),
+ [ Service, URL]}),
{ok, [ {status, rvi_common:json_rpc_status(ok) }, { service, FullSvcName }]};
+
+handle_rpc("unregister_service", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service]}),
+ {ok, [ { status, rvi_common:json_rpc_status(ok) }]};
+
+
handle_rpc("get_available_services", _Args) ->
[ Status, Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
?debug("get_available_services(): ~p ~p", [ Status, Services ]),
@@ -344,25 +214,39 @@ handle_rpc("message", Args) ->
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
[ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
[ ServiceName, Timeout, Parameters]}),
- io:format("MESSGE: ~p, ~p", [ Res, TID]),
+
{ok, [ { status, rvi_common:json_rpc_status(Res) },
{ transaction_id, TID } ]};
-
-handle_rpc("unregister_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service]}),
- {ok, [ { status, rvi_common:json_rpc_status(ok) }]};
-
handle_rpc(Other, _Args) ->
?warning("service_edge_rpc:handle_rpc(~p): unknown command", [ Other ]),
{ok,[ { status, rvi_common:json_rpc_status(invalid_command)} ]}.
+handle_notification("service_available", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+ ?debug("service_edge:service_available(): service: ~p", [ Service]),
+ ?debug("service_edge:service_available(): data_link: ~p", [ DataLinkModule]),
+
+ gen_server:cast(?SERVER, { rvi, service_available,
+ [ Service, DataLinkModule ]}),
+
+ ok;
+handle_notification("service_unavailable", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+ ?debug("service_edge:service_unavailable(): service: ~p", [ Service]),
+ ?debug("service_edge:service_unavailable(): data_link: ~p", [ DataLinkModule]),
+
+ gen_server:cast(?SERVER, { rvi, service_unavailable,
+ [ Service, DataLinkModule ]}),
+
+ ok;
+
handle_notification("handle_remote_message", Args) ->
{ ok, ServiceName } = rvi_common:get_json_element(["service_name"], Args),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Args),
- { ok, NetworkAddress } = rvi_common:get_json_element(["network_address"], Args),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Args),
{ ok, Certificate } = rvi_common:get_json_element(["certificate"], Args),
{ ok, Signature } = rvi_common:get_json_element(["signature"], Args),
@@ -370,7 +254,6 @@ handle_notification("handle_remote_message", Args) ->
[
ServiceName,
Timeout,
- NetworkAddress,
Parameters,
Certificate,
Signature
@@ -381,23 +264,6 @@ handle_notification("handle_remote_message", Args) ->
-handle_notification("register_remote_services", Args) ->
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
- {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args),
-
- gen_server:cast(?SERVER, { rvi, register_remote_services,
- [ Services, LocalServiceAddresses]}),
-
- ok;
-
-handle_notification("unregister_remote_services", Args) ->
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
- {ok, LocalServiceAddresses} = rvi_common:get_json_element(["local_service_addresses"], Args),
-
- gen_server:cast(?SERVER, { rvi, unregister_remote_services,
- [ Services, LocalServiceAddresses]}),
- ok;
-
%% JSON-RPC entry point
%% Called by local exo http server
handle_notification("handle_local_timeout", Args) ->
@@ -413,42 +279,6 @@ handle_notification(Other, _Args) ->
ok.
-%% Websocket iface
-wse_register_service(Ws, Service ) ->
- ?debug("service_edge_rpc:wse_register_service(~p) service: ~p", [ Ws, Service ]),
- gen_server:cast(?SERVER, { rvi, register_local_service, [ Service, "ws:" ++ pid_to_list(Ws)]}),
- { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}.
-
-wse_unregister_service(Ws, Service ) ->
- ?debug("service_edge_rpc:wse_unregister_service(~p) service: ~p", [ Ws, Service ]),
- gen_server:call(?SERVER, { rvi, unregister_local_service, [ Service ]}),
- { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}.
-
-
-wse_get_available_services(_Ws ) ->
- ?debug("service_edge_rpc:wse_get_available_services()"),
- [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
- { ok, [ { status, rvi_common:json_rpc_status(ok)},
- { services, Services}] }.
-
-
-wse_message(Ws, ServiceName, Timeout, JSONParameters) ->
- %% Parameters are delivered as JSON. Decode into tuple
- { ok, Parameters } = exo_json:decode_string(JSONParameters),
- ?debug("service_edge_rpc:wse_message(~p) ServiceName: ~p", [ Ws, ServiceName ]),
- ?debug("service_edge_rpc:wse_message(~p) Timeout: ~p", [ Ws, Timeout]),
- ?debug("service_edge_rpc:wse_message(~p) Parameters: ~p", [ Ws, Parameters ]),
-
- [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
- [ ServiceName, Timeout, Parameters]}),
-
- { ok, [ { status, rvi_common:json_rpc_status(Res) },
- { transaction_id, TID} ] }.
-
-%% Deprecated
-wse_message(Ws, ServiceName, Timeout, JSONParameters, _CallingService) ->
- wse_message(Ws, ServiceName, Timeout, JSONParameters).
-
@@ -458,28 +288,20 @@ wse_message(Ws, ServiceName, Timeout, JSONParameters, _CallingService) ->
%% the only calls invoked by other components, and not the locally
%% connected services that uses the same HTTP port to transmit their
%% register_service, and message calls.
-handle_call({ rvi, register_local_service, [Service, ServiceAddress] }, _From, St) ->
+handle_call({ rvi, register_local_service, [Service, URL] }, _From, St) ->
?debug("service_edge_rpc:register_local_service(): service: ~p ", [Service]),
- ?debug("service_edge_rpc:register_local_service(): address: ~p ", [ServiceAddress]),
+ ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
- [ok, FullSvcName ] =
- service_discovery_rpc:register_local_service(St#st.cs,
- Service,
- ServiceAddress),
-
- SvcString = rvi_common:local_service_to_string(Service),
+ FullSvcName = rvi_common:local_service_to_string(Service),
- %% Announce the new service to all connected nodes
- data_link_bert_rpc_rpc:
- announce_available_local_service(St#st.cs, SvcString),
-
- %% Retrieve addresses of all locally registered services.
- [ ok, AnnounceAddresses ] =
- service_discovery_rpc:get_local_network_addresses(St#st.cs),
-
- announce_service_availability(services_available, AnnounceAddresses,
- [FullSvcName], ServiceAddress),
+ ets:insert(?SERVICE_TABLE, #service_entry {
+ service = FullSvcName,
+ url = URL }),
+ %% Register with service discovery, will trigger callback to service_available()
+ %% that forwards the registration to other connected services.
+ service_discovery_rpc:register_services(St#st.cs, [FullSvcName], local),
+
%% Return ok.
{ reply, [ ok, FullSvcName ], St };
@@ -487,16 +309,12 @@ handle_call({ rvi, register_local_service, [Service, ServiceAddress] }, _From, S
handle_call({ rvi, unregister_local_service, [Service] }, _From, St) ->
?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [Service]),
- service_discovery_rpc:unregister_local_service(St#st.cs, Service),
- data_link_bert_rpc_rpc:
- announce_unavailable_local_service(St#st.cs, Service),
+ ets:delete(?SERVICE_TABLE, Service),
- [ ok, AnnounceAddresses ] = service_discovery_rpc:
- get_local_network_addresses(St#st.cs),
- %% Send out an announcement to all locally connected services, but skip
- %% the one that made the registration call
- announce_service_availability(services_unavailable, AnnounceAddresses, Service),
+ %% Register with service discovery, will trigger callback to service_available()
+ %% that forwards the registration to other connected services.
+ service_discovery_rpc:unregister_services(St#st.cs, [Service], local),
%% Return ok.
{ reply, [ ok ], St };
@@ -505,8 +323,7 @@ handle_call({ rvi, unregister_local_service, [Service] }, _From, St) ->
handle_call({rvi, get_available_services, []}, _From, St) ->
?debug("service_edge_rpc:get_available_services()"),
- {reply, service_discovery_rpc:get_all_services(St#st.cs), St};
-
+ {reply, service_discovery_rpc:get_services(St#st.cs), St};
handle_call({ rvi, handle_local_message,
[ServiceName, Timeout, Parameters] }, _From, St) ->
@@ -527,11 +344,11 @@ handle_call({ rvi, handle_local_message,
%% Check if this is a local service by trying to resolve its service name.
%% If successful, just forward it to its service_name.
%%
- case service_discovery_rpc:resolve_local_service(St#st.cs, ServiceName) of
- [ ok, NetworkAddress] -> %% ServiceName is local. Forward message
+ case ets:lookup(?SERVICE_TABLE, ServiceName) of
+ [ #service_entry { url = URL } ] -> %% ServiceName is local. Forward message
?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."),
- Res = forward_message_to_local_service(ServiceName,
- NetworkAddress,
+ Res = forward_message_to_local_service(URL,
+ ServiceName,
Parameters,
St#st.cs),
{ reply, Res , St};
@@ -554,11 +371,20 @@ handle_call(Other, _From, St) ->
{ reply, [ invalid_command ], St}.
+handle_cast({rvi, service_available, [Service, _DataLinkModule] }, St) ->
+ announce_service_availability(available, Service),
+ { noreply, St };
+
+
+handle_cast({rvi, service_unavailable, [Service, _DataLinkModule] }, St) ->
+ announce_service_availability(unavailable, Service),
+ { noreply, St };
+
+
handle_cast({rvi, handle_remote_message,
[
ServiceName,
Timeout,
- NetworkAddress,
Parameters,
Certificate,
Signature
@@ -567,36 +393,33 @@ handle_cast({rvi, handle_remote_message,
?debug("service_edge:remote_msg(): service_name: ~p", [ServiceName]),
?debug("service_edge:remote_msg(): timeout: ~p", [Timeout]),
?debug("service_edge:remote_msg(): parameters: ~p", [Parameters]),
- ?debug("service_edge:remote_msg(): parameters: ~p", [NetworkAddress]),
?debug("service_edge:remote_msg(): signature: ~p", [Signature]),
?debug("service_edge:remote_msg(): certificate: ~p", [Certificate]),
- case
- authorize_rpc:authorize_remote_message(St#st.cs,
- ServiceName,
- Certificate,
- Signature) of
- [ ok ] ->
- forward_message_to_local_service(ServiceName, Parameters, St#st.cs),
- { noreply, St};
-
-
- %% Authorization failed.
- [ Err ] ->
- ?warning(" service_edge:remote_msg(): Authorization failed: ~p. DROPPED", [Err]),
- {noreply, St}
- end;
-
-handle_cast({ rvi, register_remote_services,
- [ Services, LocalServiceAddresses ]}, State) ->
- announce_service_availability(services_available, LocalServiceAddresses, Services),
- { noreply, State };
-
-handle_cast({ rvi, unregister_remote_services,
- [Services, LocalServiceAddresses]}, State) ->
+ %% Check if this is a local message.
+ case ets:lookup(?SERVICE_TABLE, ServiceName) of
+ [ #service_entry { url = URL }] -> %% This is a local message
+ case authorize_rpc:authorize_remote_message(St#st.cs,
+ ServiceName,
+ Certificate,
+ Signature) of
+ [ ok ] ->
+ forward_message_to_local_service(URL, ServiceName,
+ Parameters, St#st.cs),
+ { noreply, St};
+
+ [ _ ] ->
+ ?warning("service_entry:remote_msg(): Failed to authenticate ~p",
+ [ServiceName]),
+ { noreply, St}
+ end;
+ [] ->
+ ?notice("service_entry:remote_msg(): Service Disappeared ~p",
+ [ServiceName]),
+ { noreply, St}
+
+ end;
- announce_service_availability(services_unavailable, LocalServiceAddresses, Services),
- { noreply, State };
handle_cast({ rvi, handle_local_timeout, [Service, TransactionID] }, St) ->
%% FIXME: Should be forwarded to service.
@@ -617,3 +440,170 @@ terminate(_Reason, _St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
+
+
+
+%% Invoked by schedule_rpc.
+%% A message originated from a locally connected service
+%% has timed out
+handle_local_timeout(CompSpec, Service, TransID) ->
+ rvi_common:notification(service_edge, ?SERVER, handle_local_timeout,
+ [ { service, Service},
+ { transaction_id, TransID} ],
+ CompSpec).
+
+%%
+%% Handle a message, delivered from a remote node through protocol, that is
+%% to be forwarded to a locally connected service.
+%%
+handle_remote_message(CompSpec, ServiceName, Timeout, NetworkAddress,
+ Parameters, Signature, Certificate) ->
+ rvi_common:notification(service_edge, ?SERVER, handle_remote_message,
+ [ { service, ServiceName },
+ { timeout, Timeout },
+ { network_address, NetworkAddress },
+ { parameters, Parameters },
+ { signature, Signature },
+ { certificate, Certificate } ],
+ CompSpec).
+
+
+
+
+
+%%
+%% Depending on the format of NetworkAddress
+%% Dispatch to websocket or JSON-RPC server
+%% FIXME: Should be a pluggable setup where
+%% different dispatchers are triggered depending
+%% on prefix in NetworkAddress
+%%
+
+flatten_ws_args([{ struct, List} | T], Acc ) when is_list(List) ->
+ flatten_ws_args( List ++ T, Acc);
+
+
+flatten_ws_args([{ Key, Val}| T], Acc ) ->
+ NKey = case is_atom(Key) of
+ true -> atom_to_list(Key);
+ false -> Key
+ end,
+
+ NVal = flatten_ws_args(Val),
+
+ flatten_ws_args(T, [ NKey, NVal] ++ Acc);
+
+flatten_ws_args([], Acc) ->
+ Acc;
+
+
+flatten_ws_args(Other, []) ->
+ Other;
+
+flatten_ws_args(Other, Acc) ->
+ [ Other | Acc ].
+
+
+flatten_ws_args(Args) ->
+ flatten_ws_args(Args, []).
+
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
+ [{ services, Services}] ) ->
+ ?info("service_edge:dispatch_to_local_service(service_available, websock): ~p", [ Services]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "services_available",
+ [ "services", Services ]),
+ ok;
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
+ [{ services, Services}] ) ->
+ ?info("service_edge:dispatch_to_local_service(service_unavailable, websock): ~p", [ Services]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "services_unavailable",
+ [ "services", Services ]),
+ ok;
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
+ [{ service_name, SvcName}, { parameters, Args}] ) ->
+ ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "message",
+ [ "service_name", SvcName ] ++ flatten_ws_args(Args)),
+ ok;
+
+%% Dispatch to regular JSON-RPC over HTTP.
+dispatch_to_local_service(URL, Command, Args) ->
+ CmdStr = atom_to_list(Command),
+ Res = rvi_common:send_json_request(URL, CmdStr, Args),
+ ?debug("dispatch_to_local_service(): Command: ~p",[ CmdStr]),
+ ?debug("dispatch_to_local_service(): Args: ~p",[ Args]),
+ ?debug("dispatch_to_local_service(): URL: ~p",[ URL]),
+ ?debug("dispatch_to_local_service(): Result: ~p",[ Res]),
+ Res.
+
+
+%% Forward a message to a specific locally connected service.
+%% Called by forward_message_to_local_service/2.
+%%
+forward_message_to_local_service(URL,ServiceName, Parameters, _CompSpec) ->
+ ?debug("service_edge:forward_to_local(): URL: ~p", [URL]),
+ ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]),
+
+ %%
+ %% Strip our node prefix from service_name so that
+ %% the service receiving the JSON rpc call will have
+ %% a service_name that is identical to the service name
+ %% it registered with.
+ %%
+ SvcName = string:substr(ServiceName,
+ length(rvi_common:local_service_prefix())),
+
+ %% Deliver the message to the local service, which can
+ %% be either a wse websocket, or a regular HTTP JSON-RPC call
+ case rvi_common:get_request_result(
+ dispatch_to_local_service(URL,
+ message,
+ {struct, [ { service_name, SvcName },
+ { parameters, Parameters }]})) of
+
+ %% Request delivered.
+ %% -1 is transaction ID.
+ { ok, _Result } ->
+ [ ok, -1 ];
+
+ %% status returned was an error code.
+ { Other, _Result } ->
+ ?warning("service_edge:forward_to_local(): ~p:~p Failed: ~p.",
+ [URL, ServiceName, Other]),
+ [not_found, -1];
+
+ Other ->
+ ?warning("service_edge:forward_to_local(): ~p:~p Unknown error: ~p.",
+ [URL, ServiceName, Other]),
+ [internal, -1]
+ end.
+
+
+announce_service_availability(Available, Service) ->
+ Cmd = case Available of
+ available -> services_available;
+ unavailable -> services_unavailable
+ end,
+
+ ets:foldl(
+ %% Notify if this is not the originating service.
+ fun(#service_entry {
+ service = ServiceEntry,
+ url = URL }, _Acc) when
+ ServiceEntry =/= Service ->
+
+ dispatch_to_local_service(URL, Cmd,
+ {struct, [ { services,
+ { array, [Service]}}]}),
+ [];
+
+ %% This is the originating service regsitering itself. Ignore.
+ (_, _) -> []
+
+ end, [], ?SERVICE_TABLE).
diff --git a/python/rvi_call.py b/python/rvi_call.py
index 9c8d1c2..efe54e6 100755
--- a/python/rvi_call.py
+++ b/python/rvi_call.py
@@ -15,10 +15,11 @@ import sys
from rvilib import RVI
import threading
import time
-
+import getopt
def usage():
- print "Usage:", sys.argv[0], " RVI-node service key=val ..."
- print " RVI-node DNS name or IP of host running RVI"
+ print "Usage:", sys.argv[0], "[-n RVI-node] service key=val ..."
+ print " RVI-node DNS name or IP of host running RVI. "
+ print " default: http://localhost:8801"
print " service Service to invoke in RVI."
print " key=val Named arguments to provide to service."
print
@@ -32,21 +33,28 @@ def usage():
#
# Check that we have the correct arguments
#
-if len(sys.argv) <3:
- usage()
+opts, args= getopt.getopt(sys.argv[1:], "n:")
+
-progname = sys.argv[0]
-rvi_node = sys.argv[1]
-service = sys.argv[2]
-args = []
-i=3
+rvi_node = "http://localhost:8801"
+for o, a in opts:
+ if o == "-n":
+ rvi_node = a
+ else:
+ usage()
+
+if len(args) < 1:
+ usage()
# Construct a dictionary from the provided paths.
-while i < len(sys.argv):
- print sys.argv[i]
- [k, v] = sys.argv[i].split('=')
- args = args + [{ k: v}]
- i = i + 1
+i = 0
+service = args[0]
+rvi_args = []
+for i in args[1:]:
+ print i
+ [k, v] = i.split('=')
+ rvi_args = rvi_args + [{ k: v}]
+
@@ -56,17 +64,11 @@ while i < len(sys.argv):
#
rvi = RVI(rvi_node)
-
print "RVI Node: ", rvi_node
print "Service: ", service
-print "args: ", args
+print "args: ", rvi_args
#
# Send the messge.
#
-rvi.message(service, args)
-
-
-
-
-
+rvi.message(service, rvi_args)
diff --git a/python/rvi_get_services.py b/python/rvi_get_services.py
index 70d32de..eeadcdc 100755
--- a/python/rvi_get_services.py
+++ b/python/rvi_get_services.py
@@ -20,7 +20,8 @@ def usage():
print "Return the name of all available services that can be reached"
print "through an RVI node."
print
- print "Usage:", sys.argv[0], " RVI-node"
+ print "Usage:", sys.argv[0], " [RVI-node]"
+ print "Default RVI node is http://127.0.0.1:8801"
print
print "Example: ./callrvi.py http://rvi1.nginfotpdx.net:8801"
print
@@ -30,12 +31,15 @@ def usage():
#
# Check that we have the correct arguments
#
-if len(sys.argv) <2:
+if len(sys.argv) != 1 and len(sys.argv) != 2:
usage()
progname = sys.argv[0]
-rvi_node = sys.argv[1]
+if len(sys.argv) == 2:
+ rvi_node = sys.argv[1]
+else:
+ rvi_node = "http://localhost:8801"
#
# Setup an outbound JSON-RPC connection to the backend RVI node
diff --git a/python/rvi_service.py b/python/rvi_service.py
index e1b705c..2c3271f 100755
--- a/python/rvi_service.py
+++ b/python/rvi_service.py
@@ -13,10 +13,12 @@
#
import sys
from rvilib import RVI
+import getopt
def usage():
- print "Usage:", sys.argv[0], "<rvi_url> <service_name>"
- print " <rvi_url> URL of Service Edge on a local RVI node"
+ print "Usage:", sys.argv[0], "[-n <rvi_url>] <service_name>"
+ print " <rvi_url> URL of Service Edge on a local RVI node."
+ print " Default: http://localhost:8801"
print " <service_name> URL of Service to register"
print
print "The RVI Service Edge URL can be found in"
@@ -26,7 +28,7 @@ def usage():
print "The Service Edge URL is also logged as a notice when the"
print "RVI node is started."
print
- print "Example: ./rvi_service.py http://rvi1.nginfotpdx.net:8801 /test/some_service"
+ print "Example: ./rvi_service.py /test/some_service http://rvi1.nginfotpdx.net:8801"
sys.exit(255)
@@ -68,11 +70,23 @@ def services_unavailable(**args):
return ['ok']
-if len(sys.argv) != 3:
+#
+# Check that we have the correct arguments
+#
+opts, args= getopt.getopt(sys.argv[1:], "n:")
+
+rvi_node_url = "http://localhost:8801"
+for o, a in opts:
+ if o == "-n":
+ rvi_node_url = a
+ else:
+ usage()
+
+if len(args) != 1:
usage()
-# Grab the URL to use
-[ progname, rvi_node_url, service_name ] = sys.argv
+service_name = args[0]
+
# Setup a connection to the local RVI node
rvi = RVI(rvi_node_url)
diff --git a/python/rvilib.py b/python/rvilib.py
index f676298..647e333 100644
--- a/python/rvilib.py
+++ b/python/rvilib.py
@@ -73,6 +73,10 @@ class RVI(SimpleJSONRPCServer):
# in an RVI network to access your service.
#
def register_service(self, service_name, function):
+ # Add a prefixing slash if necessary
+ if service_name[0] != '/':
+ service_name = '/' + service_name
+
# Register service_name within SimpleJSONRPCServer so that
# when it gets invoked with the given URL suffic, it will call 'function'.
#
diff --git a/rebar.config b/rebar.config
index ccdb50f..a67e822 100644
--- a/rebar.config
+++ b/rebar.config
@@ -8,8 +8,8 @@
{sub_dirs, ["rel",
"components/rvi_common",
"components/authorize",
- "components/data_link_bert_rpc",
- "components/protocol",
+ "components/dlink_tcp",
+ "components/proto_bert",
"components/schedule",
"components/service_discovery/",
"components/service_edge"
diff --git a/rvi_sample.config b/rvi_sample.config
index b5738d3..9815697 100644
--- a/rvi_sample.config
+++ b/rvi_sample.config
@@ -47,8 +47,8 @@
service_discovery,
authorize,
schedule,
- data_link_bert_rpc,
- protocol ]},
+ dlink_tcp,
+ proto_bert ]},
%%
%% Custom environment settings
@@ -150,7 +150,7 @@
%% node_address should be set to "0.0.0.0:0" to inform
%% the remote node that it should not attempt to
%% connect back to self.
- { node_address, "127.0.0.1:8817" },
+ { node_address, "127.0.0.1:8807" },
%% Specify the prefix of all services that this rvi node is hosting.
%%
@@ -189,11 +189,10 @@
{ static_nodes,
[
%% rvi1.nginfotpdx.net is the JLR hosted RVI server.
- { "jlr.com/backend/", "38.129.64.13:8807" }
+ %% { "jlr.com/backend/", }
]
},
-
{ routing_rules,
[
%% Service name prefix that rules are specified for
@@ -201,7 +200,7 @@
%% Which protocol and data link pair to use when transmitting the message
%% to the targeted service. If a pair reports a failure, the next pair is tried.
[
- { bert_rpc, wifi },
+ { proto_bert_rpc, { dlink_tcp_rpc, [ { target, "38.129.64.13:8807" } ]}},
{ bert_rpc, device_3g },
{ bert_rpc, device_sms },
{ joynr, wifi },
@@ -211,15 +210,17 @@
%% Used to communicate with vehicles
{ "jlr.com/vin/",
- { bert_rpc, wifi },
- %% server_3g is augmented with hinting, provided to
- { bert_rpc, { server_3g, [ initiate_outbound ]} },
-
- %% Protocols can have hinting as well.
- %% In this case bert_rpc should only be used if the
- %% resulting message size can fit in an SMS (140 bytes).
+ [
+ { proto_bert_rpc, { dlink_tcp_rpc, [ broadcast, { interface, "wlan0" } ] } },
+ %% server_3g is augmented with hinting, provided to
+ { bert_rpc, { server_3g, [ initiate_outbound ]} },
+
+ %% Protocols can have hinting as well.
+ %% In this case bert_rpc should only be used if the
+ %% resulting message size can fit in an SMS (140 bytes).
- { { bert_rpc, [ { max_msg_size, 140 } ] } , server_sms }
+ { { bert_rpc, [ { max_msg_size, 140 } ] } , server_sms }
+ ]
}
]
},
@@ -269,19 +270,19 @@
[
%% JSON-RPC address will be translated to
%% an URL looking like this:
- %% http://127.0.0.1:8811
+ %% http://127.0.0.1:8801
%%
%% This URL is used both for communication with
%% locally connected services and for intra-component
%% communication in case the access method for
%% service_edge_rpc is specified as json_rpc.
- { json_rpc_address, { "127.0.0.1", 8811 } },
+ { json_rpc_address, { "127.0.0.1", 8801 } },
%% Websocket is used for websocket access, preferably
%% through the rvi.js package available for Javascript
%% apps in browsers and crosswalk who wants to interface
%% RVI.
- { websocket, [ { port, 8818}]}
+ { websocket, [ { port, 8808}]}
]
}
]
@@ -289,7 +290,7 @@
{ service_discovery,
[ { service_discovery_rpc, gen_server,
[
- { json_rpc_address, { "127.0.0.1", 8812 }}
+ { json_rpc_address, { "127.0.0.1", 8802 }}
]
}
]
@@ -297,7 +298,7 @@
{ schedule,
[ { schedule_rpc, json_rpc,
[
- { json_rpc_address, { "127.0.0.1", 8813 }}
+ { json_rpc_address, { "127.0.0.1", 8803 }}
]
}
]
@@ -305,29 +306,30 @@
{ authorize,
[ { authorize_rpc, gen_server,
[
- { json_rpc_address, { "127.0.0.1", 8814 } }
+ { json_rpc_address, { "127.0.0.1", 8804 } }
]
}
]
},
{ protocol,
- [ { protocol_rpc, gen_server,
+ [ { proto_bert_rpc, gen_server,
[
- { json_rpc_address, { "127.0.0.1", 8815 } }
+ { json_rpc_address, { "127.0.0.1", 8805 } }
]
}
]
},
{ data_link,
- [ { data_link_bert_rpc_rpc, gen_server,
+ [ { dlink_tcp_rpc, gen_server,
[
- { json_rpc_address, { "127.0.0.1", 8816 } },
+ { json_rpc_address, { "127.0.0.1", 8806 } },
%% Bert_rpc server specifies the port we should
%% listen to for incoming connections
%% from other rvi nodes.
%% A specific NIC address can also be specified
%% through the {ip, "192.168.0.1" } tuple.
- { bert_rpc_server, [ { port, 8817 }]}
+ { bert_rpc_server, [ { port, 8807 }]},
+ { persistent_connections, [ "38.129.64.13:8807" ]}
]
}
]
diff --git a/src/rvi.app.src b/src/rvi.app.src
index 319d1e5..b79fe80 100644
--- a/src/rvi.app.src
+++ b/src/rvi.app.src
@@ -22,8 +22,8 @@
service_edge,
service_discovery,
authorize,
- data_link_bert_rpc,
- protocol
+ dlink_tcp,
+ proto_bert
]},
{mod, { rvi_app, []}}
]}.