diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-03-13 19:41:47 -0700 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-03-13 19:41:47 -0700 |
commit | 99c5161f03df5cae9d8567c155083d0fe77d2c7c (patch) | |
tree | 6cfe4079545110ae0d42915462eda84d5c80e57c | |
parent | 635cec8520c0372dc6071644ff9523351744c3aa (diff) | |
parent | 00b9ec4b0b6c3cd05dcb8f2eb814360a3a4980c1 (diff) | |
download | rvi_core-99c5161f03df5cae9d8567c155083d0fe77d2c7c.tar.gz |
Merged in 0.3.1 to reverted master as a basis for develop and 0.3.2
-rw-r--r-- | components/authorize/src/authorize_rpc.erl | 4 | ||||
-rw-r--r-- | components/data_link_bert_rpc/src/connection.erl | 16 | ||||
-rw-r--r-- | components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl | 133 | ||||
-rw-r--r-- | components/rvi_common/src/rvi_common.erl | 2 | ||||
-rw-r--r-- | components/schedule/src/schedule.erl | 56 | ||||
-rw-r--r-- | components/service_discovery/src/service_discovery_rpc.erl | 151 | ||||
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 22 |
7 files changed, 225 insertions, 159 deletions
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index 50cd473..287c519 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -148,8 +148,8 @@ handle_rpc(Other, _Args) -> %% handle_call({rvi_call, authorize_local_message, Args}, _From, State) -> {_, ServiceName} = lists:keyfind(service_name, 1, Args), - ?info("authorize_rpc:authorize_local_message(gen_server): args: ~p", [ Args]), - ?info("authorize_rpc:authorize_local_message(gen_server): service name: ~p", [ ServiceName]), + ?debug("authorize_rpc:authorize_local_message(gen_server): args: ~p", [ Args]), + ?debug("authorize_rpc:authorize_local_message(gen_server): service name: ~p", [ ServiceName]), {reply, authorize_local_message(ServiceName), State}; handle_call({rvi_call, authorize_remote_message, Args}, _From, State) -> diff --git a/components/data_link_bert_rpc/src/connection.erl b/components/data_link_bert_rpc/src/connection.erl index aec31a8..7fd63cb 100644 --- a/components/data_link_bert_rpc/src/connection.erl +++ b/components/data_link_bert_rpc/src/connection.erl @@ -28,6 +28,8 @@ -export([setup/6]). -export([send/2]). -export([send/3]). +-export([is_connection_up/1]). +-export([is_connection_up/2]). -export([terminate_connection/1]). -export([terminate_connection/2]). @@ -84,6 +86,19 @@ terminate_connection(IP, Port) -> _Err -> {error, connection_not_found} end. + +is_connection_up(Pid) when is_pid(Pid) -> + is_process_alive(Pid). + +is_connection_up(IP, Port) -> + case connection_manager:find_connection_by_address(IP, Port) of + {ok, Pid} -> + is_connection_up(Pid); + + _Err -> + false + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -169,6 +184,7 @@ handle_cast({send, Data}, St) -> [ ?MODULE, Data]), gen_tcp:send(St#st.sock, term_to_binary(Data)), + {noreply, St}; handle_cast(_Msg, State) -> diff --git a/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl b/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl index 7d8b2c8..6b59352 100644 --- a/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl +++ b/components/data_link_bert_rpc/src/data_link_bert_rpc_rpc.erl @@ -20,16 +20,16 @@ terminate/2, code_change/3]). -include_lib("lager/include/log.hrl"). +-behavior(gen_server). -define(DEFAULT_BERT_RPC_PORT, 9999). -define(DEFAULT_RECONNECT_INTERVAL, 5000). -define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0"). - +-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). -record(st, { }). - start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -135,11 +135,9 @@ connect_remote(IP, Port) -> setup_data_link(RemoteAddress, RemotePort, Service) -> { LocalAddress, LocalPort} = rvi_common:node_address_tuple(), - ?info("data_link_bert:setup_data_link(): Remote Address: ~p", [ RemoteAddress]), - ?info("data_link_bert:setup_data_link(): Remote Port: ~p", [ RemotePort]), - ?info("data_link_bert:setup_data_link(): Local Address: ~p", [ LocalAddress]), - ?info("data_link_bert:setup_data_link(): Local Port: ~p", [ LocalPort]), - ?info("data_link_bert:setup_data_link(): service: ~p", [ Service]), + ?info("data_link_bert:setup_data_link(): Link: ~p:~p -> ~p:~p", + [ LocalAddress, LocalPort, RemoteAddress, RemotePort]), + ?info("data_link_bert:setup_data_link(): Service: ~p", [ Service]), case connect_remote(RemoteAddress, RemotePort) of @@ -150,39 +148,42 @@ setup_data_link(RemoteAddress, RemotePort, Service) -> ?info("data_link_bert:setup_data_link(): New connection!"), %% Follow up with an authorize. - ?info("data_link_bert:setup_data_link(): ---------------"), - ?info("data_link_bert:setup_data_link(): Sending authorize()"), - ?info("data_link_bert:setup_data_link(): ---------------"), + ?debug("data_link_bert:setup_data_link(): Sending authorize()"), connection:send(Pid, { authorize, 1, LocalAddress, LocalPort, rvi_binary, {certificate, {}}, { signature, {}} }), {ok, [ { status, rvi_common:json_rpc_status(ok)}]}; + { error, _ } -> {error, [ { status, rvi_common:json_rpc_status(not_available)}]} end. disconnect_data_link(RemoteAddress, RemotePort) -> - ?info("data_link_bert:disconnect_data_link(): Remote Address: ~p", [ RemoteAddress]), - ?info("data_link_bert:disconnect_data_link(): Remote Port: ~p", [ RemotePort]), + ?info("data_link_bert:disconnect_data_link(): Remote: ~p:~p", [ RemoteAddress, RemotePort]), {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. send_data(RemoteAddress, RemotePort, Data) -> - ?info("data_link_bert:send_data(): Remote Address: ~p", [ RemoteAddress]), - ?info("data_link_bert:send_data(): Remote Port: ~p", [ RemotePort]), + ?info("data_link_bert:send_data(): Remote: ~p:~p", [ RemoteAddress, RemotePort]), %% ?info("data_link_bert:send_data(): Data: ~p", [ Data]), Res = connection:send(RemoteAddress, RemotePort, {receive_data, Data}), - ?info ("data_link_bert:send_data(): bert-rpc result: ~p", [ Res ]), + case Res of + ok -> + ?debug ("data_link_bert:send_data(): bert-rpc result: ~p", [ Res ]); + _ -> + ?info ("data_link_bert:send_data(): bert-rpc result: ~p", [ Res ]) + end, + {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. announce_local_service(Service, Availability) -> - ?info("data_link_bert:announce_local_service(~p): Service ~p", [Availability, Service]), + ?debug("data_link_bert:announce_local_service(~p): Service: ~p", [Availability, Service]), %% Grab our local address. { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), @@ -192,8 +193,6 @@ announce_local_service(Service, Availability) -> case rvi_common:send_component_request(service_discovery, get_remote_network_addresses, [], [ addresses ]) of { ok, _, [ Addresses ] } -> - ?info("data_link_bert:announce_local_service(~p): Addresses ~p", - [ Availability, Addresses]), %% Grab our local address. { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), @@ -201,8 +200,8 @@ announce_local_service(Service, Availability) -> %% Loop over all returned addresses lists:map( fun(Address) -> - ?debug("data_link_bert:announce_local_service(~p): Sending to ~p", - [ Availability, Address]), + ?info("data_link_bert:announce_local_service(~p): Announcing ~p to ~p", + [ Availability, Service, Address]), %% Split the address into host and port [ RemoteAddress, RemotePort] = string:tokens(Address, ":"), @@ -213,9 +212,7 @@ announce_local_service(Service, Availability) -> {service_announce, 3, Availability, [Service], { signature, {}}}), ?debug("data_link_bert:announce_local_service(~p): Res ~p", - [ Availability, Res]), - ok - + [ Availability, Res]) end, Addresses), @@ -229,7 +226,9 @@ announce_local_service(Service, Availability) -> end. - +handle_socket(_FromPid, PeerIP, PeerPort, data, ping, _ExtraArgs) -> + ?info("data_link_bert:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]), + ok; handle_socket(FromPid, PeerIP, PeerPort, data, { authorize, @@ -240,13 +239,13 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Certificate, Signature}, _ExtraArgs) -> - ?info("data_link_bert:authorize(): TransactionID: ~p", [ TransactionID ]), - ?info("data_link_bert:authorize(): Peer Address: {~p, ~p}", [PeerIP, PeerPort ]), - ?info("data_link_bert:authorize(): Remote Address: ~p", [ RemoteAddress ]), - ?info("data_link_bert:authorize(): Remote Port: ~p", [ RemotePort ]), + ?info("data_link_bert:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), + ?info("data_link_bert:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("data_link_bert:authorize(): Protocol: ~p", [ Protocol ]), - ?info("data_link_bert:authorize(): Certificate: ~p", [ Certificate ]), - ?info("data_link_bert:authorize(): Signature: ~p", [ Signature ]), + ?debug("data_link_bert:authorize(): TransactionID: ~p", [ TransactionID ]), + ?debug("data_link_bert:authorize(): Certificate: ~p", [ Certificate ]), + ?debug("data_link_bert:authorize(): Signature: ~p", [ Signature ]), + { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), @@ -258,9 +257,8 @@ handle_socket(FromPid, PeerIP, PeerPort, data, case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> - ?info("data_link_bert:authorize(): Remote Address is nil."), - ?info("data_link_bert:authorize(): Will use ~p:~p", - [PeerIP, PeerPort]), + ?info("data_link_bert:authorize(): Remote is behind firewall. Will use ~p:~p", + [ PeerIP, PeerPort]), { PeerIP, PeerPort }; _ -> { RemoteAddress, RemotePort} @@ -276,12 +274,12 @@ handle_socket(FromPid, PeerIP, PeerPort, data, not_found -> ?info("data_link_bert:authorize(): New connection!"), connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid), - ?info("data_link_bert:authorize(): Sending authorize."), + ?debug("data_link_bert:authorize(): Sending authorize."), Res = connection:send(FromPid, { authorize, 1, LocalAddress, LocalPort, rvi_binary, {certificate, {}}, { signature, {}}}), - ?info("data_link_bert:authorize(): Sending authorize: ~p", [ Res]), + ?debug("data_link_bert:authorize(): Sending authorize: ~p", [ Res]), ok; _ -> ok end, @@ -304,16 +302,13 @@ handle_socket(FromPid, PeerIP, PeerPort, data, end, [], JSONSvc), - ?info("data_link_bert:authorize(): Local Services: ~p", - [ LocalServices ]), - %% Grab our local address. { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), %% Send an authorize back to the remote node - ?info("data_link_bert:authorize(): -------------------"), - ?info("data_link_bert:authorize(): Sending announce()"), - ?info("data_link_bert:authorize(): -------------------"), + ?info("data_link_bert:authorize(): Announcing local services: ~p to remote ~p:~p", + [LocalServices, NRemoteAddress, NRemotePort]), + connection:send(FromPid, { service_announce, 2, available, LocalServices, { signature, {}}}); @@ -324,6 +319,9 @@ handle_socket(FromPid, PeerIP, PeerPort, data, [ Err ]), ok end, + + %% Setup ping interval + gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }), ok; handle_socket(_FromPid, RemoteIP, RemotePort, data, @@ -332,11 +330,12 @@ handle_socket(_FromPid, RemoteIP, RemotePort, data, available, Services, Signature}, _ExtraArgs) -> - ?info("data_link_bert:service_announce(available): TransactionID: ~p", [ TransactionID ]), - ?info("data_link_bert:service_announce(available): Remote IP: ~p", [ RemoteIP ]), - ?info("data_link_bert:service_announce(available): Remote Port: ~p", [ RemotePort ]), - ?info("data_link_bert:service_announce(available): Signature: ~p", [ Signature ]), - ?info("data_link_bert:service_announce(available): Services: ~p", [ Services ]), + ?debug("data_link_bert:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]), + ?debug("data_link_bert:service_announce(available): Remote Port: ~p", [ RemotePort ]), + ?debug("data_link_bert:service_announce(available): TransactionID: ~p", [ TransactionID ]), + ?debug("data_link_bert:service_announce(available): Signature: ~p", [ Signature ]), + ?debug("data_link_bert:service_announce(available): Service: ~p", [ Services ]), + %% Register the received services with all relevant components @@ -355,11 +354,11 @@ handle_socket(_FromPid, RemoteIP, RemotePort, data, unavailable, Services, Signature}, _ExtraArgs) -> - ?info("data_link_bert:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]), - ?info("data_link_bert:service_announce(unavailable): Remote IP: ~p", [ RemoteIP ]), - ?info("data_link_bert:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]), - ?info("data_link_bert:service_announce(unavailable): Signature: ~p", [ Signature ]), - ?info("data_link_bert:service_announce(unavailable): Service: ~p", [ Services ]), + ?debug("data_link_bert:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]), + ?debug("data_link_bert:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]), + ?debug("data_link_bert:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]), + ?debug("data_link_bert:service_announce(unavailable): Signature: ~p", [ Signature ]), + ?debug("data_link_bert:service_announce(unavailable): Service: ~p", [ Services ]), %% Register the received services with all relevant components @@ -373,7 +372,7 @@ handle_socket(_FromPid, RemoteIP, RemotePort, data, handle_socket(_FromPid, SetupIP, SetupPort, data, { receive_data, Data}, _ExtraArgs) -> %% ?info("data_link_bert:receive_data(): ~p", [ Data ]), - ?info("data_lnik_bert:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ?debug("data_link_bert:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), case rvi_common:send_component_request(protocol, receive_message, [ @@ -387,6 +386,7 @@ handle_socket(_FromPid, SetupIP, SetupPort, data, end, ok; + handle_socket(_FromPid, SetupIP, SetupPort, data, Data, _ExtraArgs) -> ?warning("data_link_bert:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), ?warning("data_link_bert:unknown_data(): Unknown data: ~p", [ Data]), @@ -454,7 +454,7 @@ handle_rpc("send_data", Args) -> send_data(RemoteAddress, list_to_integer(RemotePort), Data); handle_rpc(Other, _Args) -> - ?info("data_link_bert:handle_rpc(~p)", [ Other ]), + ?info("data_link_bert:handle_rpc(~p): unknown", [ Other ]), { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. @@ -491,6 +491,18 @@ handle_call({rvi_call, send_data, Args}, _From, State) -> list_to_integer(RemotePort), Data), State }; +handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) -> + %% Create a timer to handle periodic pings. + {ok, ServerOpts } = rvi_common:get_component_config(data_link, bert_rpc_server, []), + Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), + + ?info("data_link_bert_rpc_rpc:setup_ping(): ~p:~p will be pinged every ~p msec", + [ Address, Port, Timeout] ), + + erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }), + + {reply, ok, St}; + handle_call(Other, _From, State) -> ?warning("data_link_bert_rpc_rpc:handle_rpc(~p): unknown", [ Other ]), { reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, State}. @@ -498,6 +510,21 @@ handle_call(Other, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +%% Ping time +handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> + + %% Check that connection is up + case connection:is_connection_up(Pid) of + true -> + ?info("data_link_bert_rpc_rpc:ping(): Pinging: ~p:~p", [Address, Port]), + connection:send(Pid, ping), + erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }); + + false -> + ok + end, + {noreply, St}; + handle_info(_Info, State) -> {noreply, State}. diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index 8c772ef..c8525c8 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -174,7 +174,7 @@ send_component_request(Component, Service, ArgList, ReturnParams) -> end; { Proc, _ } -> - ?info("Sending ~p:~p to ~p", [Component, Service, Proc]), + ?debug("Sending ~p:~p to ~p", [Component, Service, Proc]), { Reply, ReplyArg} = gen_server:call(Proc, { rvi_call, Service, ArgList }), %% Retrieve the status from the reply [ Status | ReturnValues ] = retrieve_reply_elements([ status | ReturnParams], ReplyArg), diff --git a/components/schedule/src/schedule.erl b/components/schedule/src/schedule.erl index b110e8e..b7fcef2 100644 --- a/components/schedule/src/schedule.erl +++ b/components/schedule/src/schedule.erl @@ -138,8 +138,8 @@ handle_call( { schedule_message, Certificate }, _From, St) -> - ?info("schedule:sched_msg(): service_name: ~p", [SvcName]), - ?info("schedule:sched_msg(): timeout: ~p", [Timeout]), + ?debug("schedule:sched_msg(): service_name: ~p", [SvcName]), + ?debug("schedule:sched_msg(): timeout: ~p", [Timeout]), ?debug("schedule:sched_msg(): callback: ~p", [Callback]), ?debug("schedule:sched_msg(): parameters: ~p", [Parameters]), ?debug("schedule:sched_msg(): signature: ~p", [Signature]), @@ -180,8 +180,7 @@ handle_call({rvi_call, schedule_message, Args}, From, St) -> handle_call( {register_remote_services, NetworkAddress, AvailableServices}, _From, St) -> - ?info("schedule:register_remote_services(): NetworkAddress: ~p", [NetworkAddress]), - ?info("schedule:register_remote_services(): AvailableService: ~p", [AvailableServices]), + ?info("schedule:register_remote_services(): services(~p) -> ~p", [AvailableServices, NetworkAddress]), {ok, NSt} = multiple_services_available(AvailableServices, NetworkAddress, St), {reply, ok, NSt}; @@ -196,7 +195,7 @@ handle_call({rvi_call, register_remote_services, Args}, From, St) -> { reply, { ok, [{ status, rvi_common:json_rpc_status(ok) }] } , NSt}; handle_call( {unregister_remote_services, ServiceNames}, _From, St) -> - ?info("schedule:unregister_remote_services(): Services: ~p", [ServiceNames]), + ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]), {ok, NSt} = multiple_services_unavailable(ServiceNames, St), {reply, ok, NSt }; @@ -240,18 +239,17 @@ handle_cast(_Msg, St) -> %% Handle timeouts handle_info({ rvi_message_timeout, SvcName, TransID}, #st { services_tid = SvcTid } = St) -> - ?info("schedule:timeout(): service: ~p", [ SvcName]), - ?info("schedule:timeout(): trans_id: ~p", [ TransID]), case ets:lookup(SvcTid, SvcName) of [ Svc ] -> %% Delete from ets. case ets:lookup(Svc#service.messages_tid, TransID) of [ Msg ] -> + ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]), do_timeout_callback(Svc, Msg), ets:delete(Svc#service.messages_tid, TransID); _ -> - ?info("schedule:timeout(): Message yanked while processing timeout: ~p", [ TransID]), + ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), ok end; _-> ok @@ -304,12 +302,11 @@ queue_message(SvcName, Parameters, Signature, Certificate, St) -> - ?info("schedule:sched_msg(): service: ~p", [SvcName]), - ?info("schedule:sched_msg(): timeout: ~p", [Timeout]), - ?info("schedule:sched_msg(): callback: ~p", [Callback]), +%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]), +%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]), %% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]), - ?info("schedule:sched_msg(): signature: ~p", [Signature]), - ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), +%% ?info("schedule:sched_msg(): signature: ~p", [Signature]), +%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), %% Create a new transaction ID. {NewTransID, NSt1} = create_transaction_id(St), @@ -354,7 +351,7 @@ queue_message(SvcName, %% Attempt to send the message {_, NSt3 } = try_sending_messages(Service, NSt2), - + %% Return { ok, NewTransID, NSt3}. @@ -405,12 +402,14 @@ try_sending_messages(#service { try_sending_messages(Service, St); Err -> + ?info("schedule:try_send(): No send: ~p -> ~p : ~p", [SvcName, NetworkAddress, Err]), %% Failed to send message, leave in queue and err out. { {send_failed, Err}, St} end; _ -> ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]), - ok + { ok, St} + end end. @@ -426,8 +425,7 @@ try_sending_messages(#service { %% to send any messages queued inside it. %% service_available(SvcName, NetworkAddress, St) -> - ?info("schedule:service_available(): service: ~p", [ SvcName ]), - ?info("schedule:service_available(): network_address: ~p", [ NetworkAddress ]), + ?info("schedule:service_available(): service(~p) -> NetworkAddress(~p)", [ SvcName, NetworkAddress ]), %% Find or create the service. {ok, Svc, _, NSt1} = find_or_create_service(SvcName, NetworkAddress, St), @@ -435,7 +433,7 @@ service_available(SvcName, NetworkAddress, St) -> try_sending_messages(Svc, NSt1). service_unavailable(SvcName, #st { services_tid = SvcTid } = St) -> - ?info("schedule:service_unavailable(): Service: ~p", [ SvcName ]), + ?info("schedule:service_unavailable(): Service(~p)", [ SvcName ]), ets:delete(SvcTid, SvcName), { ok, St }. @@ -458,13 +456,13 @@ bring_up_data_link(SvcName) -> {ok, already_connected } -> already_connected; Que -> - ?info("schedule:bring_up_data_link() Que:~p.", [Que]), + ?info("schedule:bring_up_data_link() Failed:~p.", [Que]), ok end; {ok, not_found, _ } -> ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p." - "Service not found.", + " Service not found.", [ SvcName ]), not_found; @@ -477,12 +475,12 @@ bring_up_data_link(SvcName) -> send_message(NetworkAddress, SvcName, Timeout, Parameters, Signature, Certificate) -> - ?info("schedule:send_message(): network_address: ~p", [NetworkAddress]), - ?info("schedule:send_message(): service: ~p", [SvcName]), - ?info("schedule:send_message(): timeout: ~p", [Timeout]), + ?info("schedule:send_message(): service(~p) -> addr(~p) ", [SvcName, NetworkAddress]), +%% ?info("schedule:send_message(): network_address: ~p", [NetworkAddress]), +%% ?info("schedule:send_message(): timeout: ~p", [Timeout]), %% ?info("schedule:send_message(): parameters: ~p", [Parameters]), - ?info("schedule:send_message(): signature: ~p", [Signature]), - ?info("schedule:send_message(): certificate: ~p", [Certificate]), +%% ?info("schedule:send_message(): signature: ~p", [Signature]), +%% ?info("schedule:send_message(): certificate: ~p", [Certificate]), case rvi_common:send_component_request( protocol, send_message, @@ -593,16 +591,14 @@ create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) ets:insert(SvcsTid, Svc), %% Return new service and existing state. - ?info("schedule:create_service(): SvcName: ~p", [ ServiceName]), - ?info("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]), - Svcs = ets:foldr(fun(SvcElem, Acc) -> [ SvcElem | Acc ] end, [], SvcsTid), - ?debug("schedule:create_service(): Services: ~p", [ Svcs]), + ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]), + ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]), { ok, Svc, true, St}. %% True indicates that the service is newly created. %% Create a new and unique transaction id create_transaction_id(St) -> - ?info("schedule:create_transaction_id(): St: ~p", [ St ]), + ?debug("schedule:create_transaction_id(): St: ~p", [ St ]), ID = St#st.next_transaction_id, %% FIXME: Maybe interate pid into transaction to handle multiple diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index 3bcddae..19bebb0 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -65,8 +65,7 @@ dump_table(Table) -> dump_table(Table, ets:first(Table)). register_remote_service(NetworkAddress) -> - ?info("service_discovery_rpc:register_remote_service(): service: empty"), - ?info("service_discovery_rpc:register_remote_service(): network_address: ~p", [NetworkAddress]), + ?info("service_discovery_rpc:register_remote_service(): service(n/a) -> ~p", [NetworkAddress]), ets:insert(?REMOTE_ADDRESS_TABLE, #service_entry { @@ -79,8 +78,7 @@ register_remote_service(NetworkAddress) -> register_remote_service(Service, NetworkAddress) -> - ?info("service_discovery_rpc:register_remote_service(): service: ~p", [Service]), - ?info("service_discovery_rpc:register_remote_service(): network_address: ~p", [NetworkAddress]), + ?info("service_discovery_rpc:register_remote_service(): service(~p) -> ~p", [Service, NetworkAddress]), FullSvcName = rvi_common:remote_service_to_string(Service), @@ -117,21 +115,63 @@ register_remote_service(Service, NetworkAddress) -> unregister_remote_services_by_address(NetworkAddress) -> - ?info("service_discovery_rpc:unregister_remote_services_by_address(): network_address: ~p", - [NetworkAddress]), %% Delete all services registered under the given address. Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, NetworkAddress), + %% We now have a bunch of service records, convert them to a list of service + %% names and send them of to schedule for deregistration + AllSvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) -> + [SvcName | Acc] + end, [], Svcs), + ?info("service_discovery_rpc:unregister_remote_services_by_address(): ~p -> ~p", - [NetworkAddress, Svcs]), + [NetworkAddress, AllSvcNames]), + + %% We need to filter AllSvcNames to remove all service entries that have + %% been registered under another name. + %% We do this by creating a list of all matching entries associated + %% with a network address not matching the disconnected NetworkAddress + %% + %% See issue https://github.com/PDXostc/rvi/issues/14 for details + FilterSvc = + lists:foldr( + fun(Service, Acc) -> + + %% Lookup the service in the service table. + case ets:lookup(?REMOTE_SERVICE_TABLE, Service) of + + %% Not found. Do not filter out. + [] -> + Acc; + + %% We found or own entry, tiet to the disconnected address. + %% Do not add to addresses to be removed. + [ #service_entry { network_address = NetworkAddress } ] -> + Acc; + + %% We found an entry that does not the disconnected + %% network address. This one should be filtered out + [ _ ] -> + [ Service | Acc ] + + end + end, [], AllSvcNames), + SvcNames = AllSvcNames -- FilterSvc, - %% We now have a bunch of service records, convert them to a list of service - %% names and send them of to schedule for deregistration - SvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) -> - [SvcName | Acc] - end, [], Svcs), + + case FilterSvc of + [] -> ok; + + _ -> + ?info("service_discovery_rpc:unregister_remote_services_by_address(): Resurrected services: ~p", + [FilterSvc]), + + ?info("service_discovery_rpc:unregister_remote_services_by_address(): Filtered services to be deleted: ~p", + [SvcNames]) + end, + %% Delete any addresses stored with an empty service name, %% installed with register_remote_service/1, since we now have at @@ -143,10 +183,11 @@ unregister_remote_services_by_address(NetworkAddress) -> }), ets:delete(?REMOTE_ADDRESS_TABLE, NetworkAddress), - case Svcs of + case SvcNames of [] -> true; _ -> + rvi_common:send_component_request(schedule, unregister_remote_services, [ { services, SvcNames } @@ -161,15 +202,15 @@ unregister_remote_services_by_address(NetworkAddress) -> LocalSvcAddresses = ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) -> [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE) + [], ?LOCAL_SERVICE_TABLE), %% Call service edge with local addresses (sorted and de-duped) and %% the services to register. - %% rvi_common:send_component_request(service_edge, unregister_remote_services, - %% [ - %% { local_service_addresses, lists:usort(LocalSvcAddresses)}, - %% { services, SvcNames} - %% ]) + rvi_common:send_component_request(service_edge, unregister_remote_services, + [ + { local_service_addresses, lists:usort(LocalSvcAddresses)}, + { services, SvcNames} + ]) end, {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. @@ -187,11 +228,6 @@ unregister_single_remote_service_by_name_(Service) -> network_address = '_' }), - Prior = ets:foldl(fun(#service_entry { service = Svc }, Acc) -> - [ Svc | Acc ] end, - [], ?REMOTE_SERVICE_TABLE), - - ?debug("Before removing ~p: ~p", [ Service, Prior ]), ets:delete(?REMOTE_SERVICE_TABLE, Service), After = ets:foldl(fun(#service_entry { service = Svc }, Acc) -> [ Svc | Acc ] end, @@ -210,11 +246,11 @@ unregister_single_remote_service_by_name_(Service) -> %% Call service edge with local addresses (sorted and de-duped) and %% the services to register. - %% rvi_common:send_component_request(service_edge, unregister_remote_services, - %% [ - %% { local_service_addresses, lists:usort(LocalSvcAddresses)}, - %% { services, [Service]} - %% ]), + rvi_common:send_component_request(service_edge, unregister_remote_services, + [ + { local_service_addresses, lists:usort(LocalSvcAddresses)}, + { services, [Service]} + ]), ok. @@ -224,7 +260,7 @@ unregister_remote_services_by_name(Services) -> {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. unregister_local_service(Service) -> - ?info("service_discovery_rpc:unregister_local_service(): Service~p", + ?info("service_discovery_rpc:unregister_local_service(): ~p", [Service]), ets:delete(?LOCAL_SERVICE_TABLE, Service), @@ -232,8 +268,7 @@ unregister_local_service(Service) -> register_local_service(NetworkAddress, Service) -> - ?info("service_discovery_rpc:register_local_service(): service: ~p", [Service]), - ?info("service_discovery_rpc:register_local_service(): network_address: ~p", [NetworkAddress]), + ?info("service_discovery_rpc:register_local_service(): ~p -> ~p", [Service, NetworkAddress]), FullSvcName = rvi_common:local_service_to_string(Service), @@ -248,8 +283,8 @@ register_local_service(NetworkAddress, Service) -> resolve_local_service(RawService) -> Service = rvi_common:sanitize_service_string(RawService), - ?info("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]), - ?info("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]), + ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]), + ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]), case resolve_service(?LOCAL_SERVICE_TABLE, Service) of not_found -> { ok, [ { status, rvi_common:json_rpc_status(not_found) }]}; @@ -261,15 +296,15 @@ resolve_local_service(RawService) -> resolve_remote_service(RawService) -> Service = rvi_common:sanitize_service_string(RawService), - ?info("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]), - ?info("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]), + ?debug("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]), + ?debug("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]), case resolve_service(?REMOTE_SERVICE_TABLE, Service) of {ok, NetworkAddress } -> {ok, [ { status, rvi_common:json_rpc_status(ok) }, { network_address, NetworkAddress }]}; not_found -> - ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. " + ?debug("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. " "Trying static nodes", [Service]), @@ -277,13 +312,13 @@ resolve_remote_service(RawService) -> %% Check if this is a service residing on the backend server case rvi_common:get_static_node(Service) of not_found -> %% Not found - ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found in static nodes.", + ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found.", [Service]), { ok, [ { status, rvi_common:json_rpc_status(not_found) }]}; NetworkAddress -> %% Found - ?info("service_discovery_rpc:resolve_service(~p): Service is on static node ~p", + ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p", [Service, NetworkAddress]), {ok, [ { status, rvi_common:json_rpc_status(ok) }, @@ -315,40 +350,32 @@ register_remote_services(Address, Services) -> LocalSvcAddresses = ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) -> [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE) + [], ?LOCAL_SERVICE_TABLE), %% Call service edge with local addresses (sorted and de-duped) and %% the services to register. - %% rvi_common:send_component_request(service_edge, register_remote_services, - %% [ - %% { local_service_addresses, lists:usort(LocalSvcAddresses)}, - %% { services, Services} - %% ]) + rvi_common:send_component_request(service_edge, register_remote_services, + [ + { local_service_addresses, lists:usort(LocalSvcAddresses)}, + { services, Services} + ]) end, {ok, [ { status, rvi_common:json_rpc_status(ok) } ]}. resolve_service(Table, Service) -> - - ?info("service_discovery_rpc:resolve_service(): CleanedService: ~p", [Service]), - - %% For info purposes only - Svcs = ets:foldl(fun({service_entry, ServiceName, ServiceAddr}, Acc) -> - [ {ServiceName, ServiceAddr} | Acc ] end, - [], Table), - ?info("service_discovery_rpc:resolve_service(): Services: ~p", [Svcs]), - - case ets:lookup(Table, Service) of %% We found a service entry, report it back [#service_entry { network_address = NetworkAddress }] -> - ?info("service_discovery_rpc:resolve_service(): service: ~p -> ~p", - [ Service, NetworkAddress ]), + ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> ~p", + [ Table, Service, NetworkAddress ]), {ok, NetworkAddress }; %% We did not find a service entry, check statically configured nodes. [] -> + ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> Not Found", + [ Table, Service ]), not_found end. @@ -360,7 +387,7 @@ get_services(Table) -> [ {ServiceName, ServiceAddr } | Acc ] end, [], Table), - ?info("service_discovery_rpc:get_services(): ~p", [ Services]), + ?debug("service_discovery_rpc:get_services(): ~p", [ Services ]), Services. get_all_services() -> @@ -373,7 +400,7 @@ get_all_services() -> [], ?LOCAL_SERVICE_TABLE), Services = RemoteSvc++LocalSvc, - ?info("service_discovery_rpc:get_all_services(): ~p", [ Services]), + ?debug("service_discovery_rpc:get_all_services(): ~p", [ Services]), Services. @@ -387,7 +414,7 @@ get_json_services(Table) -> ] } | Acc ] end, [], Table), - ?info("service_discovery_rpc:get_services(): ~p", [ Services]), + ?debug("service_discovery_rpc:get_services(): ~p", [ Services]), {ok, [ { status, rvi_common:json_rpc_status(ok) }, { services, {array, Services }}]}. @@ -409,7 +436,7 @@ get_network_addresses_(Table) -> %% Return a dup-scrubbed list. Addresses = sets:to_list(sets:from_list(AddrList)), - ?info("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]), + ?debug("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]), Addresses. @@ -522,7 +549,7 @@ handle_rpc("get_local_network_addresses", _Args) -> %% Handle the rest. %% handle_rpc( Other, _Args) -> - ?info("service_discovery_rpc:handle_rpc(~p)", [ Other ]), + ?info("service_discovery_rpc:handle_rpc(~p): unknown", [ Other ]), { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index 5ec2396..e8d98b5 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -307,25 +307,25 @@ flatten_ws_args(Args) -> -dispatch_to_local_service([ $w, $s, $: | _WSPidStr], services_available, +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, [{ services, Services}] ) -> - ?info("service_edge:dispatch_to_local_service(service_available): Websocket!: ~p", [ Services]), - %% wse:call(list_to_pid(WSPidStr), wse:window(), - %% "services_available", - %% [ "services", Services ]), + ?info("service_edge:dispatch_to_local_service(service_available, websock): ~p", [ Services]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "services_available", + [ "services", Services ]), ok; -dispatch_to_local_service([ $w, $s, $: | _WSPidStr], services_unavailable, +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, [{ services, Services}] ) -> - ?info("service_edge:dispatch_to_local_service(service_unavailable): Websocket!: ~p", [ Services]), - %% wse:call(list_to_pid(WSPidStr), wse:window(), - %% "services_unavailable", - %% [ "services", Services ]), + ?info("service_edge:dispatch_to_local_service(service_unavailable, websock): ~p", [ Services]), + wse:call(list_to_pid(WSPidStr), wse:window(), + "services_unavailable", + [ "services", Services ]), ok; dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, [{ service_name, SvcName}, { parameters, Args}] ) -> - ?info("service_edge:dispatch_to_local_service(message): Websocket!:~p", [Args]), + ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]), wse:call(list_to_pid(WSPidStr), wse:window(), "message", [ "service_name", SvcName ] ++ flatten_ws_args(Args)), |