summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Feuer <mfeuer@jaguarlandrover.com>2015-03-13 19:41:47 -0700
committerMagnus Feuer <mfeuer@jaguarlandrover.com>2015-03-13 19:41:47 -0700
commit99c5161f03df5cae9d8567c155083d0fe77d2c7c (patch)
tree6cfe4079545110ae0d42915462eda84d5c80e57c
parent635cec8520c0372dc6071644ff9523351744c3aa (diff)
parent00b9ec4b0b6c3cd05dcb8f2eb814360a3a4980c1 (diff)
downloadrvi_core-99c5161f03df5cae9d8567c155083d0fe77d2c7c.tar.gz
Merged in 0.3.1 to reverted master as a basis for develop and 0.3.2
-rw-r--r--components/authorize/src/authorize_rpc.erl4
-rw-r--r--components/data_link_bert_rpc/src/connection.erl16
-rw-r--r--components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl133
-rw-r--r--components/rvi_common/src/rvi_common.erl2
-rw-r--r--components/schedule/src/schedule.erl56
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl151
-rw-r--r--components/service_edge/src/service_edge_rpc.erl22
7 files changed, 225 insertions, 159 deletions
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl
index 50cd473..287c519 100644
--- a/components/authorize/src/authorize_rpc.erl
+++ b/components/authorize/src/authorize_rpc.erl
@@ -148,8 +148,8 @@ handle_rpc(Other, _Args) ->
%%
handle_call({rvi_call, authorize_local_message, Args}, _From, State) ->
{_, ServiceName} = lists:keyfind(service_name, 1, Args),
- ?info("authorize_rpc:authorize_local_message(gen_server): args: ~p", [ Args]),
- ?info("authorize_rpc:authorize_local_message(gen_server): service name: ~p", [ ServiceName]),
+ ?debug("authorize_rpc:authorize_local_message(gen_server): args: ~p", [ Args]),
+ ?debug("authorize_rpc:authorize_local_message(gen_server): service name: ~p", [ ServiceName]),
{reply, authorize_local_message(ServiceName), State};
handle_call({rvi_call, authorize_remote_message, Args}, _From, State) ->
diff --git a/components/data_link_bert_rpc/src/connection.erl b/components/data_link_bert_rpc/src/connection.erl
index aec31a8..7fd63cb 100644
--- a/components/data_link_bert_rpc/src/connection.erl
+++ b/components/data_link_bert_rpc/src/connection.erl
@@ -28,6 +28,8 @@
-export([setup/6]).
-export([send/2]).
-export([send/3]).
+-export([is_connection_up/1]).
+-export([is_connection_up/2]).
-export([terminate_connection/1]).
-export([terminate_connection/2]).
@@ -84,6 +86,19 @@ terminate_connection(IP, Port) ->
_Err -> {error, connection_not_found}
end.
+
+is_connection_up(Pid) when is_pid(Pid) ->
+ is_process_alive(Pid).
+
+is_connection_up(IP, Port) ->
+ case connection_manager:find_connection_by_address(IP, Port) of
+ {ok, Pid} ->
+ is_connection_up(Pid);
+
+ _Err ->
+ false
+ end.
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -169,6 +184,7 @@ handle_cast({send, Data}, St) ->
[ ?MODULE, Data]),
gen_tcp:send(St#st.sock, term_to_binary(Data)),
+
{noreply, St};
handle_cast(_Msg, State) ->
diff --git a/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl b/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl
index 7d8b2c8..6b59352 100644
--- a/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl
+++ b/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl
@@ -20,16 +20,16 @@
terminate/2, code_change/3]).
-include_lib("lager/include/log.hrl").
+-behavior(gen_server).
-define(DEFAULT_BERT_RPC_PORT, 9999).
-define(DEFAULT_RECONNECT_INTERVAL, 5000).
-define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0").
-
+-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
-record(st, { }).
-
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -135,11 +135,9 @@ connect_remote(IP, Port) ->
setup_data_link(RemoteAddress, RemotePort, Service) ->
{ LocalAddress, LocalPort} = rvi_common:node_address_tuple(),
- ?info("data_link_bert:setup_data_link(): Remote Address: ~p", [ RemoteAddress]),
- ?info("data_link_bert:setup_data_link(): Remote Port: ~p", [ RemotePort]),
- ?info("data_link_bert:setup_data_link(): Local Address: ~p", [ LocalAddress]),
- ?info("data_link_bert:setup_data_link(): Local Port: ~p", [ LocalPort]),
- ?info("data_link_bert:setup_data_link(): service: ~p", [ Service]),
+ ?info("data_link_bert:setup_data_link(): Link: ~p:~p -> ~p:~p",
+ [ LocalAddress, LocalPort, RemoteAddress, RemotePort]),
+ ?info("data_link_bert:setup_data_link(): Service: ~p", [ Service]),
case connect_remote(RemoteAddress, RemotePort) of
@@ -150,39 +148,42 @@ setup_data_link(RemoteAddress, RemotePort, Service) ->
?info("data_link_bert:setup_data_link(): New connection!"),
%% Follow up with an authorize.
- ?info("data_link_bert:setup_data_link(): ---------------"),
- ?info("data_link_bert:setup_data_link(): Sending authorize()"),
- ?info("data_link_bert:setup_data_link(): ---------------"),
+ ?debug("data_link_bert:setup_data_link(): Sending authorize()"),
connection:send(Pid, { authorize,
1, LocalAddress, LocalPort, rvi_binary,
{certificate, {}}, { signature, {}} }),
{ok, [ { status, rvi_common:json_rpc_status(ok)}]};
+
{ error, _ } ->
{error, [ { status, rvi_common:json_rpc_status(not_available)}]}
end.
disconnect_data_link(RemoteAddress, RemotePort) ->
- ?info("data_link_bert:disconnect_data_link(): Remote Address: ~p", [ RemoteAddress]),
- ?info("data_link_bert:disconnect_data_link(): Remote Port: ~p", [ RemotePort]),
+ ?info("data_link_bert:disconnect_data_link(): Remote: ~p:~p", [ RemoteAddress, RemotePort]),
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
send_data(RemoteAddress, RemotePort, Data) ->
- ?info("data_link_bert:send_data(): Remote Address: ~p", [ RemoteAddress]),
- ?info("data_link_bert:send_data(): Remote Port: ~p", [ RemotePort]),
+ ?info("data_link_bert:send_data(): Remote: ~p:~p", [ RemoteAddress, RemotePort]),
%% ?info("data_link_bert:send_data(): Data: ~p", [ Data]),
Res = connection:send(RemoteAddress, RemotePort, {receive_data, Data}),
- ?info ("data_link_bert:send_data(): bert-rpc result: ~p", [ Res ]),
+ case Res of
+ ok ->
+ ?debug ("data_link_bert:send_data(): bert-rpc result: ~p", [ Res ]);
+ _ ->
+ ?info ("data_link_bert:send_data(): bert-rpc result: ~p", [ Res ])
+ end,
+
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
announce_local_service(Service, Availability) ->
- ?info("data_link_bert:announce_local_service(~p): Service ~p", [Availability, Service]),
+ ?debug("data_link_bert:announce_local_service(~p): Service: ~p", [Availability, Service]),
%% Grab our local address.
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
@@ -192,8 +193,6 @@ announce_local_service(Service, Availability) ->
case rvi_common:send_component_request(service_discovery, get_remote_network_addresses, [],
[ addresses ]) of
{ ok, _, [ Addresses ] } ->
- ?info("data_link_bert:announce_local_service(~p): Addresses ~p",
- [ Availability, Addresses]),
%% Grab our local address.
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
@@ -201,8 +200,8 @@ announce_local_service(Service, Availability) ->
%% Loop over all returned addresses
lists:map(
fun(Address) ->
- ?debug("data_link_bert:announce_local_service(~p): Sending to ~p",
- [ Availability, Address]),
+ ?info("data_link_bert:announce_local_service(~p): Announcing ~p to ~p",
+ [ Availability, Service, Address]),
%% Split the address into host and port
[ RemoteAddress, RemotePort] = string:tokens(Address, ":"),
@@ -213,9 +212,7 @@ announce_local_service(Service, Availability) ->
{service_announce, 3, Availability,
[Service], { signature, {}}}),
?debug("data_link_bert:announce_local_service(~p): Res ~p",
- [ Availability, Res]),
- ok
-
+ [ Availability, Res])
end,
Addresses),
@@ -229,7 +226,9 @@ announce_local_service(Service, Availability) ->
end.
-
+handle_socket(_FromPid, PeerIP, PeerPort, data, ping, _ExtraArgs) ->
+ ?info("data_link_bert:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]),
+ ok;
handle_socket(FromPid, PeerIP, PeerPort, data,
{ authorize,
@@ -240,13 +239,13 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
Certificate,
Signature}, _ExtraArgs) ->
- ?info("data_link_bert:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?info("data_link_bert:authorize(): Peer Address: {~p, ~p}", [PeerIP, PeerPort ]),
- ?info("data_link_bert:authorize(): Remote Address: ~p", [ RemoteAddress ]),
- ?info("data_link_bert:authorize(): Remote Port: ~p", [ RemotePort ]),
+ ?info("data_link_bert:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
+ ?info("data_link_bert:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("data_link_bert:authorize(): Protocol: ~p", [ Protocol ]),
- ?info("data_link_bert:authorize(): Certificate: ~p", [ Certificate ]),
- ?info("data_link_bert:authorize(): Signature: ~p", [ Signature ]),
+ ?debug("data_link_bert:authorize(): TransactionID: ~p", [ TransactionID ]),
+ ?debug("data_link_bert:authorize(): Certificate: ~p", [ Certificate ]),
+ ?debug("data_link_bert:authorize(): Signature: ~p", [ Signature ]),
+
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
@@ -258,9 +257,8 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
- ?info("data_link_bert:authorize(): Remote Address is nil."),
- ?info("data_link_bert:authorize(): Will use ~p:~p",
- [PeerIP, PeerPort]),
+ ?info("data_link_bert:authorize(): Remote is behind firewall. Will use ~p:~p",
+ [ PeerIP, PeerPort]),
{ PeerIP, PeerPort };
_ -> { RemoteAddress, RemotePort}
@@ -276,12 +274,12 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
not_found ->
?info("data_link_bert:authorize(): New connection!"),
connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid),
- ?info("data_link_bert:authorize(): Sending authorize."),
+ ?debug("data_link_bert:authorize(): Sending authorize."),
Res = connection:send(FromPid,
{ authorize,
1, LocalAddress, LocalPort, rvi_binary,
{certificate, {}}, { signature, {}}}),
- ?info("data_link_bert:authorize(): Sending authorize: ~p", [ Res]),
+ ?debug("data_link_bert:authorize(): Sending authorize: ~p", [ Res]),
ok;
_ -> ok
end,
@@ -304,16 +302,13 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
end,
[], JSONSvc),
- ?info("data_link_bert:authorize(): Local Services: ~p",
- [ LocalServices ]),
-
%% Grab our local address.
{ LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
%% Send an authorize back to the remote node
- ?info("data_link_bert:authorize(): -------------------"),
- ?info("data_link_bert:authorize(): Sending announce()"),
- ?info("data_link_bert:authorize(): -------------------"),
+ ?info("data_link_bert:authorize(): Announcing local services: ~p to remote ~p:~p",
+ [LocalServices, NRemoteAddress, NRemotePort]),
+
connection:send(FromPid,
{ service_announce, 2, available,
LocalServices, { signature, {}}});
@@ -324,6 +319,9 @@ handle_socket(FromPid, PeerIP, PeerPort, data,
[ Err ]),
ok
end,
+
+ %% Setup ping interval
+ gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }),
ok;
handle_socket(_FromPid, RemoteIP, RemotePort, data,
@@ -332,11 +330,12 @@ handle_socket(_FromPid, RemoteIP, RemotePort, data,
available,
Services,
Signature}, _ExtraArgs) ->
- ?info("data_link_bert:service_announce(available): TransactionID: ~p", [ TransactionID ]),
- ?info("data_link_bert:service_announce(available): Remote IP: ~p", [ RemoteIP ]),
- ?info("data_link_bert:service_announce(available): Remote Port: ~p", [ RemotePort ]),
- ?info("data_link_bert:service_announce(available): Signature: ~p", [ Signature ]),
- ?info("data_link_bert:service_announce(available): Services: ~p", [ Services ]),
+ ?debug("data_link_bert:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]),
+ ?debug("data_link_bert:service_announce(available): Remote Port: ~p", [ RemotePort ]),
+ ?debug("data_link_bert:service_announce(available): TransactionID: ~p", [ TransactionID ]),
+ ?debug("data_link_bert:service_announce(available): Signature: ~p", [ Signature ]),
+ ?debug("data_link_bert:service_announce(available): Service: ~p", [ Services ]),
+
%% Register the received services with all relevant components
@@ -355,11 +354,11 @@ handle_socket(_FromPid, RemoteIP, RemotePort, data,
unavailable,
Services,
Signature}, _ExtraArgs) ->
- ?info("data_link_bert:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]),
- ?info("data_link_bert:service_announce(unavailable): Remote IP: ~p", [ RemoteIP ]),
- ?info("data_link_bert:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]),
- ?info("data_link_bert:service_announce(unavailable): Signature: ~p", [ Signature ]),
- ?info("data_link_bert:service_announce(unavailable): Service: ~p", [ Services ]),
+ ?debug("data_link_bert:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]),
+ ?debug("data_link_bert:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]),
+ ?debug("data_link_bert:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]),
+ ?debug("data_link_bert:service_announce(unavailable): Signature: ~p", [ Signature ]),
+ ?debug("data_link_bert:service_announce(unavailable): Service: ~p", [ Services ]),
%% Register the received services with all relevant components
@@ -373,7 +372,7 @@ handle_socket(_FromPid, RemoteIP, RemotePort, data,
handle_socket(_FromPid, SetupIP, SetupPort, data,
{ receive_data, Data}, _ExtraArgs) ->
%% ?info("data_link_bert:receive_data(): ~p", [ Data ]),
- ?info("data_lnik_bert:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?debug("data_link_bert:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
case
rvi_common:send_component_request(protocol, receive_message,
[
@@ -387,6 +386,7 @@ handle_socket(_FromPid, SetupIP, SetupPort, data,
end,
ok;
+
handle_socket(_FromPid, SetupIP, SetupPort, data, Data, _ExtraArgs) ->
?warning("data_link_bert:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
?warning("data_link_bert:unknown_data(): Unknown data: ~p", [ Data]),
@@ -454,7 +454,7 @@ handle_rpc("send_data", Args) ->
send_data(RemoteAddress, list_to_integer(RemotePort), Data);
handle_rpc(Other, _Args) ->
- ?info("data_link_bert:handle_rpc(~p)", [ Other ]),
+ ?info("data_link_bert:handle_rpc(~p): unknown", [ Other ]),
{ ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
@@ -491,6 +491,18 @@ handle_call({rvi_call, send_data, Args}, _From, State) ->
list_to_integer(RemotePort), Data), State };
+handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) ->
+ %% Create a timer to handle periodic pings.
+ {ok, ServerOpts } = rvi_common:get_component_config(data_link, bert_rpc_server, []),
+ Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
+
+ ?info("data_link_bert_rpc_rpc:setup_ping(): ~p:~p will be pinged every ~p msec",
+ [ Address, Port, Timeout] ),
+
+ erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }),
+
+ {reply, ok, St};
+
handle_call(Other, _From, State) ->
?warning("data_link_bert_rpc_rpc:handle_rpc(~p): unknown", [ Other ]),
{ reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, State}.
@@ -498,6 +510,21 @@ handle_call(Other, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+%% Ping time
+handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
+
+ %% Check that connection is up
+ case connection:is_connection_up(Pid) of
+ true ->
+ ?info("data_link_bert_rpc_rpc:ping(): Pinging: ~p:~p", [Address, Port]),
+ connection:send(Pid, ping),
+ erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout });
+
+ false ->
+ ok
+ end,
+ {noreply, St};
+
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index 8c772ef..c8525c8 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -174,7 +174,7 @@ send_component_request(Component, Service, ArgList, ReturnParams) ->
end;
{ Proc, _ } ->
- ?info("Sending ~p:~p to ~p", [Component, Service, Proc]),
+ ?debug("Sending ~p:~p to ~p", [Component, Service, Proc]),
{ Reply, ReplyArg} = gen_server:call(Proc, { rvi_call, Service, ArgList }),
%% Retrieve the status from the reply
[ Status | ReturnValues ] = retrieve_reply_elements([ status | ReturnParams], ReplyArg),
diff --git a/components/schedule/src/schedule.erl b/components/schedule/src/schedule.erl
index b110e8e..b7fcef2 100644
--- a/components/schedule/src/schedule.erl
+++ b/components/schedule/src/schedule.erl
@@ -138,8 +138,8 @@ handle_call( { schedule_message,
Certificate
}, _From, St) ->
- ?info("schedule:sched_msg(): service_name: ~p", [SvcName]),
- ?info("schedule:sched_msg(): timeout: ~p", [Timeout]),
+ ?debug("schedule:sched_msg(): service_name: ~p", [SvcName]),
+ ?debug("schedule:sched_msg(): timeout: ~p", [Timeout]),
?debug("schedule:sched_msg(): callback: ~p", [Callback]),
?debug("schedule:sched_msg(): parameters: ~p", [Parameters]),
?debug("schedule:sched_msg(): signature: ~p", [Signature]),
@@ -180,8 +180,7 @@ handle_call({rvi_call, schedule_message, Args}, From, St) ->
handle_call( {register_remote_services, NetworkAddress, AvailableServices}, _From, St) ->
- ?info("schedule:register_remote_services(): NetworkAddress: ~p", [NetworkAddress]),
- ?info("schedule:register_remote_services(): AvailableService: ~p", [AvailableServices]),
+ ?info("schedule:register_remote_services(): services(~p) -> ~p", [AvailableServices, NetworkAddress]),
{ok, NSt} = multiple_services_available(AvailableServices, NetworkAddress, St),
{reply, ok, NSt};
@@ -196,7 +195,7 @@ handle_call({rvi_call, register_remote_services, Args}, From, St) ->
{ reply, { ok, [{ status, rvi_common:json_rpc_status(ok) }] } , NSt};
handle_call( {unregister_remote_services, ServiceNames}, _From, St) ->
- ?info("schedule:unregister_remote_services(): Services: ~p", [ServiceNames]),
+ ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]),
{ok, NSt} = multiple_services_unavailable(ServiceNames, St),
{reply, ok, NSt };
@@ -240,18 +239,17 @@ handle_cast(_Msg, St) ->
%% Handle timeouts
handle_info({ rvi_message_timeout, SvcName, TransID}, #st { services_tid = SvcTid } = St) ->
- ?info("schedule:timeout(): service: ~p", [ SvcName]),
- ?info("schedule:timeout(): trans_id: ~p", [ TransID]),
case ets:lookup(SvcTid, SvcName) of
[ Svc ] ->
%% Delete from ets.
case ets:lookup(Svc#service.messages_tid, TransID) of
[ Msg ] ->
+ ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]),
do_timeout_callback(Svc, Msg),
ets:delete(Svc#service.messages_tid, TransID);
_ ->
- ?info("schedule:timeout(): Message yanked while processing timeout: ~p", [ TransID]),
+ ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]),
ok
end;
_-> ok
@@ -304,12 +302,11 @@ queue_message(SvcName,
Parameters,
Signature,
Certificate, St) ->
- ?info("schedule:sched_msg(): service: ~p", [SvcName]),
- ?info("schedule:sched_msg(): timeout: ~p", [Timeout]),
- ?info("schedule:sched_msg(): callback: ~p", [Callback]),
+%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]),
+%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]),
%% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]),
- ?info("schedule:sched_msg(): signature: ~p", [Signature]),
- ?info("schedule:sched_msg(): certificate: ~p", [Certificate]),
+%% ?info("schedule:sched_msg(): signature: ~p", [Signature]),
+%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]),
%% Create a new transaction ID.
{NewTransID, NSt1} = create_transaction_id(St),
@@ -354,7 +351,7 @@ queue_message(SvcName,
%% Attempt to send the message
{_, NSt3 } = try_sending_messages(Service, NSt2),
-
+
%% Return
{ ok, NewTransID, NSt3}.
@@ -405,12 +402,14 @@ try_sending_messages(#service {
try_sending_messages(Service, St);
Err ->
+ ?info("schedule:try_send(): No send: ~p -> ~p : ~p", [SvcName, NetworkAddress, Err]),
%% Failed to send message, leave in queue and err out.
{ {send_failed, Err}, St}
end;
_ ->
?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]),
- ok
+ { ok, St}
+
end
end.
@@ -426,8 +425,7 @@ try_sending_messages(#service {
%% to send any messages queued inside it.
%%
service_available(SvcName, NetworkAddress, St) ->
- ?info("schedule:service_available(): service: ~p", [ SvcName ]),
- ?info("schedule:service_available(): network_address: ~p", [ NetworkAddress ]),
+ ?info("schedule:service_available(): service(~p) -> NetworkAddress(~p)", [ SvcName, NetworkAddress ]),
%% Find or create the service.
{ok, Svc, _, NSt1} = find_or_create_service(SvcName, NetworkAddress, St),
@@ -435,7 +433,7 @@ service_available(SvcName, NetworkAddress, St) ->
try_sending_messages(Svc, NSt1).
service_unavailable(SvcName, #st { services_tid = SvcTid } = St) ->
- ?info("schedule:service_unavailable(): Service: ~p", [ SvcName ]),
+ ?info("schedule:service_unavailable(): Service(~p)", [ SvcName ]),
ets:delete(SvcTid, SvcName),
{ ok, St }.
@@ -458,13 +456,13 @@ bring_up_data_link(SvcName) ->
{ok, already_connected } ->
already_connected;
Que ->
- ?info("schedule:bring_up_data_link() Que:~p.", [Que]),
+ ?info("schedule:bring_up_data_link() Failed:~p.", [Que]),
ok
end;
{ok, not_found, _ } ->
?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p."
- "Service not found.",
+ " Service not found.",
[ SvcName ]),
not_found;
@@ -477,12 +475,12 @@ bring_up_data_link(SvcName) ->
send_message(NetworkAddress, SvcName, Timeout,
Parameters, Signature, Certificate) ->
- ?info("schedule:send_message(): network_address: ~p", [NetworkAddress]),
- ?info("schedule:send_message(): service: ~p", [SvcName]),
- ?info("schedule:send_message(): timeout: ~p", [Timeout]),
+ ?info("schedule:send_message(): service(~p) -> addr(~p) ", [SvcName, NetworkAddress]),
+%% ?info("schedule:send_message(): network_address: ~p", [NetworkAddress]),
+%% ?info("schedule:send_message(): timeout: ~p", [Timeout]),
%% ?info("schedule:send_message(): parameters: ~p", [Parameters]),
- ?info("schedule:send_message(): signature: ~p", [Signature]),
- ?info("schedule:send_message(): certificate: ~p", [Certificate]),
+%% ?info("schedule:send_message(): signature: ~p", [Signature]),
+%% ?info("schedule:send_message(): certificate: ~p", [Certificate]),
case rvi_common:send_component_request(
protocol, send_message,
@@ -593,16 +591,14 @@ create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St)
ets:insert(SvcsTid, Svc),
%% Return new service and existing state.
- ?info("schedule:create_service(): SvcName: ~p", [ ServiceName]),
- ?info("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]),
- Svcs = ets:foldr(fun(SvcElem, Acc) -> [ SvcElem | Acc ] end, [], SvcsTid),
- ?debug("schedule:create_service(): Services: ~p", [ Svcs]),
+ ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]),
+ ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]),
{ ok, Svc, true, St}. %% True indicates that the service is newly created.
%% Create a new and unique transaction id
create_transaction_id(St) ->
- ?info("schedule:create_transaction_id(): St: ~p", [ St ]),
+ ?debug("schedule:create_transaction_id(): St: ~p", [ St ]),
ID = St#st.next_transaction_id,
%% FIXME: Maybe interate pid into transaction to handle multiple
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index 3bcddae..19bebb0 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -65,8 +65,7 @@ dump_table(Table) ->
dump_table(Table, ets:first(Table)).
register_remote_service(NetworkAddress) ->
- ?info("service_discovery_rpc:register_remote_service(): service: empty"),
- ?info("service_discovery_rpc:register_remote_service(): network_address: ~p", [NetworkAddress]),
+ ?info("service_discovery_rpc:register_remote_service(): service(n/a) -> ~p", [NetworkAddress]),
ets:insert(?REMOTE_ADDRESS_TABLE,
#service_entry {
@@ -79,8 +78,7 @@ register_remote_service(NetworkAddress) ->
register_remote_service(Service, NetworkAddress) ->
- ?info("service_discovery_rpc:register_remote_service(): service: ~p", [Service]),
- ?info("service_discovery_rpc:register_remote_service(): network_address: ~p", [NetworkAddress]),
+ ?info("service_discovery_rpc:register_remote_service(): service(~p) -> ~p", [Service, NetworkAddress]),
FullSvcName = rvi_common:remote_service_to_string(Service),
@@ -117,21 +115,63 @@ register_remote_service(Service, NetworkAddress) ->
unregister_remote_services_by_address(NetworkAddress) ->
- ?info("service_discovery_rpc:unregister_remote_services_by_address(): network_address: ~p",
- [NetworkAddress]),
%% Delete all services registered under the given address.
Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, NetworkAddress),
+ %% We now have a bunch of service records, convert them to a list of service
+ %% names and send them of to schedule for deregistration
+ AllSvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) ->
+ [SvcName | Acc]
+ end, [], Svcs),
+
?info("service_discovery_rpc:unregister_remote_services_by_address(): ~p -> ~p",
- [NetworkAddress, Svcs]),
+ [NetworkAddress, AllSvcNames]),
+
+ %% We need to filter AllSvcNames to remove all service entries that have
+ %% been registered under another name.
+ %% We do this by creating a list of all matching entries associated
+ %% with a network address not matching the disconnected NetworkAddress
+ %%
+ %% See issue https://github.com/PDXostc/rvi/issues/14 for details
+ FilterSvc =
+ lists:foldr(
+ fun(Service, Acc) ->
+
+ %% Lookup the service in the service table.
+ case ets:lookup(?REMOTE_SERVICE_TABLE, Service) of
+
+ %% Not found. Do not filter out.
+ [] ->
+ Acc;
+
+ %% We found or own entry, tiet to the disconnected address.
+ %% Do not add to addresses to be removed.
+ [ #service_entry { network_address = NetworkAddress } ] ->
+ Acc;
+
+ %% We found an entry that does not the disconnected
+ %% network address. This one should be filtered out
+ [ _ ] ->
+ [ Service | Acc ]
+
+ end
+ end, [], AllSvcNames),
+ SvcNames = AllSvcNames -- FilterSvc,
- %% We now have a bunch of service records, convert them to a list of service
- %% names and send them of to schedule for deregistration
- SvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) ->
- [SvcName | Acc]
- end, [], Svcs),
+
+ case FilterSvc of
+ [] -> ok;
+
+ _ ->
+ ?info("service_discovery_rpc:unregister_remote_services_by_address(): Resurrected services: ~p",
+ [FilterSvc]),
+
+ ?info("service_discovery_rpc:unregister_remote_services_by_address(): Filtered services to be deleted: ~p",
+ [SvcNames])
+ end,
+
%% Delete any addresses stored with an empty service name,
%% installed with register_remote_service/1, since we now have at
@@ -143,10 +183,11 @@ unregister_remote_services_by_address(NetworkAddress) ->
}),
ets:delete(?REMOTE_ADDRESS_TABLE, NetworkAddress),
- case Svcs of
+ case SvcNames of
[] ->
true;
_ ->
+
rvi_common:send_component_request(schedule, unregister_remote_services,
[
{ services, SvcNames }
@@ -161,15 +202,15 @@ unregister_remote_services_by_address(NetworkAddress) ->
LocalSvcAddresses =
ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) ->
[ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE)
+ [], ?LOCAL_SERVICE_TABLE),
%% Call service edge with local addresses (sorted and de-duped) and
%% the services to register.
- %% rvi_common:send_component_request(service_edge, unregister_remote_services,
- %% [
- %% { local_service_addresses, lists:usort(LocalSvcAddresses)},
- %% { services, SvcNames}
- %% ])
+ rvi_common:send_component_request(service_edge, unregister_remote_services,
+ [
+ { local_service_addresses, lists:usort(LocalSvcAddresses)},
+ { services, SvcNames}
+ ])
end,
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
@@ -187,11 +228,6 @@ unregister_single_remote_service_by_name_(Service) ->
network_address = '_'
}),
- Prior = ets:foldl(fun(#service_entry { service = Svc }, Acc) ->
- [ Svc | Acc ] end,
- [], ?REMOTE_SERVICE_TABLE),
-
- ?debug("Before removing ~p: ~p", [ Service, Prior ]),
ets:delete(?REMOTE_SERVICE_TABLE, Service),
After = ets:foldl(fun(#service_entry { service = Svc }, Acc) ->
[ Svc | Acc ] end,
@@ -210,11 +246,11 @@ unregister_single_remote_service_by_name_(Service) ->
%% Call service edge with local addresses (sorted and de-duped) and
%% the services to register.
- %% rvi_common:send_component_request(service_edge, unregister_remote_services,
- %% [
- %% { local_service_addresses, lists:usort(LocalSvcAddresses)},
- %% { services, [Service]}
- %% ]),
+ rvi_common:send_component_request(service_edge, unregister_remote_services,
+ [
+ { local_service_addresses, lists:usort(LocalSvcAddresses)},
+ { services, [Service]}
+ ]),
ok.
@@ -224,7 +260,7 @@ unregister_remote_services_by_name(Services) ->
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
unregister_local_service(Service) ->
- ?info("service_discovery_rpc:unregister_local_service(): Service~p",
+ ?info("service_discovery_rpc:unregister_local_service(): ~p",
[Service]),
ets:delete(?LOCAL_SERVICE_TABLE, Service),
@@ -232,8 +268,7 @@ unregister_local_service(Service) ->
register_local_service(NetworkAddress, Service) ->
- ?info("service_discovery_rpc:register_local_service(): service: ~p", [Service]),
- ?info("service_discovery_rpc:register_local_service(): network_address: ~p", [NetworkAddress]),
+ ?info("service_discovery_rpc:register_local_service(): ~p -> ~p", [Service, NetworkAddress]),
FullSvcName = rvi_common:local_service_to_string(Service),
@@ -248,8 +283,8 @@ register_local_service(NetworkAddress, Service) ->
resolve_local_service(RawService) ->
Service = rvi_common:sanitize_service_string(RawService),
- ?info("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]),
- ?info("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]),
+ ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]),
+ ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]),
case resolve_service(?LOCAL_SERVICE_TABLE, Service) of
not_found ->
{ ok, [ { status, rvi_common:json_rpc_status(not_found) }]};
@@ -261,15 +296,15 @@ resolve_local_service(RawService) ->
resolve_remote_service(RawService) ->
Service = rvi_common:sanitize_service_string(RawService),
- ?info("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]),
- ?info("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]),
+ ?debug("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]),
+ ?debug("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]),
case resolve_service(?REMOTE_SERVICE_TABLE, Service) of
{ok, NetworkAddress } ->
{ok, [ { status, rvi_common:json_rpc_status(ok) },
{ network_address, NetworkAddress }]};
not_found ->
- ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. "
+ ?debug("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. "
"Trying static nodes",
[Service]),
@@ -277,13 +312,13 @@ resolve_remote_service(RawService) ->
%% Check if this is a service residing on the backend server
case rvi_common:get_static_node(Service) of
not_found -> %% Not found
- ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found in static nodes.",
+ ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found.",
[Service]),
{ ok, [ { status, rvi_common:json_rpc_status(not_found) }]};
NetworkAddress -> %% Found
- ?info("service_discovery_rpc:resolve_service(~p): Service is on static node ~p",
+ ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p",
[Service, NetworkAddress]),
{ok, [ { status, rvi_common:json_rpc_status(ok) },
@@ -315,40 +350,32 @@ register_remote_services(Address, Services) ->
LocalSvcAddresses =
ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) ->
[ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE)
+ [], ?LOCAL_SERVICE_TABLE),
%% Call service edge with local addresses (sorted and de-duped) and
%% the services to register.
- %% rvi_common:send_component_request(service_edge, register_remote_services,
- %% [
- %% { local_service_addresses, lists:usort(LocalSvcAddresses)},
- %% { services, Services}
- %% ])
+ rvi_common:send_component_request(service_edge, register_remote_services,
+ [
+ { local_service_addresses, lists:usort(LocalSvcAddresses)},
+ { services, Services}
+ ])
end,
{ok, [ { status, rvi_common:json_rpc_status(ok) } ]}.
resolve_service(Table, Service) ->
-
- ?info("service_discovery_rpc:resolve_service(): CleanedService: ~p", [Service]),
-
- %% For info purposes only
- Svcs = ets:foldl(fun({service_entry, ServiceName, ServiceAddr}, Acc) ->
- [ {ServiceName, ServiceAddr} | Acc ] end,
- [], Table),
- ?info("service_discovery_rpc:resolve_service(): Services: ~p", [Svcs]),
-
-
case ets:lookup(Table, Service) of
%% We found a service entry, report it back
[#service_entry { network_address = NetworkAddress }] ->
- ?info("service_discovery_rpc:resolve_service(): service: ~p -> ~p",
- [ Service, NetworkAddress ]),
+ ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> ~p",
+ [ Table, Service, NetworkAddress ]),
{ok, NetworkAddress };
%% We did not find a service entry, check statically configured nodes.
[] ->
+ ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> Not Found",
+ [ Table, Service ]),
not_found
end.
@@ -360,7 +387,7 @@ get_services(Table) ->
[ {ServiceName, ServiceAddr } | Acc ] end,
[], Table),
- ?info("service_discovery_rpc:get_services(): ~p", [ Services]),
+ ?debug("service_discovery_rpc:get_services(): ~p", [ Services ]),
Services.
get_all_services() ->
@@ -373,7 +400,7 @@ get_all_services() ->
[], ?LOCAL_SERVICE_TABLE),
Services = RemoteSvc++LocalSvc,
- ?info("service_discovery_rpc:get_all_services(): ~p", [ Services]),
+ ?debug("service_discovery_rpc:get_all_services(): ~p", [ Services]),
Services.
@@ -387,7 +414,7 @@ get_json_services(Table) ->
]
} | Acc ] end,
[], Table),
- ?info("service_discovery_rpc:get_services(): ~p", [ Services]),
+ ?debug("service_discovery_rpc:get_services(): ~p", [ Services]),
{ok, [ { status, rvi_common:json_rpc_status(ok) },
{ services, {array, Services }}]}.
@@ -409,7 +436,7 @@ get_network_addresses_(Table) ->
%% Return a dup-scrubbed list.
Addresses = sets:to_list(sets:from_list(AddrList)),
- ?info("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]),
+ ?debug("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]),
Addresses.
@@ -522,7 +549,7 @@ handle_rpc("get_local_network_addresses", _Args) ->
%% Handle the rest.
%%
handle_rpc( Other, _Args) ->
- ?info("service_discovery_rpc:handle_rpc(~p)", [ Other ]),
+ ?info("service_discovery_rpc:handle_rpc(~p): unknown", [ Other ]),
{ ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index 5ec2396..e8d98b5 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -307,25 +307,25 @@ flatten_ws_args(Args) ->
-dispatch_to_local_service([ $w, $s, $: | _WSPidStr], services_available,
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
[{ services, Services}] ) ->
- ?info("service_edge:dispatch_to_local_service(service_available): Websocket!: ~p", [ Services]),
- %% wse:call(list_to_pid(WSPidStr), wse:window(),
- %% "services_available",
- %% [ "services", Services ]),
+ ?info("service_edge:dispatch_to_local_service(service_available, websock): ~p", [ Services]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "services_available",
+ [ "services", Services ]),
ok;
-dispatch_to_local_service([ $w, $s, $: | _WSPidStr], services_unavailable,
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
[{ services, Services}] ) ->
- ?info("service_edge:dispatch_to_local_service(service_unavailable): Websocket!: ~p", [ Services]),
- %% wse:call(list_to_pid(WSPidStr), wse:window(),
- %% "services_unavailable",
- %% [ "services", Services ]),
+ ?info("service_edge:dispatch_to_local_service(service_unavailable, websock): ~p", [ Services]),
+ wse:call(list_to_pid(WSPidStr), wse:window(),
+ "services_unavailable",
+ [ "services", Services ]),
ok;
dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
[{ service_name, SvcName}, { parameters, Args}] ) ->
- ?info("service_edge:dispatch_to_local_service(message): Websocket!:~p", [Args]),
+ ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]),
wse:call(list_to_pid(WSPidStr), wse:window(),
"message",
[ "service_name", SvcName ] ++ flatten_ws_args(Args)),