diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-03-18 20:02:21 -0700 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-03-18 20:02:21 -0700 |
commit | d0bb358a78076972e1aaeaa769c93486b5f83110 (patch) | |
tree | 371101db4fd211a14c236e7a2c9757dbf6eb1486 /components/service_discovery | |
parent | 0a24e9c82235bdfb64ce03382e0073af63aba719 (diff) | |
download | rvi_core-d0bb358a78076972e1aaeaa769c93486b5f83110.tar.gz |
Rewrite to adapt to gen_server model
Diffstat (limited to 'components/service_discovery')
3 files changed, 438 insertions, 422 deletions
diff --git a/components/service_discovery/src/service_discovery.app.src b/components/service_discovery/src/service_discovery.app.src index 6d41bb4..4b32eed 100644 --- a/components/service_discovery/src/service_discovery.app.src +++ b/components/service_discovery/src/service_discovery.app.src @@ -19,5 +19,5 @@ rvi_common ]}, {mod, { service_discovery_app, []}}, - {start_phases, [{init, []}]} + {start_phases, [{json_rpc, []}]} ]}. diff --git a/components/service_discovery/src/service_discovery_app.erl b/components/service_discovery/src/service_discovery_app.erl index 7bd7fe5..75e2ab3 100644 --- a/components/service_discovery/src/service_discovery_app.erl +++ b/components/service_discovery/src/service_discovery_app.erl @@ -23,8 +23,8 @@ start(_StartType, _StartArgs) -> service_discovery_sup:start_link(). -start_phase(init, _, _) -> - service_discovery_rpc:init_rvi_component(), +start_phase(json_rpc, _, _) -> + service_discovery_rpc:start_json_server(), ok. stop(_State) -> diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index 19bebb0..265bffa 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -15,7 +15,17 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([init_rvi_component/0]). +-export([get_all_services/1, + get_local_network_addresses/1, + resolve_local_service/2, + resolve_remote_service/2, + register_remote_services/3, + register_local_service/3, + unregister_remote_services_by_address/2, + unregister_remote_service_by_name/2, + unregister_local_service/2]). + +-export([start_json_server/0]). -include_lib("lager/include/log.hrl"). -define(LOCAL_SERVICE_TABLE, rvi_local_services). @@ -35,109 +45,281 @@ start_link() -> init([]) -> ?debug("service_discovery_rpc:init(): called."), - {ok, #st {}}. - -%% Called by service_discovery_app:start_phase(). -init_rvi_component() -> ets:new(?LOCAL_SERVICE_TABLE, [set, public, named_table, { keypos, #service_entry.service }]), ets:new(?REMOTE_SERVICE_TABLE, [set, public, named_table, { keypos, #service_entry.service }]), ets:new(?REMOTE_ADDRESS_TABLE, [duplicate_bag, public, named_table, {keypos, #service_entry.network_address}]), + {ok, #st {}}. - case rvi_common:get_component_config(service_discovery, exo_http_opts) of - { ok, ExoHttpOpts } -> - exoport_exo_http:instance(service_discovery_sup, - service_discovery_rpc, - ExoHttpOpts); - Err -> Err - end, - ok. -dump_table(_Table, '$end_of_table') -> - true; +start_json_server() -> + rvi_common:start_json_rpc_server(service_discovery, ?MODULE, service_discovery_sup). -dump_table(Table, Key) -> - Val = ets:lookup(Table, Key), - ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]), - dump_table(Table, ets:next(Table, Key)). +get_all_services(CompSpec) -> + rvi_common:request(service_discovery, ?MODULE, + get_all_services, [], [status], CompSpec). -dump_table(Table) -> - dump_table(Table, ets:first(Table)). +get_local_network_addresses(CompSpec) -> + rvi_common:request(service_discovery, ?MODULE, + get_local_network_addresses, [], [status], CompSpec). -register_remote_service(NetworkAddress) -> - ?info("service_discovery_rpc:register_remote_service(): service(n/a) -> ~p", [NetworkAddress]), - ets:insert(?REMOTE_ADDRESS_TABLE, - #service_entry { - service = "", - network_address = NetworkAddress - }), +resolve_local_service(CompSpec, RawService) -> + rvi_common:request(service_discovery, ?MODULE, resove_local_service, + [RawService], [service], + [status, full_service], CompSpec). - dump_table(?REMOTE_ADDRESS_TABLE), - {ok, [ {service, ""}, { status, rvi_common:json_rpc_status(ok)}]}. +resolve_remote_service(CompSpec, RawService) -> + rvi_common:request(service_discovery, ?MODULE, resolve_remote_service, + [RawService], [service], + [status], CompSpec). -register_remote_service(Service, NetworkAddress) -> - ?info("service_discovery_rpc:register_remote_service(): service(~p) -> ~p", [Service, NetworkAddress]), +register_remote_services(CompSpec, Address, Services) -> + rvi_common:request(service_discovery, ?MODULE, register_remote_services, + [Address, Services], [network_address, services], + [status], CompSpec). - FullSvcName = rvi_common:remote_service_to_string(Service), - ets:insert(?REMOTE_SERVICE_TABLE, - #service_entry { - service = FullSvcName, - network_address = NetworkAddress - }), +register_local_service(CompSpec, Address, Services) -> + rvi_common:request(service_discovery, ?MODULE, register_local_service, + [Address, Services], [network_address, service], + [status], CompSpec). - %% Delete any addresses stored with an empty service name, - %% installed with register_remote_service/1, since we now have at - %% least one service name. - ets:match_delete(?REMOTE_ADDRESS_TABLE, - #service_entry { - service = [], - network_address = NetworkAddress - }), - %% Delete any previous instances of the given entry, in case - %% the service registers multiple times - ets:match_delete(?REMOTE_ADDRESS_TABLE, - #service_entry { - service = FullSvcName, - network_address = NetworkAddress - }), - ets:insert(?REMOTE_ADDRESS_TABLE, +unregister_remote_services_by_address(CompSpec, Address) -> + rvi_common:request(service_discovery, ?MODULE, unregister_remote_services_by_address, + [Address], [network_address], + [status], CompSpec). + +unregister_remote_service_by_name(CompSpec, Service) -> + rvi_common:request(service_discovery, ?MODULE, unregister_remote_service_by_name, + [Service], [service], + [status], CompSpec). + +unregister_local_service(CompSpec, Service) -> + rvi_common:request(service_discovery, ?MODULE, unregister_local_service, + [Service], [service], + [status], CompSpec). + + +%% JSON-RPC entry point +%% Called by local exo http server + +%% Register local services + +handle_rpc("register_local_service", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + {ok, Address} = rvi_common:get_json_element(["network_address"], Args), + [ok, FullSvcName] = gen_server:call(?SERVER, { rvi_call, register_local_service, + [ Service, Address ]}), + [ {status, ok }, { full_service_name, FullSvcName }]; + +%% Register remote services + +handle_rpc("register_remote_services", Args) -> + {ok, Services} = rvi_common:get_json_element(["services"], Args), + {ok, Address} = rvi_common:get_json_element(["network_address"], Args), + + [ok ] = gen_server:call(?SERVER, { rvi_call, register_remote_services, + [ Services, Address ]}), + [ {status, ok} ]; + + +handle_rpc("unregister_remote_services_by_address", Args) -> + {ok, Address} = rvi_common:get_json_element(["network_address"], Args), + [ok] = gen_server:call(?SERVER, { rvi_call, unregister_remote_services_by_address, + [ Address ]}), + [ {status, ok} ]; + +handle_rpc("unregister_remote_service_by_name", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + [ok ] = gen_server:call(?SERVER, { rvi_call, unregister_remote_service_by_Name, + [ Service ]}), + [ {status, ok} ]; + + +handle_rpc("unregister_local_service", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + %% De-register service + [ok ] = gen_server:call(?SERVER, { rvi_call, unregister_local_service, + [ Service ]}), + [ {status, ok} ]; + + +%% +%% Get remote services +%% +handle_rpc("get_remote_services", _Args) -> + [ok, Services ] = gen_server:call(?SERVER, { rvi_call, get_remote_services, + [ ]}), + [ {status, ok} , { services, { array, Services } }]; + +%% +%% Get all services +%% +handle_rpc("get_all_services", _Args) -> + [ok, Services ] = gen_server:call(?SERVER, { rvi_call, get_all_services, + []}), + [ {status, ok} , { services, { array, Services } }]; + + +%% +%% Get remote network addresses +%% +handle_rpc("get_remote_network_addresses", _Args) -> + [ok, Addresses ] = gen_server:call(?SERVER, { rvi_call, get_remote_network_addresses, + []}), + [ {status, ok}, { addresses, { array, Addresses }}]; + +%% +%% Resolve remote service +%% +handle_rpc("resolve_remote_service", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + + case gen_server:call(?SERVER, { rvi_call, resolve_remote_service, + [Service]}) of + [ok, Addresses ] -> + [ {status, ok}, { addresses, { array, Addresses }}]; + + [ Other ] -> [ {status, Other} ] + end; + + + +%% +%% Resolve local service +%% +handle_rpc("resolve_local_service", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + + case gen_server:call(?SERVER, { rvi_call, resolve_local_service, + [Service]}) of + [ok, Addresses ] -> + [ {status, ok}, { addresses, { array, Addresses }}]; + + [ Other ] -> [ {status, Other} ] + end; + + + + +%% +%% Get local services +%% +handle_rpc("get_local_services", _Args) -> + [ok, LocalServices ] = + gen_server:call(?SERVER, { rvi_call, get_local_services, []}), + [ {status, ok} , { services, { array, LocalServices }}]; + + + +%% +%% Get local network addresses +%% +handle_rpc("get_local_network_addresses", _Args) -> + [ok, LocalAddresses ] = + gen_server:call(?SERVER, { rvi_call, get_local_network_addresses, []}), + + [ {status, ok} , { network_addresses, { array, LocalAddresses }}]; + + + +%% +%% Handle the rest. +%% +handle_rpc( Other, _Args) -> + ?info("service_discovery_rpc:handle_rpc(~p): unknown", [ Other ]), + [ { status, invalid_command } ]. + + +%% Handle calls received through regular gen_server calls, routed by +%% rvi_common:send_component_request() + +handle_call({rvi_call, register_local_service, [Service, Address] }, _From, State) -> + ?info("service_discovery_rpc:register_local_service(): ~p -> ~p", + [Service, Address]), + + FullSvcName = rvi_common:local_service_to_string(Service), + + ets:insert(?LOCAL_SERVICE_TABLE, #service_entry { service = FullSvcName, - network_address = NetworkAddress + network_address = Address }), + {reply, [ ok, FullSvcName ], State }; + + +handle_call({rvi_call, register_remote_services, [Services, Address] }, _From, State) -> + case Services of + %% We have zero services associated with address. + %% Just register the address. + [] -> + ?info("service_discovery_rpc:register_remote_service_(): service(n/a) -> ~p", + [Address]), + + ets:insert(?REMOTE_ADDRESS_TABLE, + #service_entry { + service = "", + network_address = Address + }), + + dump_table(?REMOTE_ADDRESS_TABLE), + ok; + + %% Loop through the services and register them. + _ -> + lists:map(fun(Svc) -> + register_remote_service_(Svc, Address) + end, Services), + + %% Forward to scheduler now that we have updated our own state + schedule_rpc:register_remote_services(Address, Services), + + %% Forward to service edge so that it can inform its locally + %% connected services. + %% Build a list of all our local services' addresses to provide + %% to service edge so that it knows where to send the, + LocalSvcAddresses = + ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) -> + [ LocalAddress | Acc ] end, + [], ?LOCAL_SERVICE_TABLE), - {ok, [ {service, FullSvcName}, { status, rvi_common:json_rpc_status(ok)}]}. + %% Call service edge with local addresses (sorted and de-duped) and + %% the services to register. + service_edge_rpc:register_remote_services(Services, lists:usort(LocalSvcAddresses)) + end, + {reply, [ok], State }; -unregister_remote_services_by_address(NetworkAddress) -> +%% +%% Delete all services registered under the given address +%% +handle_call({rvi_call, unregister_remote_services_by_address, [Address]}, _From, State) -> - %% Delete all services registered under the given address. - Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, NetworkAddress), + %% Retrieve all services associated with the remote address + Svcs = ets:lookup(?REMOTE_ADDRESS_TABLE, Address), - %% We now have a bunch of service records, convert them to a list of service + %% We now have a list of service records, convert them to a list of service %% names and send them of to schedule for deregistration AllSvcNames = lists:foldr(fun(#service_entry { service = SvcName }, Acc) -> [SvcName | Acc] end, [], Svcs), ?info("service_discovery_rpc:unregister_remote_services_by_address(): ~p -> ~p", - [NetworkAddress, AllSvcNames]), + [Address, AllSvcNames]), %% We need to filter AllSvcNames to remove all service entries that have %% been registered under another name. %% We do this by creating a list of all matching entries associated - %% with a network address not matching the disconnected NetworkAddress + %% with a network address not matching the disconnected Address %% %% See issue https://github.com/PDXostc/rvi/issues/14 for details FilterSvc = lists:foldr( fun(Service, Acc) -> - %% Lookup the service in the service table. case ets:lookup(?REMOTE_SERVICE_TABLE, Service) of @@ -147,7 +329,7 @@ unregister_remote_services_by_address(NetworkAddress) -> %% We found or own entry, tiet to the disconnected address. %% Do not add to addresses to be removed. - [ #service_entry { network_address = NetworkAddress } ] -> + [ #service_entry { network_address = Address } ] -> Acc; %% We found an entry that does not the disconnected @@ -160,17 +342,9 @@ unregister_remote_services_by_address(NetworkAddress) -> SvcNames = AllSvcNames -- FilterSvc, - - case FilterSvc of - [] -> ok; - - _ -> - ?info("service_discovery_rpc:unregister_remote_services_by_address(): Resurrected services: ~p", - [FilterSvc]), - - ?info("service_discovery_rpc:unregister_remote_services_by_address(): Filtered services to be deleted: ~p", - [SvcNames]) - end, + ?info("service_discovery_rpc:unregister_remote_services_by_address(): " + "Resurrected services: ~p", + [FilterSvc]), %% Delete any addresses stored with an empty service name, @@ -179,23 +353,25 @@ unregister_remote_services_by_address(NetworkAddress) -> ets:match_delete(?REMOTE_ADDRESS_TABLE, #service_entry { service = [], - network_address = NetworkAddress + network_address = Address }), - ets:delete(?REMOTE_ADDRESS_TABLE, NetworkAddress), + ets:delete(?REMOTE_ADDRESS_TABLE, Address), + + %% Go through all service names and unregister them with schedulew + %% and service edge case SvcNames of + %% Nothing to do. [] -> true; _ -> + %% Tell scheduler to kill off services + schedule_rpc:unregister_remote_services(SvcNames), - rvi_common:send_component_request(schedule, unregister_remote_services, - [ - { services, SvcNames } - ]), - + %% Delete all services from remote service table. [ ets:delete(?REMOTE_SERVICE_TABLE, Svc#service_entry.service) || Svc <- Svcs ], - %% Forward to service edge so that it can inform its locally + %% Forward to service edge so that it can inform its locally %% connected services. %% Build a list of all our local services' addresses to provide %% to service edge so that it knows where to send the, @@ -204,104 +380,31 @@ unregister_remote_services_by_address(NetworkAddress) -> [ LocalAddress | Acc ] end, [], ?LOCAL_SERVICE_TABLE), - %% Call service edge with local addresses (sorted and de-duped) and - %% the services to register. - rvi_common:send_component_request(service_edge, unregister_remote_services, - [ - { local_service_addresses, lists:usort(LocalSvcAddresses)}, - { services, SvcNames} - ]) - end, - - {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. - + %% Call service edge with local addresses (sorted and + %% de-duped) and the services to register. + service_edge_rpc:unregister_remote_services(SvcNames, lists:usort(LocalSvcAddresses)) -unregister_single_remote_service_by_name_(Service) -> - ?info("service_discovery_rpc:unregister_remote_services_by_name(): ~p", - [Service]), - - - %% Delete any remote address table entries with a matching Service. - ets:match_delete(?REMOTE_ADDRESS_TABLE, - #service_entry { - service = Service, - network_address = '_' - }), - - ets:delete(?REMOTE_SERVICE_TABLE, Service), - After = ets:foldl(fun(#service_entry { service = Svc }, Acc) -> - [ Svc | Acc ] end, - [], ?REMOTE_SERVICE_TABLE), - - ?debug("AFter removing ~p: ~p", [ Service, After ]), - - %% Forward to service edge so that it can inform its locally - %% connected services. - %% Build a list of all our local services' addresses to provide - %% to service edge so that it knows where to send the, - LocalSvcAddresses = - ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) -> - [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE), - - %% Call service edge with local addresses (sorted and de-duped) and - %% the services to register. - rvi_common:send_component_request(service_edge, unregister_remote_services, - [ - { local_service_addresses, lists:usort(LocalSvcAddresses)}, - { services, [Service]} - ]), - - ok. + end, + {reply, [ok], State }; +handle_call({rvi_call, unregister_remote_services_by_name, [Services]}, _From, State) -> + unregister_remote_services_by_name_(Services), + {reply, [ok], State }; -unregister_remote_services_by_name(Services) -> - [ unregister_single_remote_service_by_name_(Svc) || Svc <- Services], - {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. - -unregister_local_service(Service) -> +handle_call({rvi_call, unregister_local_service, [Service]}, _From, State) -> ?info("service_discovery_rpc:unregister_local_service(): ~p", [Service]), ets:delete(?LOCAL_SERVICE_TABLE, Service), - {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. - + {reply, [ok], State }; -register_local_service(NetworkAddress, Service) -> - ?info("service_discovery_rpc:register_local_service(): ~p -> ~p", [Service, NetworkAddress]), - - FullSvcName = rvi_common:local_service_to_string(Service), - - ets:insert(?LOCAL_SERVICE_TABLE, - #service_entry { - service = FullSvcName, - network_address = NetworkAddress - }), - FullSvcName. - - - -resolve_local_service(RawService) -> - Service = rvi_common:sanitize_service_string(RawService), - ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]), - ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]), - case resolve_service(?LOCAL_SERVICE_TABLE, Service) of - not_found -> - { ok, [ { status, rvi_common:json_rpc_status(not_found) }]}; - - {ok, NetworkAddress } -> - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { network_address, NetworkAddress }]} - end. - -resolve_remote_service(RawService) -> +handle_call({rvi_call, resolve_remote_service, [RawService]}, _From, State) -> Service = rvi_common:sanitize_service_string(RawService), ?debug("service_discovery_rpc:resolve_remote_service(): RawService: ~p", [RawService]), ?debug("service_discovery_rpc:resolve_remote_service(): Cleaned Service: ~p", [Service]), - case resolve_service(?REMOTE_SERVICE_TABLE, Service) of + case resolve_service_(?REMOTE_SERVICE_TABLE, Service) of {ok, NetworkAddress } -> - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { network_address, NetworkAddress }]}; + {reply, [ok, NetworkAddress ], State}; not_found -> ?debug("service_discovery_rpc:resolve_remote_service(~p): Service not found in ets. " @@ -315,82 +418,21 @@ resolve_remote_service(RawService) -> ?info("service_discovery_rpc:resolve_remote_service(~p): Service not found.", [Service]), - { ok, [ { status, rvi_common:json_rpc_status(not_found) }]}; + {reply, [not_found], State }; NetworkAddress -> %% Found - ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p", - [Service, NetworkAddress]), - - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { network_address, NetworkAddress }]} + ?debug("service_discovery_rpc:resolve_service(~p): Service is on static node ~p", + [Service, NetworkAddress]), + + {reply, [ok, NetworkAddress ], State} end - end. - - - - -register_remote_services(Address, Services) -> - %% Loop through the services and register them. - case Services of - [] -> register_remote_service(Address); - _ -> - lists:map(fun(Svc) -> register_remote_service(Svc, Address) end, Services), - - %% Forward to scheduler now that we have updated our own state - rvi_common:send_component_request(schedule, register_remote_services, - [ - {services, Services}, - { network_address, Address } - ]), + end; - %% Forward to service edge so that it can inform its locally - %% connected services. - %% Build a list of all our local services' addresses to provide - %% to service edge so that it knows where to send the, - LocalSvcAddresses = - ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) -> - [ LocalAddress | Acc ] end, - [], ?LOCAL_SERVICE_TABLE), - - %% Call service edge with local addresses (sorted and de-duped) and - %% the services to register. - rvi_common:send_component_request(service_edge, register_remote_services, - [ - { local_service_addresses, lists:usort(LocalSvcAddresses)}, - { services, Services} - ]) - end, - - {ok, [ { status, rvi_common:json_rpc_status(ok) } ]}. - -resolve_service(Table, Service) -> - case ets:lookup(Table, Service) of - %% We found a service entry, report it back - [#service_entry { network_address = NetworkAddress }] -> - ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> ~p", - [ Table, Service, NetworkAddress ]), - - {ok, NetworkAddress }; - - %% We did not find a service entry, check statically configured nodes. - [] -> - ?debug("service_discovery_rpc:resolve_service(~p): service: ~p -> Not Found", - [ Table, Service ]), - not_found - end. - - - -get_services(Table) -> - Services = ets:foldl(fun(#service_entry {service = ServiceName, - network_address = ServiceAddr}, Acc) -> - [ {ServiceName, ServiceAddr } | Acc ] end, - [], Table), - - ?debug("service_discovery_rpc:get_services(): ~p", [ Services ]), - Services. +handle_call({rvi_call, get_remote_services, _Args}, _From, State) -> + Services = get_services_(?REMOTE_SERVICE_TABLE), + {reply, [ok, Services ], State }; -get_all_services() -> +handle_call({rvi_call, get_all_services, _Args}, _From, State) -> RemoteSvc = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> [ ServiceName | Acc ] end, [], ?REMOTE_SERVICE_TABLE), @@ -400,233 +442,207 @@ get_all_services() -> [], ?LOCAL_SERVICE_TABLE), Services = RemoteSvc++LocalSvc, - ?debug("service_discovery_rpc:get_all_services(): ~p", [ Services]), - Services. - - -get_json_services(Table) -> - Services = ets:foldl(fun(#service_entry {service = ServiceName, - network_address = ServiceAddr}, Acc) -> - [ {struct, - [ - {service, ServiceName}, - {address, ServiceAddr} - ] - } | Acc ] end, - [], Table), - ?debug("service_discovery_rpc:get_services(): ~p", [ Services]), - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { services, {array, Services }}]}. + {reply, [ok, Services], State }; -get_network_addresses_(Table) -> - AddrList = ets:foldl(fun(#service_entry {network_address = NetworkAddr}, Acc) - when NetworkAddr =:= unavailable -> - Acc; %% Don't report if service is not active - - %% We have an active network address - (#service_entry {network_address = NetworkAddr}, Acc) -> - %% Avoid duplicates - case lists:keyfind(NetworkAddr, 1, Acc) of - false ->[ NetworkAddr | Acc ]; - _ -> Acc - end - end, [], Table), - - - %% Return a dup-scrubbed list. - Addresses = sets:to_list(sets:from_list(AddrList)), - ?debug("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]), - Addresses. - - -get_json_network_addresses(Table) -> - Addresses = get_network_addresses_(Table), - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { addresses, {array, Addresses }}]}. - -get_network_addresses(Table) -> - Addresses = get_network_addresses_(Table), - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { addresses, Addresses }]}. +handle_call({rvi_call, get_remote_network_addresses, _Args}, _From, State) -> + Addresses = get_network_addresses_(?REMOTE_ADDRESS_TABLE), + {reply, [ ok, Addresses ], State }; -%% JSON-RPC entry point -%% Called by local exo http server +handle_call({rvi_call, resolve_local_service, [RawService]}, _From, State) -> + Service = rvi_common:sanitize_service_string(RawService), + ?debug("service_discovery_rpc:resolve_local_service(): RawService: ~p", [RawService]), + ?debug("service_discovery_rpc:resolve_local_service(): Cleaned Service: ~p", [Service]), -%% Register local services + case resolve_service_(?LOCAL_SERVICE_TABLE, Service) of + not_found -> + { reply, [not_found], State }; + + {ok, NetworkAddress } -> + { reply, [ok, NetworkAddress], State} + end; -handle_rpc("register_local_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, Address} = rvi_common:get_json_element(["network_address"], Args), - FullSvcName = register_local_service(Address, Service), - {ok, [ { service, FullSvcName }, - { status, rvi_common:json_rpc_status(ok) } - ] - }; +handle_call({rvi_call, get_local_services, _Args}, _From, State) -> + Services = get_services_(?LOCAL_SERVICE_TABLE), + {reply, [ok, Services ], State }; +handle_call({rvi_call, get_local_network_addresses, _Args}, _From, State) -> + Addresses = get_network_addresses_(?LOCAL_SERVICE_TABLE), + {reply, [ ok, Addresses ], State }; -%% Register remote services +handle_call(Other, _From, State) -> + ?warning("service_discovery_rpc:handle_call(~p): unknown", [ Other ]), + { reply, [unknown_command] , State}. -handle_rpc("register_remote_services", Args) -> - {ok, Services} = rvi_common:get_json_element(["services"], Args), - {ok, Address} = rvi_common:get_json_element(["network_address"], Args), - register_remote_services(Address, Services); +handle_cast(_Msg, State) -> + {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. -handle_rpc("unregister_remote_services_by_address", Args) -> - {ok, Address} = rvi_common:get_json_element(["network_address"], Args), +terminate(_Reason, _State) -> + ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. - %% Loop through the services and de-register them. - unregister_remote_services_by_address(Address); -handle_rpc("unregister_remote_services_by_name", Args) -> - {ok, Services} = rvi_common:get_json_element(["services"], Args), +%% +%% INTERNAL SUPPORT FUNCTIONS +%% +dump_table(_Table, '$end_of_table') -> + true; - %% Loop through the services and de-register them. - unregister_remote_services_by_name(Services); +dump_table(Table, Key) -> + Val = ets:lookup(Table, Key), + ?info("Table: ~p(~p) - ~p", [ Table, Key, Val ]), + dump_table(Table, ets:next(Table, Key)). -handle_rpc("unregister_local_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - %% De-register service - unregister_local_service(Service); +dump_table(Table) -> + dump_table(Table, ets:first(Table)). -%% -%% Resolve remote service -%% -handle_rpc("resolve_remote_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - resolve_remote_service(Service); +register_remote_service_(Service, NetworkAddress) -> + ?info("service_discovery_rpc:register_remote_service_(): service(~p) -> ~p", + [Service, NetworkAddress]), + FullSvcName = rvi_common:remote_service_to_string(Service), -%% -%% Get remote services -%% -handle_rpc("get_remote_services", _Args) -> - get_json_services(?REMOTE_SERVICE_TABLE); + ets:insert(?REMOTE_SERVICE_TABLE, + #service_entry { + service = FullSvcName, + network_address = NetworkAddress + }), -%% -%% Get all services -%% -handle_rpc("get_all_services", _Args) -> - Services = get_all_services(), - {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { services, {array, Services }}]}; + %% Delete any addresses stored with an empty service name, + %% installed with register_remote_service/1, since we now have at + %% least one service name. + ets:match_delete(?REMOTE_ADDRESS_TABLE, + #service_entry { + service = [], + network_address = NetworkAddress + }), + %% Delete any previous instances of the given entry, in case + %% the service registers multiple times + ets:match_delete(?REMOTE_ADDRESS_TABLE, + #service_entry { + service = FullSvcName, + network_address = NetworkAddress + }), -%% -%% Get remote network addresses -%% -handle_rpc("get_remote_network_addresses", _Args) -> - get_json_network_addresses(?REMOTE_ADDRESS_TABLE); + ets:insert(?REMOTE_ADDRESS_TABLE, + #service_entry { + service = FullSvcName, + network_address = NetworkAddress + }), + {ok, FullSvcName}. -%% -%% Resolve local service -%% -handle_rpc("resolve_local_service", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - resolve_local_service(Service); -%% -%% Get local services -%% -handle_rpc("get_local_services", _Args) -> - get_json_services(?LOCAL_SERVICE_TABLE); +unregister_single_remote_service_by_name_(Service) -> + ?info("service_discovery_rpc:unregister_remote_services_by_name(): ~p", + [Service]), -%% -%% Get local network addresses -%% -handle_rpc("get_local_network_addresses", _Args) -> - get_json_network_addresses(?LOCAL_SERVICE_TABLE); + %% Delete any remote address table entries with a matching Service. + ets:match_delete(?REMOTE_ADDRESS_TABLE, + #service_entry { + service = Service, + network_address = '_' + }), + ets:delete(?REMOTE_SERVICE_TABLE, Service), + After = ets:foldl(fun(#service_entry { service = Svc }, Acc) -> + [ Svc | Acc ] end, + [], ?REMOTE_SERVICE_TABLE), -%% -%% Handle the rest. -%% -handle_rpc( Other, _Args) -> - ?info("service_discovery_rpc:handle_rpc(~p): unknown", [ Other ]), - { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. + ?debug("Ater removing ~p: ~p", [ Service, After ]), + + %% Forward to service edge so that it can inform its locally + %% connected services. + %% Build a list of all our local services' addresses to provide + %% to service edge so that it knows where to send the, + LocalSvcAddresses = + ets:foldl(fun(#service_entry { network_address = LocalAddress }, Acc) -> + [ LocalAddress | Acc ] end, + [], ?LOCAL_SERVICE_TABLE), + + %% Call service edge with local addresses (sorted and de-duped) and + %% the services to register. + service_edge_rpc:unregister_remote_services([Service], lists:usort(LocalSvcAddresses)), -%% Handle calls received through regular gen_server calls, routed byh -%% rvi_common:send_component_request() + ok. -handle_call({rvi_call, register_local_service, Args}, _From, State) -> - {_, Service} = lists:keyfind(service, 1, Args), - {_, Address} = lists:keyfind(network_address, 1, Args), - FullSvcName = register_local_service(Address, Service), - {reply, {ok, [ { service, FullSvcName }, - { status, rvi_common:json_rpc_status(ok) } - ] - }, State }; +%% Loop through multiple services and remove them one by one +unregister_remote_services_by_name_(Services) -> + [ unregister_single_remote_service_by_name_(Svc) || Svc <- Services], + ok. -handle_call({rvi_call, register_remote_services, Args}, _From, State) -> - {_, Services} = lists:keyfind(services, 1, Args), - {_, Address} = lists:keyfind(network_address, 1, Args), - {reply, register_remote_services(Address, Services), State }; -handle_call({rvi_call, unregister_remote_services_by_address, Args}, _From, State) -> - {_, Address} = lists:keyfind(network_address, 1, Args), - {reply, unregister_remote_services_by_address(Address), State }; +get_services_(Table) -> + Services = ets:foldl(fun(#service_entry {service = ServiceName, + network_address = ServiceAddr}, Acc) -> + [ { ServiceName, ServiceAddr } | Acc ] end, + [], Table), -handle_call({rvi_call, unregister_remote_services_by_name, Args}, _From, State) -> - {_, Services} = lists:keyfind(services, 1, Args), - {reply, unregister_remote_services_by_name(Services), State }; + ?debug("service_discovery_rpc:get_services_(): ~p", [ Services ]), + Services. -handle_call({rvi_call, unregister_local_service, Args}, _From, State) -> - {_, Service} = lists:keyfind(service, 1, Args), - {reply, unregister_local_service(Service), State }; -handle_call({rvi_call, resolve_remote_service, Args}, _From, State) -> - {_, Service} = lists:keyfind(service, 1, Args), - {reply, resolve_remote_service(Service), State }; +get_network_addresses_(Table) -> + AddrList = ets:foldl(fun(#service_entry {network_address = NetworkAddr}, Acc) + when NetworkAddr =:= unavailable -> + Acc; %% Don't report if service is not active -handle_call({rvi_call, get_remote_services, _Args}, _From, State) -> - Services = get_services(?REMOTE_SERVICE_TABLE), - {reply, {ok, - [ { status, rvi_common:json_rpc_status(ok) }, - { services, Services }]}, State }; + %% We have an active network address + (#service_entry {network_address = NetworkAddr}, Acc) -> + %% Avoid duplicates + case lists:keyfind(NetworkAddr, 1, Acc) of + false ->[ NetworkAddr | Acc ]; + _ -> Acc + end + end, [], Table), -handle_call({rvi_call, get_all_services, _Args}, _From, State) -> - Services = get_all_services(), - {reply, {ok, - [ { status, rvi_common:json_rpc_status(ok) }, - { services, Services }]}, State }; -handle_call({rvi_call, get_remote_network_addresses, _Args}, _From, State) -> - {reply, get_network_addresses(?REMOTE_ADDRESS_TABLE), State }; + %% Return a dup-scrubbed list. + Addresses = sets:to_list(sets:from_list(AddrList)), + ?debug("service_discovery_rpc:get_network_addresses(~p): ~p", [ Table, Addresses ]), + Addresses. -handle_call({rvi_call, resolve_local_service, Args}, _From, State) -> - {_, Service} = lists:keyfind(service, 1, Args), - {reply, resolve_local_service(Service), State }; -handle_call({rvi_call, get_local_services, _Args}, _From, State) -> - Services = get_services(?LOCAL_SERVICE_TABLE), - {reply, {ok, - [ { status, rvi_common:json_rpc_status(ok) }, - { services, Services }]}, State }; +resolve_service_(Table, Service) -> + case ets:lookup(Table, Service) of + %% We found a service entry, report it back + [#service_entry { network_address = NetworkAddress }] -> + ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> ~p", + [ Table, Service, NetworkAddress ]), -handle_call({rvi_call, get_local_network_addresses, _Args}, _From, State) -> - {reply, get_network_addresses(?LOCAL_SERVICE_TABLE), State }; + {ok, NetworkAddress }; -handle_call(Other, _From, State) -> - ?warning("service_discovery_rpc:handle_call(~p): unknown", [ Other ]), - { reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, State}. + %% We did not find a service entry, check statically configured nodes. + [] -> + ?debug("service_discovery_rpc:resolve_service_(~p): service: ~p -> Not Found", + [ Table, Service ]), + not_found + end. -handle_cast(_Msg, State) -> - {noreply, State}. -handle_info(_Info, State) -> - {noreply, State}. -terminate(_Reason, _State) -> - ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +%% Convert services returned by get_services_ to JSON format +convert_json_services(Services) -> + JSONServices = lists:foldl(fun({ ServiceName, ServiceAddr}, Acc) -> + [ {struct, + [ + {service, ServiceName }, + {address, ServiceAddr } + ] + } | Acc ] end, + [], Services), + ?debug("service_discovery_rpc:get_services_(): ~p", [ Services]), + JSONServices. |