diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-04-10 18:57:50 -0700 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-04-10 18:57:50 -0700 |
commit | ffe60f97c1ec46aae1b62f0867b755095cf659ec (patch) | |
tree | f7cd64100cbd6082a4c51f3dcad8394cf4ad6657 /components/schedule | |
parent | f3f5e6b2929c53f70d2efca7d03b7e58931a21f3 (diff) | |
download | rvi_core-ffe60f97c1ec46aae1b62f0867b755095cf659ec.tar.gz |
Temp
Diffstat (limited to 'components/schedule')
-rw-r--r-- | components/schedule/src/rvi_routing.erl | 17 | ||||
-rw-r--r-- | components/schedule/src/rvi_schedule.erl | 1 | ||||
-rw-r--r-- | components/schedule/src/schedule_rpc.erl | 623 | ||||
-rw-r--r-- | components/schedule/src/schedule_sup.erl | 4 |
4 files changed, 280 insertions, 365 deletions
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl index d5b0548..0e6dfbe 100644 --- a/components/schedule/src/rvi_routing.erl +++ b/components/schedule/src/rvi_routing.erl @@ -9,6 +9,9 @@ -behaviour(gen_server). + +-include_lib("lager/include/log.hrl"). + %% API -export([get_service_routes/1]). -export([start_link/0]). @@ -88,7 +91,7 @@ init([]) -> %% @end %%-------------------------------------------------------------------- handle_call( { rvi_get_service_routes, Service }, _From, St) -> - {reply, normalize_routes_(find_routes_(Service, St#st.routes), []), t}; + {reply, normalize_routes_(find_routes_( St#st.routes, Service), []), t}; handle_call(_Request, _From, St) -> Reply = ok, @@ -179,8 +182,13 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen ) false -> %% Continue with the old routes and matching len installed find_routes_(T, Service, CurRoutes, CurMatchLen) - end. + end; +find_routes_(Rt, Svc, CurRoutes, CurMatchLen) -> + ?info("----------------Failed on ~p", [Rt]), + { x, y}. + + find_routes_(Routes, Service) -> case find_routes_(Routes, Service, undefined, 0) of { undefined, 0 } -> @@ -203,6 +211,5 @@ normalize_routes_({ServicePrefix, [ { Pr, { DL, DLOp }} | Rem ]}, Acc) -> normalize_routes_({ServicePrefix, [ {{ Pr, PrOp}, DL } | Rem ]}, Acc) -> normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]); -normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, DL} | Rem ]}, Acc) -> - normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]). - +normalize_routes_({ServicePrefix, [ {Pr, DL} | Rem ]}, Acc) -> + normalize_routes_({ServicePrefix, Rem}, [ { {Pr, []}, { DL, [] } } | Acc]). diff --git a/components/schedule/src/rvi_schedule.erl b/components/schedule/src/rvi_schedule.erl index 00a142a..2b4606e 100644 --- a/components/schedule/src/rvi_schedule.erl +++ b/components/schedule/src/rvi_schedule.erl @@ -7,6 +7,7 @@ %% -module(rvi_schedule). + -include_lib("rvi_common/include/rvi_common.hrl"). -callback schedule_message(CompSpec :: #component_spec{}, diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 310823c..7a1b23a 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -8,20 +8,17 @@ -module(schedule_rpc). -behaviour(gen_server). --behaviour(rvi_schedule). -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). %% API -export([start_link/0]). --export([schedule_message/6, - register_remote_services/3, - unregister_remote_services/2]). +-export([schedule_message/6]). %% Invoked by service discovery %% FIXME: Should be rvi_service_discovery behavior -export([service_available/3, - service_unavailable/3] + service_unavailable/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -40,12 +37,10 @@ %% %% Service -> ETS -> Messages -record(service, { - name = "", %% Fully qualified service name. - %% Set to { IP, Port} when service is available, else unknown_network_address. - address = unknown, + key = { "", unknown }, %% Service name and data link modu - %% Targeted data link module - data_link_module = unknown + %% Is this service currently available on the data link module + available = false, %% Table containing #message records, %% indexed by their transaction ID (and sequence of delivery) @@ -117,7 +112,7 @@ init([]) -> cs = rvi_common:get_component_specification(), services_tid = ets:new(rvi_schedule_services, [ set, private, - { keypos, #service.name } ])}}. + { keypos, #service.key } ])}}. start_json_server() -> rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup). @@ -139,35 +134,19 @@ schedule_message(CompSpec, [status, transaction_id], CompSpec). -register_remote_services(CompSpec, NetworkAddress, Services) -> - rvi_common:notification(schedule, ?MODULE, - register_remote_services, - [{ network_address, NetworkAddress} , - { services, Services }], - CompSpec). - - -unregister_remote_services(CompSpec, ServiceNames) -> - rvi_common:notification(schedule, ?MODULE, - unregister_remote_services, - [{ services, ServiceNames }], - CompSpec). - -service_available(Service, DataLinkModule, Address) -> +service_available(CompSpec, SvcName, DataLinkModule) -> rvi_common:notification(schedule, ?MODULE, service_available, - [{ service, Service }, - { data_link_module, DataLinkModule }, - { address, Address }], + [{ service, SvcName }, + { data_link_mod, DataLinkModule } ], CompSpec). -service_unavailable(Service, DataLinkModule, Address) -> +service_unavailable(CompSpec, SvcName, DataLinkModule) -> rvi_common:notification(schedule, ?MODULE, service_unavailable, - [{ service, Service }, - { data_link_module, DataLinkModule }, - { address, Address }], + [{ service, SvcName }, + { data_link_mod, DataLinkModule } ], CompSpec). %% JSON-RPC entry point @@ -197,54 +176,28 @@ handle_rpc("schedule_message", Args) -> { transaction_id, TransID } ] }; - - handle_rpc(Other, _Args) -> ?debug("schedule_rpc:handle_rpc(~p): unknown", [ Other ]), {ok, [ {status, rvi_common:json_rpc_status(invalid_command)}]}. -handle_notification("register_remote_services", Args) -> - {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), - {ok, Services} = rvi_common:get_json_element(["services"], Args), - ?debug("schedule_notification:register_remote_services(): network_address: ~p", [ NetworkAddress]), - ?debug("schedule_notification:register_remote_services(): services: ~p", [ Services]), - - gen_server:cast(?SERVER, { rvi, register_remote_services, - [ NetworkAddress, - Services ]}), - - ok; - -handle_notification("unregister_remote_services", Args) -> - {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args), - ?debug("schedule_notification:unregister_remote_services(): services ~p", - [ DiscountinuedServices]), - - gen_server:cast(?SERVER, { rvi, unregister_remote_services, - [ DiscountinuedServices ]}), - ok; handle_notification("service_available", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), gen_server:cast(?SERVER, { rvi, service_available, - [ Service, - DataLinkModule, - Address ]}), + [ SvcName, + DataLinkModule ]}), ok; handle_notification("service_unavailable", Args) -> - {ok, Service} = rvi_common:get_json_element(["service"], Args), - {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - {ok, Address} = rvi_common:get_json_element(["address"], Args), + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), gen_server:cast(?SERVER, { rvi, service_unavailable, - [ Service, - DataLinkModule, - Address ]}), + [ SvcName, + DataLinkModule ]}), ok; @@ -264,20 +217,28 @@ handle_call( { rvi, schedule_message, ?debug("schedule:sched_msg(): parameters: ~p", [Parameters]), ?debug("schedule:sched_msg(): signature: ~p", [Signature]), ?debug("schedule:sched_msg(): certificate ~p", [Certificate]), - ?debug("schedule:sched_msg(): St: ~p", [St]), + %%?debug("schedule:sched_msg(): St: ~p", [St]), %% %% Retrieve the routes that we should try for this message %% - case rvi_routing:get_service_routing(SvcName) of + case rvi_routing:get_service_routes(SvcName) of not_found -> {reply, [ no_route ], St }; Routes -> - { NewTransID, NSt1} = create_transaction_id(St), - NTS2 = queue_message(SvcName, Routes, Timeout, Parameter, Signature); - - { reply, [ok, TransID], NS2 } + { TransID, NSt1} = create_transaction_id(St), + NSt2 = queue_message(SvcName, + TransID, + Routes, + Timeout, + calc_relative_tout(Timeout), + Parameters, + Signature, + Certificate, + NSt1), + + { reply, [ok, TransID], NSt2 } end; @@ -297,71 +258,40 @@ handle_call(Other, _From, St) -> %% @end %%-------------------------------------------------------------------- -handle_cast( {rvi, register_remote_services, - [ NetworkAddress, Services]}, St) -> - - ?info("schedule:register_remote_services(): services(~p) -> ~p", - [Services, NetworkAddress]), - - {ok, NSt} = multiple_services_available(Services, NetworkAddress, St), - {noreply, NSt}; +handle_cast( {rvi, service_available, [SvcName, DataLinkModule]}, St) -> + %% Find or create the service. + ?debug("schedule:service_available(): ~p:~p", [ DataLinkModule, SvcName ]), -handle_cast( {rvi, unregister_remote_services, [ServiceNames]}, St) -> - ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]), - {ok, NSt} = multiple_services_unavailable(ServiceNames, St), - {noreply, NSt }; + SvcRec = update_service(SvcName, DataLinkModule, available, St), -handle_cast( {rvi, service_available, [Service, DataLinkModule, Address]}, St) -> - %% Find the service - case ets:lookup(SvcTid, {Service, DataLinkModule}) of - [] -> %% No service found - Just unsubscribe. Shouldn't really happen - service_discovery_rpc: - unsubscribe_to_service_availability({Service, DataLinkModule}, - ?MODULE); - - { noreply, St }; + %% Try to send any pending messages. + { _, NSt} = try_sending_messages(SvcRec, St), - [ Svc ] -> - %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} - ?debug("schedule:service_unavailable(): Service ~p:~p now has address~p.", - [ DataLinkModule, Service, Address ]), - update_service_network_address(Svc, Address, St), - { _, NSt2 } = try_sending_messages(Svc, NSt1), - { noreply, NSt2} - end. + { noreply, NSt}; -handle_cast( {rvi, service_unavailable, [Service, DataLinkModule, Address]}, +handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]}, #st { services_tid = SvcTid } = St) -> - %% Find the service - case ets:lookup(SvcTid, {Service, DataLinkModule}) of + %% Grab the service + case ets:lookup(SvcTid, {SvcName, DataLinkModule}) of [] -> %% No service found - no op. {noreply, St}; - [ Svc ] -> - %% Delete service if it is unused. - case delete_unused_service(SvcTid, Svc) of + [ SvcRec ] -> + %% Delete service if it does not have any pending messages. + case delete_unused_service(SvcTid, SvcRec) of true -> %% service was deleted - - %% Unsubscribe from service availablility notifications - service_discovery_rpc: - unsubscribe_to_service_availability({ Service, DataLinkModule}, - ?MODULE); - { noreply, St}; - false -> %% Service was not deleted, update its network address to unknown - { _, _, _, NSt} = - update_service_network_address(Svc, unknown_network_address, St), - - { noreply, NSt } + false -> %% SvcName was not deleted, set it to not available + update_service(SvcName, DataLinkModule, unavailable, St), + + { noreply, St } end - end. - { noreply, St}; + end; handle_cast(Other, St) -> @@ -381,24 +311,41 @@ handle_cast(Other, St) -> %% Handle timeouts handle_info({ rvi_message_timeout, SvcName, DLMod, TransID}, - #st { cs = CompSpec, services_tid = SvcTid } = St) -> + #st { services_tid = SvcTid } = St) -> + %% Look up the service / DataLink mod case ets:lookup(SvcTid, {SvcName, DLMod}) of - [ Svc ] -> - %% Delete from ets. - case ets:lookup(Svc#service.messages_tid, TransID) of + [ SvcRec ] -> %% Found service for specific data link + + %% Delete message from service queue + case ets:lookup(SvcRec#service.messages_tid, TransID) of [ Msg ] -> ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]), - ets:delete(Svc#service.messages_tid, TransID), - queue_message( + ets:delete(SvcRec#service.messages_tid, TransID), + + %% Try to requeue message + { _Res, NSt } = + queue_message(SvcName, + Msg#message.transaction_id, + Msg#message.routes, + Msg#message.timeout, + calc_relative_tout(Msg#message.timeout), + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate, + St), + {noreply, NSt}; + _ -> ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), - ok + + {noreply, St} + end; - _-> ok - end, - {noreply, St}; + _ -> {noreply, St} + + end; handle_info(_Info, St) -> @@ -435,289 +382,234 @@ code_change(_OldVsn, St, _Extra) -> +%% %% No more routes to try -queue_message(_SvcName, +%% +queue_message(SvcName, _TransID, - _MessageTimeout [ ], + _MessageTimeout, + _RelativeTimeout, _Parameters, _Signature, _Certificate, St) -> %% FIXME: Handle route failure + ?notice("schedule:queue_message(): Ran out of routes to try for ~p", [SvcName]), { route_failed, St }; +%% +%% The message has timed out +%% +queue_message(SvcName, + TransID, + _Routes, + _MessageTimeout, + -1, %% We are timed out + _Parameters, + _Signature, + _Certificate, + St) -> + do_timeout_callback(St#st.cs, SvcName, TransID), + { timeout, St }; + + +%% Try to queue message queue_message(SvcName, TransID, - [ { { Pr, PrOp }, { DL, DLOp } } | RemainingRoutes ], - MessageTimeout, + [ { { _Pr, _PrOp }, { DLMod, DLOp } } | RemainingRoutes ], + Timeout, + RelativeTimeout, Parameters, Signature, Certificate, St) -> + %% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]), %% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]), %% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]), %% ?info("schedule:sched_msg(): signature: ~p", [Signature]), %% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), - %% Calculate how many msec we have until message times out + SvcRec = find_or_create_service(SvcName, DLMod, St), - RelativeTimeout = calc_relative_tout(MessageTimeout), - - %% Did we run out of time for the message? - case RelativeTimeout > 1 of - %% Nope - true -> - {ok, Service, IsNewService, NSt1 } = find_or_create_service({SvcName, DL}, St), - - Msg = #message { - service = SvcName, - transaction_id = TransID, - data_link = { DLMod, DLOp }, - routes = RemaingRoutes, - message_timeout = MessageTimeout, - route_timeout_ref = TRef, - parameters = Parameters, - signature = Signature, - certificate = Certificate - }, - - - %% Add to ets table - TransID = ets:insert(Service#service.messages_tid, Msg), - - %% If this is a new service, subscribe to updates on this service's availablity. - %% If service is already available, we will get service_available/3 notification - %% immediately. - case IsNewService of - true -> - service_discovery_rpc: - subscribe_to_service_availability({SvcName, DLMod}, - ?MODULE); - false -> - ok - end, + %% The service may already be available, give it a shot. + case try_sending_messages(SvcRec, St) of + { ok, NSt } -> %% We managed to send the message. Done. + { ok, NSt }; + { _ , NSt } -> %% Failed to send message. Setup data link + %% %% Bring up the relevant data link for the given route. %% Once up, the data link will invoke service_availble() %% to indicate that the service is available for the given DL. %% - case DLMod:setup_data_link(CompSpec, SvcName, DLOp) of + case DLMod:setup_data_link(NSt#st.cs, SvcName, DLOp) of { ok, DLTimeout } -> - %% Setup a timeout to be - TRef = erlang:send_after(min(RelativeMessageTimeout, DLTimeout), + + %% Setup a timeout that triggers on whatever + %% comes first of the message's general timeout + %% or the timeout of the data link bringup + TRef = erlang:send_after(min(RelativeTimeout, DLTimeout), self(), - { rvi_message_timeout, SvcName, DLMod, TransID }); - + { rvi_message_timeout, SvcName, DLMod, TransID }), + + %% Setup a message to be added to the service / dl table. + Msg = #message { + transaction_id = TransID, + data_link = { DLMod, DLOp }, + routes = RemainingRoutes, + timeout = Timeout, + timeout_tref = TRef, + parameters = Parameters, + signature = Signature, + certificate = Certificate + }, + + + %% Add to ets table + TransID = ets:insert(SvcRec#service.messages_tid, Msg), + {ok, NSt}; + %% We failed with this route. Try the next one { error, _Reason} -> queue_message(SvcName, TransID, - RelativeTimeout, + Timeout, + calc_relative_tout(Timeout), RemainingRoutes, Parameters, Signature, Certificate, - St) - end; - - %% We are out of time - false -> - do_timeout_callback(CompSpec, Svc, Msg), - { timeout, St} + NSt) + end end. -forward_to_protocol(Svc, Msg, St) -> - { DataLinkMod, DataLinkOpts } = Msg#message.data_link, - { ProtoMod, ProtoOpts } = Msg#messge.protocol, - - case ProtoMod:send_message( - St#st.cs, - SvcName, - Msg#message.timeout, - ProtoOpts, - DataLinkMod, - DataLinkOpts, - NetworkAddress, - Msg#message.parameters, - Msg#message.signature, - Msg#message.certificate) of - - %% Success - [ok] -> - %% Send the rest. - try_sending_messages(Service, St); - - %% Failed - [Err] -> - ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p", - [ProtocolMod, DataLinkMod, SvcName, NetworkAddress, Err]), - - %% Requeue this message with the next route - { _, St1} = queue_message(SvcName, - TransID, - Msg#message.timeout, - Msg#message.routes, - Msg#message.parameters, - Msg#message.signature, - Msg#message.certificate, - St), - - %% Send the rest of the messgages targeting - %% the same service through the same data link - try_sending_messages(Svc, St) - end. - - - -%% Check if we can send messages queued up under the given service. +%% The service is not available try_sending_messages(#service { - name = SvcName, - network_address = unknown_network_address, - messages_tid = _Tid } = _Service, St) -> + key = { SvcName, _ }, + available = false, + messages_tid = _Tid } = _SvcRec, St) -> + ?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]), - {not_available, St}; + { not_available, St }; try_sending_messages(#service { - name = SvcName, - data_link_module = DataLinkModule, - network_address = NetworkAddress, - messages_tid = Tid } = Service, St) -> + key = { SvcName, DataLinkMod }, + available = true, + messages_tid = Tid } = SvcRec, St) -> - ?debug("schedule:try_send(): SvcName: ~p", [SvcName]), - ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]), + ?debug("schedule:try_send(): Service: ~p:~p", [DataLinkMod, SvcName]), %% Extract the first message of the queue. - case ets:first(Tid) of - %% No more messages to send. - '$end_of_table' -> + case first_service_message(SvcRec) of + empty -> ?debug("schedule:try_send(): Nothing to send"), { ok, St }; - Key -> - St1 = forward_to_protocol - ?debug("schedule:try_send(): Sending: ~p", [Key]), - %% Extract first message and try to send it - case ets:lookup(Tid, Key) of - [ Msg ] -> - %% Wipe from ets table and cancel timer - ets:delete(Tid, Key), - erlang:cancel_timer(Msg#message.timeout_tref), - - %% Forward to protocol and resend - forward_to_protocol(Service, Msg, St); - - _ -> - ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]), - { ok, St} - + yanked -> + ?info("schedule:try_send(): Message was yanked while trying to send: ~p", + [SvcRec#service.key]), + { ok, St}; + + Msg -> + %% Wipe from ets table and cancel timer + ets:delete(Tid, Msg#message.transaction_id), + + erlang:cancel_timer(Msg#message.timeout_tref), + + %% Forward to protocol. + { _, DataLinkOpts } = Msg#message.data_link, + { ProtoMod, ProtoOpts } = Msg#message.protocol, + + %% Send off message to the correct protocol module + case ProtoMod:send_message( + St#st.cs, + SvcName, + Msg#message.timeout, + ProtoOpts, + DataLinkMod, + DataLinkOpts, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate) of + + %% Success + [ok] -> + %% Send the rest of the messages associated with this service/dl + try_sending_messages(SvcRec, St); + + %% Failed + [Err] -> + ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p", + [ProtoMod, DataLinkMod, SvcName, Err]), + + %% Requeue this message with the next route + { _, St1} = queue_message(SvcName, + Msg#message.transaction_id, + Msg#message.timeout, + calc_relative_tout(Msg#message.timeout), + Msg#message.routes, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate, + St), + + %% Send the rest of the messgages targeting + %% the same service through the same data link + %% If they all fail due to some data link error, + %% they will all be requeued to the next route. + try_sending_messages(SvcRec, St1) end end. -%% -%% data_link_up has reported that multiple services are now -%% available at NetworkAddress. -%% Iterate through all services and mark them as available, -%% possibly sending any pending message targeting the newly activated -%% service. -%% -multiple_services_available([], _NetworkAddress, St) -> - {ok, St}; - -multiple_services_available([ Svc | T], NetworkAddress, St) -> - {ok, NSt} = service_available(Svc, NetworkAddress, St), - multiple_services_available(T, NetworkAddress, NSt). - -multiple_services_unavailable([], St) -> - {ok, St}; - -multiple_services_unavailable([ SvcName | T], St) -> - {ok, NSt} = service_unavailable(SvcName, St), - multiple_services_unavailable(T, NSt). +find_or_create_service(SvcName, DataLinkMod, #st { services_tid = SvcTid } = St) -> + ?debug("schedule:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]), - -find_or_create_service(ServiceName, St) -> - %% Invoke with retain to keep any exising network addresses already - %% in place in an existing service. - find_or_create_service(ServiceName, retain_existing_address, St). - -find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid } = St) -> - ?debug("schedule:find_or_create_service(): SvcName: ~p", [ ServiceName]), - - case ets:lookup(SvcTid, ServiceName) of + case ets:lookup(SvcTid, { SvcName, DataLinkMod }) of [] -> %% The given service does not exist, create it. - ?debug("schedule:find_or_create_service(): Creating new ~p", [ ServiceName]), - create_service(ServiceName, NetworkAddress, St); - - [ Svc1 ] when NetworkAddress =:= retain_existing_address -> - %% We found a service, and are instructed not to touch - %% its network address. Return it. - { ok, Svc1, false, St}; + ?debug("schedule:find_or_create_service(): Creating new ~p", [ SvcName]), + update_service(SvcName, DataLinkMod, unavailable, St); - [ Svc2 ] -> + [ SvcRec ] -> %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} - ?debug("schedule:find_or_create_service(): Updating existing ~p", [ ServiceName]), - update_service_network_address(Svc2, NetworkAddress, St) + %% the new service / State as {ok, NSvcRec, false, NSt} + ?debug("schedule:find_or_create_service(): Updating existing ~p", [ SvcName]), + SvcRec end. -%% -%% Catch where no network address update is necessary. -%% -update_service_network_address(#service { network_address = NetworkAddress } = Service, - NetworkAddress, St) -> - { ok, Service, false, St}; %% False indicates that the service exists. - - -%% -%% Update the service in the ets table with a new network address. -%% -update_service_network_address(#service {} = Service, NetworkAddress, St) -> - %% Create a new service. - NewService = Service#service { network_address = NetworkAddress}, - %% Replace existing serviceo in the ets table of services. - ets:insert(St#st.services_tid, [ NewService ]), - { ok, NewService, false, St}. %% False indicates that the service exists. - - - - -%% If we are creating a new service, we need to remap the -%% retain_existing_address to unknown_network_address in order to get the correct -%% initial state, which is a created service with an unknown network address. -create_service(ServiceName, retain_existing_address, St) -> - create_service(ServiceName, unknown_network_address, St); - - -%% Create a new service and return the new state (not currently modified) -%% and the newly initialized service revord. +%% Create a new service, or update an existing one, and return the new +%% state (not currently modified) and the newly initialized service +%% revord. %% -create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) -> - Svc = #service { - name = ServiceName, - network_address = NetworkAddress, - messages_tid = ets:new(rvi_messages, - [ ordered_set, private, - { keypos, #message.transaction_id } ]) - }, +update_service(SvcName, DataLinkMod, Available, + #st { services_tid = SvcsTid, cs = CS }) -> + %% Return new service and existing state. + ?debug("schedule:create_service(): ~p:~p ", [ DataLinkMod, SvcName]), + SvcRec = #service { + key = { SvcName, DataLinkMod }, + available = Available, + messages_tid = ets:new(rvi_messages, + [ ordered_set, private, + { keypos, #message.transaction_id } ]) + }, %% Insert new service to ets table. - ets:insert(SvcsTid, Svc), - - %% Return new service and existing state. - ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]), - ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]), - { ok, Svc, true, St}. %% True indicates that the service is newly created. + ets:insert(SvcsTid, SvcRec), + + %% Subscribe to updates on the availbility of this service. + service_discovery_rpc:subscribe(CS, SvcName, DataLinkMod), + + SvcRec. %% Create a new and unique transaction id @@ -748,7 +640,7 @@ calc_relative_tout(UnixTime) -> case TOut =< 0 of true -> - 1; %% One millisec is the smallest value we will time out on + -1; %% We have timed out false -> TOut * 1000 @@ -756,30 +648,45 @@ calc_relative_tout(UnixTime) -> %% Handle a callback for a timed out message. -do_timeout_callback(CompSpec, Service, - #message {transaction_id = TransID}) -> - service_edge_rpc:handle_local_timeout(CompSpec, Service, TransID), - ok; +do_timeout_callback(CompSpec, SvcName, TransID) -> + service_edge_rpc:handle_local_timeout(CompSpec, SvcName, TransID), + ok. -%% callback element of #message is not an {M,F,A} format, ignore. -do_timeout_callback(_,_,_) -> - ok. %% Kill off a service that is no longer used. %% -delete_unused_service(SvcTid, Svc) -> +delete_unused_service(SvcTid, SvcRec) -> %% Do we have messages waiting for this service? - case ets:first(Svc#service.messages_tid) of + case ets:first(SvcRec#service.messages_tid) of %% Nope. '$end_of_table' -> - ets:delete(Svc#messages_tid), - ets:delete(SvcTid, { Service, DataLinkModule}) + ets:delete(SvcRec#service.messages_tid), + ets:delete(SvcTid, SvcRec#service.key), + + { SvcName, DataLinkMod} = SvcRec#service.key, + %% Unsubscribe from service availablility notifications + service_discovery_rpc: + unsubscribe_to_service_availability(SvcName, DataLinkMod, ?MODULE), %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} + %% the new service / State as {ok, NSvcRec, false, NSt} ?debug("schedule:service_unavailable(): Service ~p:~p now has no address.", - [ DataLinkModule, Service ]), + [ SvcRec#service.key ]), true; _ -> false end. + +first_service_message(#service { messages_tid = Tid }) -> + case ets:first(Tid) of + '$end_of_table' -> + empty; + + Key -> + case ets:lookup(Tid, Key) of + [ Msg ] -> Msg; + [] -> yanked + end + end. + + diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl index 8589df0..76c403e 100644 --- a/components/schedule/src/schedule_sup.erl +++ b/components/schedule/src/schedule_sup.erl @@ -34,7 +34,7 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ - ?CHILD(schedule_rpc, worker) - ?CHILD(router, worker) + ?CHILD(schedule_rpc, worker), + ?CHILD(rvi_routing, worker) ]} }. |