diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-04-07 16:20:21 -0700 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-04-07 16:20:21 -0700 |
commit | 9b605e34f48501fbc5b05cc0eb570d32860ce911 (patch) | |
tree | 018cd8a237b63e32651892f9ba30571c216e2ecf /components/schedule | |
parent | 261b2e3303a1c955a1d7b397b3e3741adeae5092 (diff) | |
download | rvi_core-9b605e34f48501fbc5b05cc0eb570d32860ce911.tar.gz |
Temporary, non buildable
Diffstat (limited to 'components/schedule')
-rw-r--r-- | components/schedule/src/rvi_routing.erl | 208 | ||||
-rw-r--r-- | components/schedule/src/schedule_rpc.erl | 446 | ||||
-rw-r--r-- | components/schedule/src/schedule_sup.erl | 1 |
3 files changed, 498 insertions, 157 deletions
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl new file mode 100644 index 0000000..d5b0548 --- /dev/null +++ b/components/schedule/src/rvi_routing.erl @@ -0,0 +1,208 @@ +%% +%% Copyright (C) 2015, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% +-module(rvi_routing). + +-behaviour(gen_server). + +%% API +-export([get_service_routes/1]). +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(ROUTING_RULES, routing_rules). + +-record(route, { + service_prefix = ""::string(), + proto_link_pairs = []:: list(tuple()) + }). + +-record(st, { + routes= []:: list(#route{}) + }). + +%%%=================================================================== +%%% API +%%%=================================================================== + +get_service_routes(Service) -> + gen_server:call(?SERVER, { rvi_get_service_routes, Service }). + + + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, St} | +%% {ok, St, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- + +init([]) -> + {ok, Routes } = application:get_env(rvi, ?ROUTING_RULES), + + {ok, #st { + routes = Routes + }}. + + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, St) -> +%% {reply, Reply, St} | +%% {reply, Reply, St, Timeout} | +%% {noreply, St} | +%% {noreply, St, Timeout} | +%% {stop, Reason, Reply, St} | +%% {stop, Reason, St} +%% @end +%%-------------------------------------------------------------------- +handle_call( { rvi_get_service_routes, Service }, _From, St) -> + {reply, normalize_routes_(find_routes_(Service, St#st.routes), []), t}; + +handle_call(_Request, _From, St) -> + Reply = ok, + {reply, Reply, St}. + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, St) -> {noreply, St} | +%% {noreply, St, Timeout} | +%% {stop, Reason, St} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, St) -> + {noreply, St}. + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, St) -> {noreply, St} | +%% {noreply, St, Timeout} | +%% {stop, Reason, St} +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, St) -> + {noreply, St}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, St) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _St) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process st when code is changed +%% +%% @spec code_change(OldVsn, St, Extra) -> {ok, NewSt} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + + +prefix_match_(_, [], Len) -> + Len; + +prefix_match_([], _, Len ) -> + Len; + +prefix_match_([ C1 | T1], [ C2 | T2 ], Len) when C1 =:= C2 -> + prefix_match_(T1, T2, Len +1); + +prefix_match_(_,_, Len) -> + Len. + +find_routes_([], _Service, CurRoutes, CurMatchLen ) -> + { CurRoutes, CurMatchLen }; + +find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen ) -> + MatchLen = prefix_match_(Service, ServicePrefix, 0), + + %% Do we have a better match than previosly recorded? + case MatchLen >= CurMatchLen of + true -> + %% Continue with the new routes and matching len installed + find_routes_(T, Service, Routes, MatchLen); + + false -> + %% Continue with the old routes and matching len installed + find_routes_(T, Service, CurRoutes, CurMatchLen) + end. + +find_routes_(Routes, Service) -> + case find_routes_(Routes, Service, undefined, 0) of + { undefined, 0 } -> + not_found; + + { Routes, _MatchLen } -> + Routes + end. + + +normalize_routes_({ServicePrefix, []}, Acc) -> + { ServicePrefix, lists:reverse(Acc) }; + +normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, { DL, DLOp }} | Rem ]}, Acc) -> + normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, DLOp } } | Acc]); + +normalize_routes_({ServicePrefix, [ { Pr, { DL, DLOp }} | Rem ]}, Acc) -> + normalize_routes_({ServicePrefix, Rem}, [ { {Pr, []}, { DL, DLOp } } | Acc]); + +normalize_routes_({ServicePrefix, [ {{ Pr, PrOp}, DL } | Rem ]}, Acc) -> + normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]); + +normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, DL} | Rem ]}, Acc) -> + normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]). + diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 64b57cb..310823c 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -6,9 +6,6 @@ %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% - - - -module(schedule_rpc). -behaviour(gen_server). -behaviour(rvi_schedule). @@ -21,6 +18,11 @@ register_remote_services/3, unregister_remote_services/2]). +%% Invoked by service discovery +%% FIXME: Should be rvi_service_discovery behavior +-export([service_available/3, + service_unavailable/3] + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -40,7 +42,10 @@ -record(service, { name = "", %% Fully qualified service name. %% Set to { IP, Port} when service is available, else unknown_network_address. - network_address = unknown_network_address, + address = unknown, + + %% Targeted data link module + data_link_module = unknown %% Table containing #message records, %% indexed by their transaction ID (and sequence of delivery) @@ -58,8 +63,11 @@ -record(message, { transaction_id, %% Transaction ID that message is tagged with. timeout, %% Timeout, UTC - timeout_tref, %% Reference to erlang timer associated with this message. - timeout_cb, %% Callback to invoke when timeout occurs. + data_link, %% Data Link Module to use. { Module, Opts} + protocol, %% Protocol to use. { Module Opts } + routes, %% Routes retrieved for this + timeout_tref, %% Reference to erlang timer associated with this message. + timeout_cb, %% Callback to invoke when timeout occurs. parameters, signature, certificate @@ -146,10 +154,26 @@ unregister_remote_services(CompSpec, ServiceNames) -> CompSpec). +service_available(Service, DataLinkModule, Address) -> + rvi_common:notification(schedule, ?MODULE, + service_available, + [{ service, Service }, + { data_link_module, DataLinkModule }, + { address, Address }], + CompSpec). + +service_unavailable(Service, DataLinkModule, Address) -> + rvi_common:notification(schedule, ?MODULE, + service_unavailable, + [{ service, Service }, + { data_link_module, DataLinkModule }, + { address, Address }], + CompSpec). %% JSON-RPC entry point %% CAlled by local exo http server handle_rpc("schedule_message", Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), @@ -194,11 +218,36 @@ handle_notification("register_remote_services", Args) -> handle_notification("unregister_remote_services", Args) -> {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args), - ?debug("schedule_notification:unregister_remote_services(): services ~p", [ DiscountinuedServices]), + ?debug("schedule_notification:unregister_remote_services(): services ~p", + [ DiscountinuedServices]), + gen_server:cast(?SERVER, { rvi, unregister_remote_services, [ DiscountinuedServices ]}), ok; +handle_notification("service_available", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + {ok, Address} = rvi_common:get_json_element(["address"], Args), + + gen_server:cast(?SERVER, { rvi, service_available, + [ Service, + DataLinkModule, + Address ]}), + + ok; +handle_notification("service_unavailable", Args) -> + {ok, Service} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + {ok, Address} = rvi_common:get_json_element(["address"], Args), + + gen_server:cast(?SERVER, { rvi, service_unavailable, + [ Service, + DataLinkModule, + Address ]}), + + ok; + handle_notification(Other, _Args) -> ?debug("schedule_notification:handle_other(~p): unknown", [ Other ]), ok. @@ -217,16 +266,20 @@ handle_call( { rvi, schedule_message, ?debug("schedule:sched_msg(): certificate ~p", [Certificate]), ?debug("schedule:sched_msg(): St: ~p", [St]), + %% + %% Retrieve the routes that we should try for this message + %% + case rvi_routing:get_service_routing(SvcName) of + not_found -> + {reply, [ no_route ], St }; - { ok, TransID, NSt } = queue_message(SvcName, - Timeout, - Parameters, - Signature, - Certificate, - St), + Routes -> + { NewTransID, NSt1} = create_transaction_id(St), + NTS2 = queue_message(SvcName, Routes, Timeout, Parameter, Signature); + + { reply, [ok, TransID], NS2 } + end; - { reply, [ok, TransID], NSt }; - handle_call(Other, _From, St) -> ?warning("schedule:handle_call(~p): unknown", [ Other ]), @@ -250,7 +303,7 @@ handle_cast( {rvi, register_remote_services, ?info("schedule:register_remote_services(): services(~p) -> ~p", [Services, NetworkAddress]), - {ok, NSt} = multiple_services_available(Services, NetworkAddress, St), + {ok, NSt} = multiple_services_available(Services, NetworkAddress, St), {noreply, NSt}; @@ -260,6 +313,57 @@ handle_cast( {rvi, unregister_remote_services, [ServiceNames]}, St) -> {ok, NSt} = multiple_services_unavailable(ServiceNames, St), {noreply, NSt }; +handle_cast( {rvi, service_available, [Service, DataLinkModule, Address]}, St) -> + %% Find the service + case ets:lookup(SvcTid, {Service, DataLinkModule}) of + [] -> %% No service found - Just unsubscribe. Shouldn't really happen + service_discovery_rpc: + unsubscribe_to_service_availability({Service, DataLinkModule}, + ?MODULE); + + { noreply, St }; + + [ Svc ] -> + %% Update the network address, if it differs, and return + %% the new service / State as {ok, NSvc, false, NSt} + ?debug("schedule:service_unavailable(): Service ~p:~p now has address~p.", + [ DataLinkModule, Service, Address ]), + update_service_network_address(Svc, Address, St), + { _, NSt2 } = try_sending_messages(Svc, NSt1), + { noreply, NSt2} + end. + +handle_cast( {rvi, service_unavailable, [Service, DataLinkModule, Address]}, + #st { services_tid = SvcTid } = St) -> + + %% Find the service + case ets:lookup(SvcTid, {Service, DataLinkModule}) of + [] -> %% No service found - no op. + {noreply, St}; + + [ Svc ] -> + %% Delete service if it is unused. + case delete_unused_service(SvcTid, Svc) of + true -> %% service was deleted + + %% Unsubscribe from service availablility notifications + service_discovery_rpc: + unsubscribe_to_service_availability({ Service, DataLinkModule}, + ?MODULE); + + { noreply, St}; + + false -> %% Service was not deleted, update its network address to unknown + { _, _, _, NSt} = + update_service_network_address(Svc, unknown_network_address, St), + + { noreply, NSt } + end + + end. + { noreply, St}; + + handle_cast(Other, St) -> ?warning("schedule:handle_cast(~p): unknown", [ Other ]), {noreply, St}. @@ -276,19 +380,20 @@ handle_cast(Other, St) -> %%-------------------------------------------------------------------- %% Handle timeouts -handle_info({ rvi_message_timeout, SvcName, TransID}, +handle_info({ rvi_message_timeout, SvcName, DLMod, TransID}, #st { cs = CompSpec, services_tid = SvcTid } = St) -> - case ets:lookup(SvcTid, SvcName) of + case ets:lookup(SvcTid, {SvcName, DLMod}) of [ Svc ] -> %% Delete from ets. case ets:lookup(Svc#service.messages_tid, TransID) of [ Msg ] -> ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]), - do_timeout_callback(CompSpec, Svc, Msg), - ets:delete(Svc#service.messages_tid, TransID); + ets:delete(Svc#service.messages_tid, TransID), + queue_message( _ -> - ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), + ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", + [ TransID, SvcName]), ok end; _-> ok @@ -329,70 +434,149 @@ code_change(_OldVsn, St, _Extra) -> %%%=================================================================== -%% Queue a message for transmission once a service become available. -%% If the service does not exist, it will be created. -%% Once the message is queued, an attempt will be made to send all -%% messages queued in the given service. This attempt will fail -%% pretty quickly if the service's network address is set to -%% 'unavailable' + +%% No more routes to try +queue_message(_SvcName, + _TransID, + _MessageTimeout + [ ], + _Parameters, + _Signature, + _Certificate, St) -> + + %% FIXME: Handle route failure + { route_failed, St }; + + queue_message(SvcName, - Timeout, + TransID, + [ { { Pr, PrOp }, { DL, DLOp } } | RemainingRoutes ], + MessageTimeout, Parameters, Signature, - Certificate, St) -> + Certificate, + St) -> %% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]), %% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]), %% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]), %% ?info("schedule:sched_msg(): signature: ~p", [Signature]), %% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), - %% Create a new transaction ID. - {NewTransID, NSt1} = create_transaction_id(St), - - %% Create a timer to handle message timeout - TRef = erlang:send_after(calculate_timeout_period(Timeout), self(), - { rvi_message_timeout, SvcName, NewTransID }), - - %% Build the message record. - Msg = #message { - transaction_id = NewTransID, - timeout = Timeout, - timeout_tref = TRef, - parameters = Parameters, - signature = Signature, - certificate = Certificate - }, - - %% Find or create the service - - %% If this is a message to a (yet) unknown service, we will create - %% it to act as a placeholder for its messages until they time out - %% or the service becomes available. - %% - %% If the service does exist, we queue it and try to send it. - %% - {ok, Service, IsNewService, NSt2 } = find_or_create_service(SvcName, NSt1), + %% Calculate how many msec we have until message times out + + RelativeTimeout = calc_relative_tout(MessageTimeout), - %% If the service was created by the call above, - %% We will try to bring up a data link to it. - case IsNewService of + %% Did we run out of time for the message? + case RelativeTimeout > 1 of + %% Nope true -> - bring_up_data_link(St#st.cs, SvcName); + {ok, Service, IsNewService, NSt1 } = find_or_create_service({SvcName, DL}, St), + + Msg = #message { + service = SvcName, + transaction_id = TransID, + data_link = { DLMod, DLOp }, + routes = RemaingRoutes, + message_timeout = MessageTimeout, + route_timeout_ref = TRef, + parameters = Parameters, + signature = Signature, + certificate = Certificate + }, + + + %% Add to ets table + TransID = ets:insert(Service#service.messages_tid, Msg), + + %% If this is a new service, subscribe to updates on this service's availablity. + %% If service is already available, we will get service_available/3 notification + %% immediately. + case IsNewService of + true -> + service_discovery_rpc: + subscribe_to_service_availability({SvcName, DLMod}, + ?MODULE); + false -> + ok + end, + + + %% + %% Bring up the relevant data link for the given route. + %% Once up, the data link will invoke service_availble() + %% to indicate that the service is available for the given DL. + %% + case DLMod:setup_data_link(CompSpec, SvcName, DLOp) of + { ok, DLTimeout } -> + %% Setup a timeout to be + TRef = erlang:send_after(min(RelativeMessageTimeout, DLTimeout), + self(), + { rvi_message_timeout, SvcName, DLMod, TransID }); + + %% We failed with this route. Try the next one + { error, _Reason} -> + queue_message(SvcName, + TransID, + RelativeTimeout, + RemainingRoutes, + Parameters, + Signature, + Certificate, + St) + end; + %% We are out of time false -> - ok - end, + do_timeout_callback(CompSpec, Svc, Msg), + { timeout, St} + end. - %% Add to ets table - ets:insert(Service#service.messages_tid, Msg), - %% Attempt to send the message - {_, NSt3 } = try_sending_messages(Service, NSt2), - - %% Return - { ok, NewTransID, NSt3}. +forward_to_protocol(Svc, Msg, St) -> + { DataLinkMod, DataLinkOpts } = Msg#message.data_link, + { ProtoMod, ProtoOpts } = Msg#messge.protocol, + + case ProtoMod:send_message( + St#st.cs, + SvcName, + Msg#message.timeout, + ProtoOpts, + DataLinkMod, + DataLinkOpts, + NetworkAddress, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate) of + + %% Success + [ok] -> + %% Send the rest. + try_sending_messages(Service, St); + + %% Failed + [Err] -> + ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p", + [ProtocolMod, DataLinkMod, SvcName, NetworkAddress, Err]), + + %% Requeue this message with the next route + { _, St1} = queue_message(SvcName, + TransID, + Msg#message.timeout, + Msg#message.routes, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate, + St), + + %% Send the rest of the messgages targeting + %% the same service through the same data link + try_sending_messages(Svc, St) + end. + + + %% Check if we can send messages queued up under the given service. try_sending_messages(#service { name = SvcName, @@ -401,51 +585,35 @@ try_sending_messages(#service { ?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]), {not_available, St}; - try_sending_messages(#service { name = SvcName, + data_link_module = DataLinkModule, network_address = NetworkAddress, messages_tid = Tid } = Service, St) -> ?debug("schedule:try_send(): SvcName: ~p", [SvcName]), ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]), - + %% Extract the first message of the queue. case ets:first(Tid) of %% No more messages to send. '$end_of_table' -> ?debug("schedule:try_send(): Nothing to send"), - { ok, St}; + { ok, St }; Key -> + St1 = forward_to_protocol ?debug("schedule:try_send(): Sending: ~p", [Key]), %% Extract first message and try to send it case ets:lookup(Tid, Key) of [ Msg ] -> - case protocol_rpc:send_message( - St#st.cs, - SvcName , - Msg#message.timeout, - NetworkAddress, - Msg#message.parameters, - Msg#message.signature, - Msg#message.certificate) of - - [ok] -> - %% Send successful - delete entry. - ets:delete(Tid, Key), - - %% Delete timeout ref - erlang:cancel_timer(Msg#message.timeout_tref), - - %% Send the rest. - try_sending_messages(Service, St); - - [Err] -> - ?info("schedule:try_send(): No send: ~p -> ~p : ~p", [SvcName, NetworkAddress, Err]), - %% Failed to send message, leave in queue and err out. - { {send_failed, Err}, St} - end; + %% Wipe from ets table and cancel timer + ets:delete(Tid, Key), + erlang:cancel_timer(Msg#message.timeout_tref), + + %% Forward to protocol and resend + forward_to_protocol(Service, Msg, St); + _ -> ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]), { ok, St} @@ -454,60 +622,6 @@ try_sending_messages(#service { end. -%% Mark a service as available, as reported by data_link_up -%% If the service does not exist, it will be created as a host -%% for future messages destined to that service. -%% -%% If the service does exist, its network address will be upadted -%% with whatever address was reported to this function. -%% -%% Once the service has been created or updated, we will attempt -%% to send any messages queued inside it. -%% -service_available(SvcName, NetworkAddress, St) -> - ?info("schedule:service_available(): service(~p) -> NetworkAddress(~p)", [ SvcName, NetworkAddress ]), - - %% Find or create the service. - {ok, Svc, _, NSt1} = find_or_create_service(SvcName, NetworkAddress, St), - - try_sending_messages(Svc, NSt1). - -service_unavailable(SvcName, #st { services_tid = SvcTid } = St) -> - ?info("schedule:service_unavailable(): Service(~p)", [ SvcName ]), - ets:delete(SvcTid, SvcName), - { ok, St }. - - -bring_up_data_link(CompSpec, SvcName) -> - %% Resolve the service to a network address that we can - %% use to bring up the data link - case service_discovery_rpc:resolve_remote_service(CompSpec, SvcName) of - - [ ok, Address] -> - %% Tell data link to bring up a communicationc hannel. - case data_link_bert_rpc_rpc:setup_data_link(CompSpec, Address) of - [ ok ] -> - ok; - [ already_connected ] -> - already_connected; - Que -> - ?info("schedule:bring_up_data_link() Failed:~p.", [Que]), - ok - end; - - [ not_found ] -> - ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p." - " Service not found.", - [ SvcName ]), - not_found; - - Err -> - ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p: ~p", - [ SvcName, Err ]), - err - end. - - %% %% data_link_up has reported that multiple services are now %% available at NetworkAddress. @@ -519,7 +633,7 @@ multiple_services_available([], _NetworkAddress, St) -> {ok, St}; multiple_services_available([ Svc | T], NetworkAddress, St) -> - {ok, NSt} = service_available(Svc, NetworkAddress, St), + {ok, NSt} = service_available(Svc, NetworkAddress, St), multiple_services_available(T, NetworkAddress, NSt). multiple_services_unavailable([], St) -> @@ -555,7 +669,6 @@ find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid update_service_network_address(Svc2, NetworkAddress, St) end. - %% %% Catch where no network address update is necessary. %% @@ -585,6 +698,7 @@ update_service_network_address(#service {} = Service, NetworkAddress, St) -> create_service(ServiceName, retain_existing_address, St) -> create_service(ServiceName, unknown_network_address, St); + %% Create a new service and return the new state (not currently modified) %% and the newly initialized service revord. %% @@ -615,20 +729,21 @@ create_transaction_id(St) -> %% schedulers? { ID, St#st { next_transaction_id = ID + 1 }}. -%% Calculate a relative timeout based on the UTC TS we are provided with. -calculate_timeout_period(UTC) -> +%% Calculate a relative timeout based on the UnixTime TS we are provided with. +calc_relative_tout(UnixTime) -> { Mega, Sec, _Micro } = now(), Now = Mega * 1000000 + Sec, - ?debug("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p", [ UTC, Now, UTC - Now ]), + ?debug("schedule:calc_relative_tout(): Timeout(~p) - Now(~p) = ~p", [ UnixTime, Now, UnixTime - Now ]), %% Cap the timeout value at something reasonable TOut = - case UTC - Now >= 4294967295 of + case UnixTime - Now >= 4294967295 of true -> - ?info("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p: Truncated to 4294967295", [ UTC, Now, UTC - Now ]), + ?info("schedule:calc_relative_tout(): Timeout(~p) - Now(~p) = ~p: " + "Truncated to 4294967295", [ UnixTime, Now, UnixTime - Now ]), 4294967295; - false -> UTC - Now + false -> UnixTime - Now end, case TOut =< 0 of @@ -651,3 +766,20 @@ do_timeout_callback(CompSpec, Service, do_timeout_callback(_,_,_) -> ok. +%% Kill off a service that is no longer used. +%% +delete_unused_service(SvcTid, Svc) -> + %% Do we have messages waiting for this service? + case ets:first(Svc#service.messages_tid) of + %% Nope. + '$end_of_table' -> + ets:delete(Svc#messages_tid), + ets:delete(SvcTid, { Service, DataLinkModule}) + %% Update the network address, if it differs, and return + %% the new service / State as {ok, NSvc, false, NSt} + ?debug("schedule:service_unavailable(): Service ~p:~p now has no address.", + [ DataLinkModule, Service ]), + true; + + _ -> false + end. diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl index 238d860..8589df0 100644 --- a/components/schedule/src/schedule_sup.erl +++ b/components/schedule/src/schedule_sup.erl @@ -35,5 +35,6 @@ init([]) -> {ok, { {one_for_one, 5, 10}, [ ?CHILD(schedule_rpc, worker) + ?CHILD(router, worker) ]} }. |