summaryrefslogtreecommitdiff
path: root/components/service_discovery
diff options
context:
space:
mode:
authorMagnus Feuer <mfeuer@jaguarlandrover.com>2015-03-18 20:02:21 -0700
committerMagnus Feuer <mfeuer@jaguarlandrover.com>2015-03-18 20:02:21 -0700
commitd0bb358a78076972e1aaeaa769c93486b5f83110 (patch)
tree371101db4fd211a14c236e7a2c9757dbf6eb1486 /components/service_discovery
parent0a24e9c82235bdfb64ce03382e0073af63aba719 (diff)
downloadrvi_core-d0bb358a78076972e1aaeaa769c93486b5f83110.tar.gz
Rewrite to adapt to gen_server model
Diffstat (limited to 'components/service_discovery')
-rw-r--r--components/service_discovery/src/service_discovery.app.src2
-rw-r--r--components/service_discovery/src/service_discovery_app.erl4
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl854
3 files changed, 438 insertions, 422 deletions
diff --git a/components/service_discovery/src/service_discovery.app.src b/components/service_discovery/src/service_discovery.app.src
index 6d41bb4..4b32eed 100644
--- a/components/service_discovery/src/service_discovery.app.src
+++ b/components/service_discovery/src/service_discovery.app.src
@@ -19,5 +19,5 @@
rvi_common
]},
{mod, { service_discovery_app, []}},
- {start_phases, [{init, []}]}
+ {start_phases, [{json_rpc, []}]}
]}.
diff --git a/components/service_discovery/src/service_discovery_app.erl b/components/service_discovery/src/service_discovery_app.erl
index 7bd7fe5..75e2ab3 100644
--- a/components/service_discovery/src/service_discovery_app.erl
+++ b/components/service_discovery/src/service_discovery_app.erl
@@ -23,8 +23,8 @@
start(_StartType, _StartArgs) ->
service_discovery_sup:start_link().
-start_phase(init, _, _) ->
- service_discovery_rpc:init_rvi_component(),
+start_phase(json_rpc, _, _) ->
+ service_discovery_rpc:start_json_server(),
ok.
stop(_State) ->
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index 19bebb0..265bffa 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -15,7 +15,17 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([init_rvi_component/0]).
+-export([get_all_services/1,
+ get_local_network_addresses/1,
+ resolve_local_service/2,
+ resolve_remote_service/2,
+ register_remote_services/3,
+ register_local_service/3,
+ unregister_remote_services_by_address/2,
+ unregister_remote_service_by_name/2,
+ unregister_local_service/2]).
+
+-export([start_json_server/0]).
-include_lib("lager/include/log.hrl").
-define(LOCAL_SERVICE_TABLE, rvi_local_services).
@@ -35,109 +45,281 @@ start_link() ->
init([]) ->
?debug("service_discovery_rpc:init(): called."),
- {ok, #st {}}.
-
-%% Called by service_discovery_app:start_phase().
-init_rvi_component() ->
ets:new(?LOCAL_SERVICE_TABLE, [set, public, named_table, { keypos, #service_entry.service }]),
ets:new(?REMOTE_SERVICE_TABLE, [set, public, named_table, { keypos, #service_entry.service }]),
ets:new(?REMOTE_ADDRESS_TABLE, [duplicate_bag, public, named_table,
{keypos, #service_entry.network_address}]),
+ {ok, #st {}}.
- case rvi_common:get_component_config(service_discovery, exo_http_opts) of
- { ok, ExoHttpOpts } ->
- exoport_exo_http:instance(service_discovery_sup,
- service_discovery_rpc,
- ExoHttpOpts);
- Err -> Err
- end,
- ok.
-dump_table(_Table, '$end_of_table') ->
- true;
+start_json_server() ->
+ rvi_common:start_json_rpc_server(service_discovery, ?MODULE, service_discovery_sup).
-dump_table(Table, Key) ->
- Val = ets:lookup(Table, Key),
- ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]),
- dump_table(Table, ets:next(Table, Key)).
+get_all_services(CompSpec) ->
+ rvi_common:request(service_discovery, ?MODULE,
+ get_all_services, [], [status], CompSpec).
-dump_table(Table) ->
- dump_table(Table, ets:first(Table)).
+get_local_network_addresses(CompSpec) ->
+ rvi_common:request(service_discovery, ?MODULE,
+ get_local_network_addresses, [], [status], CompSpec).
-register_remote_service(NetworkAddress) ->
- ?info("service_discovery_rpc:register_remote_service(): service(n/a) -> ~p", [NetworkAddress]),
- ets:insert(?REMOTE_ADDRESS_TABLE,
- #service_entry {
- service = "",
- network_address = NetworkAddress
- }),
+resolve_local_service(CompSpec, RawService) ->
+ rvi_common:request(service_discovery, ?MODULE, resove_local_service,
+ [RawService], [service],
+ [status, full_service], CompSpec).
- dump_table(?REMOTE_ADDRESS_TABLE),
- {ok, [ {service, ""}, { status, rvi_common:json_rpc_status(ok)}]}.
+resolve_remote_service(CompSpec, RawService) ->
+ rvi_common:request(service_discovery, ?MODULE, resolve_remote_service,
+ [RawService], [service],
+ [status], CompSpec).
-register_remote_service(Service, NetworkAddress) ->
- ?info("service_discovery_rpc:register_remote_service(): service(~p) -> ~p", [Service, NetworkAddress]),
+register_remote_services(CompSpec, Address, Services) ->
+ rvi_common:request(service_discovery, ?MODULE, register_remote_services,
+ [Address, Services], [network_address, services],
+ [status], CompSpec).
- FullSvcName = rvi_common:remote_service_to_string(Service),
- ets:insert(?REMOTE_SERVICE_TABLE,
- #service_entry {
- service = FullSvcName,
- network_address = NetworkAddress
- }),
+register_local_service(CompSpec, Address, Services) ->
+ rvi_common:request(service_discovery, ?MODULE, register_local_service,
+ [Address, Services], [network_address, service],
+ [status], CompSpec).
- %% Delete any addresses stored with an empty service name,
- %% installed with register_remote_service/1, since we now have at
- %% least one service name.
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
- #service_entry {
- service = [],
- network_address = NetworkAddress
- }),
- %% Delete any previous instances of the given entry, in case
- %% the service registers multiple times
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
- #service_entry {
- service = FullSvcName,
- network_address = NetworkAddress
- }),
- ets:insert(?REMOTE_ADDRESS_TABLE,
+unregister_remote_services_by_address(CompSpec, Address) ->
+ rvi_common:request(service_discovery, ?MODULE, unregister_remote_services_by_address,
+ [Address], [network_address],
+ [status], CompSpec).
+
+unregister_remote_service_by_name(CompSpec, Service) ->
+ rvi_common:request(service_discovery, ?MODULE, unregister_remote_service_by_name,
+ [Service], [service],
+ [status], CompSpec).
+
+unregister_local_service(CompSpec, Service) ->
+ rvi_common:request(service_discovery, ?MODULE, unregister_local_service,
+ [Service], [service],
+ [status], CompSpec).
+
+
+%% JSON-RPC entry point
+%% Called by local exo http server
+
+%% Register local services
+
+handle_rpc("register_local_service", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
+ [ok, FullSvcName] = gen_server:call(?SERVER, { rvi_call, register_local_service,
+ [ Service, Address ]}),
+ [ {status, ok }, { full_service_name, FullSvcName }];
+
+%% Register remote services
+
+handle_rpc("register_remote_services", Args) ->
+ {ok, Services} = rvi_common:get_json_element(["services"], Args),
+ {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
+
+ [ok ] = gen_server:call(?SERVER, { rvi_call, register_remote_services,
+ [ Services, Address ]}),
+ [ {status, ok} ];
+
+
+handle_rpc("unregister_remote_services_by_address", Args) ->
+ {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
+ [ok] = gen_server:call(?SERVER, { rvi_call, unregister_remote_services_by_address,
+ [ Address ]}),
+ [ {status, ok} ];
+
+handle_rpc("unregister_remote_service_by_name", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ [ok ] = gen_server:call(?SERVER, { rvi_call, unregister_remote_service_by_Name,
+ [ Service ]}),
+ [ {status, ok} ];
+
+
+handle_rpc("unregister_local_service", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ %% De-register service
+ [ok ] = gen_server:call(?SERVER, { rvi_call, unregister_local_service,
+ [ Service ]}),
+ [ {status, ok} ];
+
+
+%%
+%% Get remote services
+%%
+handle_rpc("get_remote_services", _Args) ->
+ [ok, Services ] = gen_server:call(?SERVER, { rvi_call, get_remote_services,
+ [ ]}),
+ [ {status, ok} , { services, { array, Services } }];
+
+%%
+%% Get all services
+%%
+handle_rpc("get_all_services", _Args) ->
+ [ok, Services ] = gen_server:call(?SERVER, { rvi_call, get_all_services,
+ []}),
+ [ {status, ok} , { services, { array, Services } }];
+
+
+%%
+%% Get remote network addresses
+%%
+handle_rpc("get_remote_network_addresses", _Args) ->
+ [ok, Addresses ] = gen_server:call(?SERVER, { rvi_call, get_remote_network_addresses,
+ []}),
+ [ {status, ok}, { addresses, { array, Addresses }}];
+
+%%
+%% Resolve remote service
+%%
+handle_rpc("resolve_remote_service", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+
+ case gen_server:call(?SERVER, { rvi_call, resolve_remote_service,
+ [Service]}) of
+ [ok, Addresses ] ->
+ [ {status, ok}, { addresses, { array, Addresses }}];
+
+ [ Other ] -> [ {status, Other} ]
+ end;
+
+
+
+%%
+%% Resolve local service
+%%
+handle_rpc("resolve_local_service", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+
+ case gen_server:call(?SERVER, { rvi_call, resolve_local_service,
+ [Service]}) of
+ [ok, Addresses ] ->
+ [ {status, ok}, { addresses, { array, Addresses }}];
+
+ [ Other ] -> [ {status, Other} ]
+ end;
+
+
+
+
+%%
+%% Get local services
+%%
+handle_rpc("get_local_services", _Args) ->
+ [ok, LocalServices ] =
+ gen_server:call(?SERVER, { rvi_call, get_local_services, []}),
+ [ {status, ok} , { services, { array, LocalServices }}];
+
+
+
+%%
+%% Get local network addresses
+%%
+handle_rpc("get_local_network_addresses", _Args) ->
+ [ok, LocalAddresses ] =
+ gen_server:call(?SERVER, { rvi_call, get_local_network_addresses, []}),
+
+ [ {status, ok} , { network_addresses, { array, LocalAddresses }}];
+
+
+
+%%
+%% Handle the rest.
+%%
+handle_rpc( Other, _Args) ->
+ ?info("service_discovery_rpc:handle_rpc(~p): unknown", [ Other ]),
+ [ { status, invalid_command } ].
+
+
+%% Handle calls received through regular gen_server calls, routed by
+%% rvi_common:send_component_request()
+
+handle_call({rvi_call, register_local_service, [Service, Address] }, _From, State) ->
+ ?info("service_discovery_rpc:register_local_service(): ~p -> ~p",
+ [Service, Address]),
+
+ FullSvcName = rvi_common:local_service_to_string(Service),
+
+ ets:insert(?LOCAL_SERVICE_TABLE,
#service_entry {
service = FullSvcName,
- network_address = NetworkAddress
+ network_address = Address
}),
+ {reply, [ ok, FullSvcName ], State };
+
+
+handle_call({rvi_call, register_remote_services, [Services, Address] }, _From, State) ->
+ case Services of
+ %% We have zero services associated with address.
+ %% Just register the address.
+ [] ->
+ ?info("service_discovery_rpc:register_remote_service_(): service(n/a) -> ~p",
+ [Address]),
+
+ ets:insert(?REMOTE_ADDRESS_TABLE,
+ #service_entry {
+ service = "",
+ network_address = Address
+ }),
+
+ dump_table(?REMOTE_ADDRESS_TABLE),
+ ok;
+
+ %% Loop through the services and register them.
+ _ ->
+ lists:map(fun(Svc) ->
+ register_remote_service_(Svc, Address)
+ end, Services),
+
+ %% Forward to scheduler now that we have updated our own state
+ schedule_rpc:register_remote_services(Address, Services),
+
+ %% Forward to service edge so that it can inform its locally
+ %% connected services.
+ %% Build a list of all our local services' addresses to provide
+ %% to service edge so that it knows where to send the,
+ LocalSvcAddresses =
+ ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) ->
+ [ LocalAddress | Acc ] end,
+ [], ?LOCAL_SERVICE_TABLE),
- {ok, [ {service, FullSvcName}, { status, rvi_common:json_rpc_status(ok)}]}.
+ %% Call service edge with local addresses (sorted and de-duped) and
+ %% the services to register.
+ service_edge_rpc:register_remote_services(Services, lists:usort(LocalSvcAddresses))
+ end,
+ {reply, [ok], State };
-unregister_remote_services_by_address(NetworkAddress) ->
+%%
+%% Delete all services registered under the given address
+%%
+handle_call({rvi_call, unregister_remote_services_by_address, [Address]}, _From, State) ->
- %% Delete all services registered under the given address.
- Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, NetworkAddress),
+ %% Retrieve all services associated with the remote address
+ Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, Address),
- %% We now have a bunch of service records, convert them to a list of service
+ %% We now have a list of service records, convert them to a list of service
%% names and send them of to schedule for deregistration
AllSvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) ->
[SvcName | Acc]
end, [], Svcs),
?info("service_discovery_rpc:unregister_remote_services_by_address(): ~p -> ~p",
- [NetworkAddress, AllSvcNames]),
+ [Address, AllSvcNames]),
%% We need to filter AllSvcNames to remove all service entries that have
%% been registered under another name.
%% We do this by creating a list of all matching entries associated
- %% with a network address not matching the disconnected NetworkAddress
+ %% with a network address not matching the disconnected Address
%%
%% See issue https://github.com/PDXostc/rvi/issues/14 for details
FilterSvc =
lists:foldr(
fun(Service, Acc) ->
-
%% Lookup the service in the service table.
case ets:lookup(?REMOTE_SERVICE_TABLE, Service) of
@@ -147,7 +329,7 @@ unregister_remote_services_by_address(NetworkAddress) ->
%% We found or own entry, tiet to the disconnected address.
%% Do not add to addresses to be removed.
- [ #service_entry { network_address = NetworkAddress } ] ->
+ [ #service_entry { network_address = Address } ] ->
Acc;
%% We found an entry that does not the disconnected
@@ -160,17 +342,9 @@ unregister_remote_services_by_address(NetworkAddress) ->
SvcNames = AllSvcNames -- FilterSvc,
-
- case FilterSvc of
- [] -> ok;
-
- _ ->
- ?info("service_discovery_rpc:unregister_remote_services_by_address(): Resurrected services: ~p",
- [FilterSvc]),
-
- ?info("service_discovery_rpc:unregister_remote_services_by_address(): Filtered services to be deleted: ~p",
- [SvcNames])
- end,
+ ?info("service_discovery_rpc:unregister_remote_services_by_address(): "
+ "Resurrected services: ~p",
+ [FilterSvc]),
%% Delete any addresses stored with an empty service name,
@@ -179,23 +353,25 @@ unregister_remote_services_by_address(NetworkAddress) ->
ets:match_delete(?REMOTE_ADDRESS_TABLE,
#service_entry {
service = [],
- network_address = NetworkAddress
+ network_address = Address
}),
- ets:delete(?REMOTE_ADDRESS_TABLE, NetworkAddress),
+ ets:delete(?REMOTE_ADDRESS_TABLE, Address),
+
+ %% Go through all service names and unregister them with schedulew
+ %% and service edge
case SvcNames of
+ %% Nothing to do.
[] ->
true;
_ ->
+ %% Tell scheduler to kill off services
+ schedule_rpc:unregister_remote_services(SvcNames),
- rvi_common:send_component_request(schedule, unregister_remote_services,
- [
- { services, SvcNames }
- ]),
-
+ %% Delete all services from remote service table.
[ ets:delete(?REMOTE_SERVICE_TABLE, Svc#service_entry.service) || Svc <- Svcs ],
- %% Forward to service edge so that it can inform its locally
+ %% Forward to service edge so that it can inform its locally
%% connected services.
%% Build a list of all our local services' addresses to provide
%% to service edge so that it knows where to send the,
@@ -204,104 +380,31 @@ unregister_remote_services_by_address(NetworkAddress) ->
[ LocalAddress | Acc ] end,
[], ?LOCAL_SERVICE_TABLE),
- %% Call service edge with local addresses (sorted and de-duped) and
- %% the services to register.
- rvi_common:send_component_request(service_edge, unregister_remote_services,
- [
- { local_service_addresses, lists:usort(LocalSvcAddresses)},
- { services, SvcNames}
- ])
- end,
-
- {ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
-
+ %% Call service edge with local addresses (sorted and
+ %% de-duped) and the services to register.
+ service_edge_rpc:unregister_remote_services(SvcNames, lists:usort(LocalSvcAddresses))
-unregister_single_remote_service_by_name_(Service) ->
- ?info("service_discovery_rpc:unregister_remote_services_by_name(): ~p",
- [Service]),
-
-
- %% Delete any remote address table entries with a matching Service.
- ets:match_delete(?REMOTE_ADDRESS_TABLE,
- #service_entry {
- service = Service,
- network_address = '_'
- }),
-
- ets:delete(?REMOTE_SERVICE_TABLE, Service),
- After = ets:foldl(fun(#service_entry { service = Svc }, Acc) ->
- [ Svc | Acc ] end,
- [], ?REMOTE_SERVICE_TABLE),
-
- ?debug("AFter removing ~p: ~p", [ Service, After ]),
-
- %% Forward to service edge so that it can inform its locally
- %% connected services.
- %% Build a list of all our local services' addresses to provide
- %% to service edge so that it knows where to send the,
- LocalSvcAddresses =
- ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) ->
- [ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE),
-
- %% Call service edge with local addresses (sorted and de-duped) and
- %% the services to register.
- rvi_common:send_component_request(service_edge, unregister_remote_services,
- [
- { local_service_addresses, lists:usort(LocalSvcAddresses)},
- { services, [Service]}
- ]),
-
- ok.
+ end,
+ {reply, [ok], State };
+handle_call({rvi_call, unregister_remote_services_by_name, [Services]}, _From, State) ->
+ unregister_remote_services_by_name_(Services),
+ {reply, [ok], State };
-unregister_remote_services_by_name(Services) ->
- [ unregister_single_remote_service_by_name_(Svc) || Svc <- Services],
- {ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
-
-unregister_local_service(Service) ->
+handle_call({rvi_call, unregister_local_service, [Service]}, _From, State) ->
?info("service_discovery_rpc:unregister_local_service(): ~p",
[Service]),
ets:delete(?LOCAL_SERVICE_TABLE, Service),
- {ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
-
+ {reply, [ok], State };
-register_local_service(NetworkAddress, Service) ->
- ?info("service_discovery_rpc:register_local_service(): ~p -> ~p", [Service, NetworkAddress]),
-
- FullSvcName = rvi_common:local_service_to_string(Service),
-
- ets:insert(?LOCAL_SERVICE_TABLE,
- #service_entry {
- service = FullSvcName,
- network_address = NetworkAddress
- }),
- FullSvcName.
-
-
-
-resolve_local_service(RawService) ->
- Service = rvi_common:sanitize_service_string(RawService),
- ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]),
- ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]),
- case resolve_service(?LOCAL_SERVICE_TABLE, Service) of
- not_found ->
- { ok, [ { status, rvi_common:json_rpc_status(not_found) }]};
-
- {ok, NetworkAddress } ->
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { network_address, NetworkAddress }]}
- end.
-
-resolve_remote_service(RawService) ->
+handle_call({rvi_call, resolve_remote_service, [RawService]}, _From, State) ->
Service = rvi_common:sanitize_service_string(RawService),
?debug("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]),
?debug("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]),
- case resolve_service(?REMOTE_SERVICE_TABLE, Service) of
+ case resolve_service_(?REMOTE_SERVICE_TABLE, Service) of
{ok, NetworkAddress } ->
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { network_address, NetworkAddress }]};
+ {reply, [ok, NetworkAddress ], State};
not_found ->
?debug("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. "
@@ -315,82 +418,21 @@ resolve_remote_service(RawService) ->
?info("service_discovery_rpc:resolve_remote_service(~p): Service not found.",
[Service]),
- { ok, [ { status, rvi_common:json_rpc_status(not_found) }]};
+ {reply, [not_found], State };
NetworkAddress -> %% Found
- ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p",
- [Service, NetworkAddress]),
-
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { network_address, NetworkAddress }]}
+ ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p",
+ [Service, NetworkAddress]),
+
+ {reply, [ok, NetworkAddress ], State}
end
- end.
-
-
-
-
-register_remote_services(Address, Services) ->
- %% Loop through the services and register them.
- case Services of
- [] -> register_remote_service(Address);
- _ ->
- lists:map(fun(Svc) -> register_remote_service(Svc, Address) end, Services),
-
- %% Forward to scheduler now that we have updated our own state
- rvi_common:send_component_request(schedule, register_remote_services,
- [
- {services, Services},
- { network_address, Address }
- ]),
+ end;
- %% Forward to service edge so that it can inform its locally
- %% connected services.
- %% Build a list of all our local services' addresses to provide
- %% to service edge so that it knows where to send the,
- LocalSvcAddresses =
- ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) ->
- [ LocalAddress | Acc ] end,
- [], ?LOCAL_SERVICE_TABLE),
-
- %% Call service edge with local addresses (sorted and de-duped) and
- %% the services to register.
- rvi_common:send_component_request(service_edge, register_remote_services,
- [
- { local_service_addresses, lists:usort(LocalSvcAddresses)},
- { services, Services}
- ])
- end,
-
- {ok, [ { status, rvi_common:json_rpc_status(ok) } ]}.
-
-resolve_service(Table, Service) ->
- case ets:lookup(Table, Service) of
- %% We found a service entry, report it back
- [#service_entry { network_address = NetworkAddress }] ->
- ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> ~p",
- [ Table, Service, NetworkAddress ]),
-
- {ok, NetworkAddress };
-
- %% We did not find a service entry, check statically configured nodes.
- [] ->
- ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> Not Found",
- [ Table, Service ]),
- not_found
- end.
-
-
-
-get_services(Table) ->
- Services = ets:foldl(fun(#service_entry {service = ServiceName,
- network_address = ServiceAddr}, Acc) ->
- [ {ServiceName, ServiceAddr } | Acc ] end,
- [], Table),
-
- ?debug("service_discovery_rpc:get_services(): ~p", [ Services ]),
- Services.
+handle_call({rvi_call, get_remote_services, _Args}, _From, State) ->
+ Services = get_services_(?REMOTE_SERVICE_TABLE),
+ {reply, [ok, Services ], State };
-get_all_services() ->
+handle_call({rvi_call, get_all_services, _Args}, _From, State) ->
RemoteSvc = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
[ ServiceName | Acc ] end,
[], ?REMOTE_SERVICE_TABLE),
@@ -400,233 +442,207 @@ get_all_services() ->
[], ?LOCAL_SERVICE_TABLE),
Services = RemoteSvc++LocalSvc,
- ?debug("service_discovery_rpc:get_all_services(): ~p", [ Services]),
- Services.
-
-
-get_json_services(Table) ->
- Services = ets:foldl(fun(#service_entry {service = ServiceName,
- network_address = ServiceAddr}, Acc) ->
- [ {struct,
- [
- {service, ServiceName},
- {address, ServiceAddr}
- ]
- } | Acc ] end,
- [], Table),
- ?debug("service_discovery_rpc:get_services(): ~p", [ Services]),
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { services, {array, Services }}]}.
+ {reply, [ok, Services], State };
-get_network_addresses_(Table) ->
- AddrList = ets:foldl(fun(#service_entry {network_address = NetworkAddr}, Acc)
- when NetworkAddr =:= unavailable ->
- Acc; %% Don't report if service is not active
-
- %% We have an active network address
- (#service_entry {network_address = NetworkAddr}, Acc) ->
- %% Avoid duplicates
- case lists:keyfind(NetworkAddr, 1, Acc) of
- false ->[ NetworkAddr | Acc ];
- _ -> Acc
- end
- end, [], Table),
-
-
- %% Return a dup-scrubbed list.
- Addresses = sets:to_list(sets:from_list(AddrList)),
- ?debug("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]),
- Addresses.
-
-
-get_json_network_addresses(Table) ->
- Addresses = get_network_addresses_(Table),
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { addresses, {array, Addresses }}]}.
-
-get_network_addresses(Table) ->
- Addresses = get_network_addresses_(Table),
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { addresses, Addresses }]}.
+handle_call({rvi_call, get_remote_network_addresses, _Args}, _From, State) ->
+ Addresses = get_network_addresses_(?REMOTE_ADDRESS_TABLE),
+ {reply, [ ok, Addresses ], State };
-%% JSON-RPC entry point
-%% Called by local exo http server
+handle_call({rvi_call, resolve_local_service, [RawService]}, _From, State) ->
+ Service = rvi_common:sanitize_service_string(RawService),
+ ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]),
+ ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]),
-%% Register local services
+ case resolve_service_(?LOCAL_SERVICE_TABLE, Service) of
+ not_found ->
+ { reply, [not_found], State };
+
+ {ok, NetworkAddress } ->
+ { reply, [ok, NetworkAddress], State}
+ end;
-handle_rpc("register_local_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
- FullSvcName = register_local_service(Address, Service),
- {ok, [ { service, FullSvcName },
- { status, rvi_common:json_rpc_status(ok) }
- ]
- };
+handle_call({rvi_call, get_local_services, _Args}, _From, State) ->
+ Services = get_services_(?LOCAL_SERVICE_TABLE),
+ {reply, [ok, Services ], State };
+handle_call({rvi_call, get_local_network_addresses, _Args}, _From, State) ->
+ Addresses = get_network_addresses_(?LOCAL_SERVICE_TABLE),
+ {reply, [ ok, Addresses ], State };
-%% Register remote services
+handle_call(Other, _From, State) ->
+ ?warning("service_discovery_rpc:handle_call(~p): unknown", [ Other ]),
+ { reply, [unknown_command] , State}.
-handle_rpc("register_remote_services", Args) ->
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
- {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
- register_remote_services(Address, Services);
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+handle_info(_Info, State) ->
+ {noreply, State}.
-handle_rpc("unregister_remote_services_by_address", Args) ->
- {ok, Address} = rvi_common:get_json_element(["network_address"], Args),
+terminate(_Reason, _State) ->
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
- %% Loop through the services and de-register them.
- unregister_remote_services_by_address(Address);
-handle_rpc("unregister_remote_services_by_name", Args) ->
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
+%%
+%% INTERNAL SUPPORT FUNCTIONS
+%%
+dump_table(_Table, '$end_of_table') ->
+ true;
- %% Loop through the services and de-register them.
- unregister_remote_services_by_name(Services);
+dump_table(Table, Key) ->
+ Val = ets:lookup(Table, Key),
+ ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]),
+ dump_table(Table, ets:next(Table, Key)).
-handle_rpc("unregister_local_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- %% De-register service
- unregister_local_service(Service);
+dump_table(Table) ->
+ dump_table(Table, ets:first(Table)).
-%%
-%% Resolve remote service
-%%
-handle_rpc("resolve_remote_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- resolve_remote_service(Service);
+register_remote_service_(Service, NetworkAddress) ->
+ ?info("service_discovery_rpc:register_remote_service_(): service(~p) -> ~p",
+ [Service, NetworkAddress]),
+ FullSvcName = rvi_common:remote_service_to_string(Service),
-%%
-%% Get remote services
-%%
-handle_rpc("get_remote_services", _Args) ->
- get_json_services(?REMOTE_SERVICE_TABLE);
+ ets:insert(?REMOTE_SERVICE_TABLE,
+ #service_entry {
+ service = FullSvcName,
+ network_address = NetworkAddress
+ }),
-%%
-%% Get all services
-%%
-handle_rpc("get_all_services", _Args) ->
- Services = get_all_services(),
- {ok, [ { status, rvi_common:json_rpc_status(ok) },
- { services, {array, Services }}]};
+ %% Delete any addresses stored with an empty service name,
+ %% installed with register_remote_service/1, since we now have at
+ %% least one service name.
+ ets:match_delete(?REMOTE_ADDRESS_TABLE,
+ #service_entry {
+ service = [],
+ network_address = NetworkAddress
+ }),
+ %% Delete any previous instances of the given entry, in case
+ %% the service registers multiple times
+ ets:match_delete(?REMOTE_ADDRESS_TABLE,
+ #service_entry {
+ service = FullSvcName,
+ network_address = NetworkAddress
+ }),
-%%
-%% Get remote network addresses
-%%
-handle_rpc("get_remote_network_addresses", _Args) ->
- get_json_network_addresses(?REMOTE_ADDRESS_TABLE);
+ ets:insert(?REMOTE_ADDRESS_TABLE,
+ #service_entry {
+ service = FullSvcName,
+ network_address = NetworkAddress
+ }),
+ {ok, FullSvcName}.
-%%
-%% Resolve local service
-%%
-handle_rpc("resolve_local_service", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- resolve_local_service(Service);
-%%
-%% Get local services
-%%
-handle_rpc("get_local_services", _Args) ->
- get_json_services(?LOCAL_SERVICE_TABLE);
+unregister_single_remote_service_by_name_(Service) ->
+ ?info("service_discovery_rpc:unregister_remote_services_by_name(): ~p",
+ [Service]),
-%%
-%% Get local network addresses
-%%
-handle_rpc("get_local_network_addresses", _Args) ->
- get_json_network_addresses(?LOCAL_SERVICE_TABLE);
+ %% Delete any remote address table entries with a matching Service.
+ ets:match_delete(?REMOTE_ADDRESS_TABLE,
+ #service_entry {
+ service = Service,
+ network_address = '_'
+ }),
+ ets:delete(?REMOTE_SERVICE_TABLE, Service),
+ After = ets:foldl(fun(#service_entry { service = Svc }, Acc) ->
+ [ Svc | Acc ] end,
+ [], ?REMOTE_SERVICE_TABLE),
-%%
-%% Handle the rest.
-%%
-handle_rpc( Other, _Args) ->
- ?info("service_discovery_rpc:handle_rpc(~p): unknown", [ Other ]),
- { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
+ ?debug("Ater removing ~p: ~p", [ Service, After ]),
+
+ %% Forward to service edge so that it can inform its locally
+ %% connected services.
+ %% Build a list of all our local services' addresses to provide
+ %% to service edge so that it knows where to send the,
+ LocalSvcAddresses =
+ ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) ->
+ [ LocalAddress | Acc ] end,
+ [], ?LOCAL_SERVICE_TABLE),
+
+ %% Call service edge with local addresses (sorted and de-duped) and
+ %% the services to register.
+ service_edge_rpc:unregister_remote_services([Service], lists:usort(LocalSvcAddresses)),
-%% Handle calls received through regular gen_server calls, routed byh
-%% rvi_common:send_component_request()
+ ok.
-handle_call({rvi_call, register_local_service, Args}, _From, State) ->
- {_, Service} = lists:keyfind(service, 1, Args),
- {_, Address} = lists:keyfind(network_address, 1, Args),
- FullSvcName = register_local_service(Address, Service),
- {reply, {ok, [ { service, FullSvcName },
- { status, rvi_common:json_rpc_status(ok) }
- ]
- }, State };
+%% Loop through multiple services and remove them one by one
+unregister_remote_services_by_name_(Services) ->
+ [ unregister_single_remote_service_by_name_(Svc) || Svc <- Services],
+ ok.
-handle_call({rvi_call, register_remote_services, Args}, _From, State) ->
- {_, Services} = lists:keyfind(services, 1, Args),
- {_, Address} = lists:keyfind(network_address, 1, Args),
- {reply, register_remote_services(Address, Services), State };
-handle_call({rvi_call, unregister_remote_services_by_address, Args}, _From, State) ->
- {_, Address} = lists:keyfind(network_address, 1, Args),
- {reply, unregister_remote_services_by_address(Address), State };
+get_services_(Table) ->
+ Services = ets:foldl(fun(#service_entry {service = ServiceName,
+ network_address = ServiceAddr}, Acc) ->
+ [ { ServiceName, ServiceAddr } | Acc ] end,
+ [], Table),
-handle_call({rvi_call, unregister_remote_services_by_name, Args}, _From, State) ->
- {_, Services} = lists:keyfind(services, 1, Args),
- {reply, unregister_remote_services_by_name(Services), State };
+ ?debug("service_discovery_rpc:get_services_(): ~p", [ Services ]),
+ Services.
-handle_call({rvi_call, unregister_local_service, Args}, _From, State) ->
- {_, Service} = lists:keyfind(service, 1, Args),
- {reply, unregister_local_service(Service), State };
-handle_call({rvi_call, resolve_remote_service, Args}, _From, State) ->
- {_, Service} = lists:keyfind(service, 1, Args),
- {reply, resolve_remote_service(Service), State };
+get_network_addresses_(Table) ->
+ AddrList = ets:foldl(fun(#service_entry {network_address = NetworkAddr}, Acc)
+ when NetworkAddr =:= unavailable ->
+ Acc; %% Don't report if service is not active
-handle_call({rvi_call, get_remote_services, _Args}, _From, State) ->
- Services = get_services(?REMOTE_SERVICE_TABLE),
- {reply, {ok,
- [ { status, rvi_common:json_rpc_status(ok) },
- { services, Services }]}, State };
+ %% We have an active network address
+ (#service_entry {network_address = NetworkAddr}, Acc) ->
+ %% Avoid duplicates
+ case lists:keyfind(NetworkAddr, 1, Acc) of
+ false ->[ NetworkAddr | Acc ];
+ _ -> Acc
+ end
+ end, [], Table),
-handle_call({rvi_call, get_all_services, _Args}, _From, State) ->
- Services = get_all_services(),
- {reply, {ok,
- [ { status, rvi_common:json_rpc_status(ok) },
- { services, Services }]}, State };
-handle_call({rvi_call, get_remote_network_addresses, _Args}, _From, State) ->
- {reply, get_network_addresses(?REMOTE_ADDRESS_TABLE), State };
+ %% Return a dup-scrubbed list.
+ Addresses = sets:to_list(sets:from_list(AddrList)),
+ ?debug("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]),
+ Addresses.
-handle_call({rvi_call, resolve_local_service, Args}, _From, State) ->
- {_, Service} = lists:keyfind(service, 1, Args),
- {reply, resolve_local_service(Service), State };
-handle_call({rvi_call, get_local_services, _Args}, _From, State) ->
- Services = get_services(?LOCAL_SERVICE_TABLE),
- {reply, {ok,
- [ { status, rvi_common:json_rpc_status(ok) },
- { services, Services }]}, State };
+resolve_service_(Table, Service) ->
+ case ets:lookup(Table, Service) of
+ %% We found a service entry, report it back
+ [#service_entry { network_address = NetworkAddress }] ->
+ ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> ~p",
+ [ Table, Service, NetworkAddress ]),
-handle_call({rvi_call, get_local_network_addresses, _Args}, _From, State) ->
- {reply, get_network_addresses(?LOCAL_SERVICE_TABLE), State };
+ {ok, NetworkAddress };
-handle_call(Other, _From, State) ->
- ?warning("service_discovery_rpc:handle_call(~p): unknown", [ Other ]),
- { reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, State}.
+ %% We did not find a service entry, check statically configured nodes.
+ [] ->
+ ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> Not Found",
+ [ Table, Service ]),
+ not_found
+ end.
-handle_cast(_Msg, State) ->
- {noreply, State}.
-handle_info(_Info, State) ->
- {noreply, State}.
-terminate(_Reason, _State) ->
- ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+%% Convert services returned by get_services_ to JSON format
+convert_json_services(Services) ->
+ JSONServices = lists:foldl(fun({ ServiceName, ServiceAddr}, Acc) ->
+ [ {struct,
+ [
+ {service, ServiceName },
+ {address, ServiceAddr }
+ ]
+ } | Acc ] end,
+ [], Services),
+ ?debug("service_discovery_rpc:get_services_(): ~p", [ Services]),
+ JSONServices.