summaryrefslogtreecommitdiff
path: root/components/schedule
diff options
context:
space:
mode:
authorMagnus Feuer <mfeuer@jaguarlandrover.com>2015-04-07 16:20:21 -0700
committerMagnus Feuer <mfeuer@jaguarlandrover.com>2015-04-07 16:20:21 -0700
commit9b605e34f48501fbc5b05cc0eb570d32860ce911 (patch)
tree018cd8a237b63e32651892f9ba30571c216e2ecf /components/schedule
parent261b2e3303a1c955a1d7b397b3e3741adeae5092 (diff)
downloadrvi_core-9b605e34f48501fbc5b05cc0eb570d32860ce911.tar.gz
Temporary, non buildable
Diffstat (limited to 'components/schedule')
-rw-r--r--components/schedule/src/rvi_routing.erl208
-rw-r--r--components/schedule/src/schedule_rpc.erl446
-rw-r--r--components/schedule/src/schedule_sup.erl1
3 files changed, 498 insertions, 157 deletions
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
new file mode 100644
index 0000000..d5b0548
--- /dev/null
+++ b/components/schedule/src/rvi_routing.erl
@@ -0,0 +1,208 @@
+%%
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+-module(rvi_routing).
+
+-behaviour(gen_server).
+
+%% API
+-export([get_service_routes/1]).
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+-define(ROUTING_RULES, routing_rules).
+
+-record(route, {
+ service_prefix = ""::string(),
+ proto_link_pairs = []:: list(tuple())
+ }).
+
+-record(st, {
+ routes= []:: list(#route{})
+ }).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+get_service_routes(Service) ->
+ gen_server:call(?SERVER, { rvi_get_service_routes, Service }).
+
+
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, St} |
+%% {ok, St, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+
+init([]) ->
+ {ok, Routes } = application:get_env(rvi, ?ROUTING_RULES),
+
+ {ok, #st {
+ routes = Routes
+ }}.
+
+
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, St) ->
+%% {reply, Reply, St} |
+%% {reply, Reply, St, Timeout} |
+%% {noreply, St} |
+%% {noreply, St, Timeout} |
+%% {stop, Reason, Reply, St} |
+%% {stop, Reason, St}
+%% @end
+%%--------------------------------------------------------------------
+handle_call( { rvi_get_service_routes, Service }, _From, St) ->
+ {reply, normalize_routes_(find_routes_(Service, St#st.routes), []), t};
+
+handle_call(_Request, _From, St) ->
+ Reply = ok,
+ {reply, Reply, St}.
+
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, St) -> {noreply, St} |
+%% {noreply, St, Timeout} |
+%% {stop, Reason, St}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Msg, St) ->
+ {noreply, St}.
+
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, St) -> {noreply, St} |
+%% {noreply, St, Timeout} |
+%% {stop, Reason, St}
+%% @end
+%%--------------------------------------------------------------------
+handle_info(_Info, St) ->
+ {noreply, St}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, St) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _St) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process st when code is changed
+%%
+%% @spec code_change(OldVsn, St, Extra) -> {ok, NewSt}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+
+prefix_match_(_, [], Len) ->
+ Len;
+
+prefix_match_([], _, Len ) ->
+ Len;
+
+prefix_match_([ C1 | T1], [ C2 | T2 ], Len) when C1 =:= C2 ->
+ prefix_match_(T1, T2, Len +1);
+
+prefix_match_(_,_, Len) ->
+ Len.
+
+find_routes_([], _Service, CurRoutes, CurMatchLen ) ->
+ { CurRoutes, CurMatchLen };
+
+find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen ) ->
+ MatchLen = prefix_match_(Service, ServicePrefix, 0),
+
+ %% Do we have a better match than previosly recorded?
+ case MatchLen >= CurMatchLen of
+ true ->
+ %% Continue with the new routes and matching len installed
+ find_routes_(T, Service, Routes, MatchLen);
+
+ false ->
+ %% Continue with the old routes and matching len installed
+ find_routes_(T, Service, CurRoutes, CurMatchLen)
+ end.
+
+find_routes_(Routes, Service) ->
+ case find_routes_(Routes, Service, undefined, 0) of
+ { undefined, 0 } ->
+ not_found;
+
+ { Routes, _MatchLen } ->
+ Routes
+ end.
+
+
+normalize_routes_({ServicePrefix, []}, Acc) ->
+ { ServicePrefix, lists:reverse(Acc) };
+
+normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, { DL, DLOp }} | Rem ]}, Acc) ->
+ normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, DLOp } } | Acc]);
+
+normalize_routes_({ServicePrefix, [ { Pr, { DL, DLOp }} | Rem ]}, Acc) ->
+ normalize_routes_({ServicePrefix, Rem}, [ { {Pr, []}, { DL, DLOp } } | Acc]);
+
+normalize_routes_({ServicePrefix, [ {{ Pr, PrOp}, DL } | Rem ]}, Acc) ->
+ normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
+
+normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, DL} | Rem ]}, Acc) ->
+ normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]).
+
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index 64b57cb..310823c 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -6,9 +6,6 @@
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
-
-
-
-module(schedule_rpc).
-behaviour(gen_server).
-behaviour(rvi_schedule).
@@ -21,6 +18,11 @@
register_remote_services/3,
unregister_remote_services/2]).
+%% Invoked by service discovery
+%% FIXME: Should be rvi_service_discovery behavior
+-export([service_available/3,
+ service_unavailable/3]
+
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -40,7 +42,10 @@
-record(service, {
name = "", %% Fully qualified service name.
%% Set to { IP, Port} when service is available, else unknown_network_address.
- network_address = unknown_network_address,
+ address = unknown,
+
+ %% Targeted data link module
+ data_link_module = unknown
%% Table containing #message records,
%% indexed by their transaction ID (and sequence of delivery)
@@ -58,8 +63,11 @@
-record(message, {
transaction_id, %% Transaction ID that message is tagged with.
timeout, %% Timeout, UTC
- timeout_tref, %% Reference to erlang timer associated with this message.
- timeout_cb, %% Callback to invoke when timeout occurs.
+ data_link, %% Data Link Module to use. { Module, Opts}
+ protocol, %% Protocol to use. { Module Opts }
+ routes, %% Routes retrieved for this
+ timeout_tref, %% Reference to erlang timer associated with this message.
+ timeout_cb, %% Callback to invoke when timeout occurs.
parameters,
signature,
certificate
@@ -146,10 +154,26 @@ unregister_remote_services(CompSpec, ServiceNames) ->
CompSpec).
+service_available(Service, DataLinkModule, Address) ->
+ rvi_common:notification(schedule, ?MODULE,
+ service_available,
+ [{ service, Service },
+ { data_link_module, DataLinkModule },
+ { address, Address }],
+ CompSpec).
+
+service_unavailable(Service, DataLinkModule, Address) ->
+ rvi_common:notification(schedule, ?MODULE,
+ service_unavailable,
+ [{ service, Service },
+ { data_link_module, DataLinkModule },
+ { address, Address }],
+ CompSpec).
%% JSON-RPC entry point
%% CAlled by local exo http server
handle_rpc("schedule_message", Args) ->
+
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
@@ -194,11 +218,36 @@ handle_notification("register_remote_services", Args) ->
handle_notification("unregister_remote_services", Args) ->
{ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args),
- ?debug("schedule_notification:unregister_remote_services(): services ~p", [ DiscountinuedServices]),
+ ?debug("schedule_notification:unregister_remote_services(): services ~p",
+ [ DiscountinuedServices]),
+
gen_server:cast(?SERVER, { rvi, unregister_remote_services,
[ DiscountinuedServices ]}),
ok;
+handle_notification("service_available", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+ {ok, Address} = rvi_common:get_json_element(["address"], Args),
+
+ gen_server:cast(?SERVER, { rvi, service_available,
+ [ Service,
+ DataLinkModule,
+ Address ]}),
+
+ ok;
+handle_notification("service_unavailable", Args) ->
+ {ok, Service} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+ {ok, Address} = rvi_common:get_json_element(["address"], Args),
+
+ gen_server:cast(?SERVER, { rvi, service_unavailable,
+ [ Service,
+ DataLinkModule,
+ Address ]}),
+
+ ok;
+
handle_notification(Other, _Args) ->
?debug("schedule_notification:handle_other(~p): unknown", [ Other ]),
ok.
@@ -217,16 +266,20 @@ handle_call( { rvi, schedule_message,
?debug("schedule:sched_msg(): certificate ~p", [Certificate]),
?debug("schedule:sched_msg(): St: ~p", [St]),
+ %%
+ %% Retrieve the routes that we should try for this message
+ %%
+ case rvi_routing:get_service_routing(SvcName) of
+ not_found ->
+ {reply, [ no_route ], St };
- { ok, TransID, NSt } = queue_message(SvcName,
- Timeout,
- Parameters,
- Signature,
- Certificate,
- St),
+ Routes ->
+ { NewTransID, NSt1} = create_transaction_id(St),
+ NTS2 = queue_message(SvcName, Routes, Timeout, Parameter, Signature);
+
+ { reply, [ok, TransID], NS2 }
+ end;
- { reply, [ok, TransID], NSt };
-
handle_call(Other, _From, St) ->
?warning("schedule:handle_call(~p): unknown", [ Other ]),
@@ -250,7 +303,7 @@ handle_cast( {rvi, register_remote_services,
?info("schedule:register_remote_services(): services(~p) -> ~p",
[Services, NetworkAddress]),
- {ok, NSt} = multiple_services_available(Services, NetworkAddress, St),
+ {ok, NSt} = multiple_services_available(Services, NetworkAddress, St),
{noreply, NSt};
@@ -260,6 +313,57 @@ handle_cast( {rvi, unregister_remote_services, [ServiceNames]}, St) ->
{ok, NSt} = multiple_services_unavailable(ServiceNames, St),
{noreply, NSt };
+handle_cast( {rvi, service_available, [Service, DataLinkModule, Address]}, St) ->
+ %% Find the service
+ case ets:lookup(SvcTid, {Service, DataLinkModule}) of
+ [] -> %% No service found - Just unsubscribe. Shouldn't really happen
+ service_discovery_rpc:
+ unsubscribe_to_service_availability({Service, DataLinkModule},
+ ?MODULE);
+
+ { noreply, St };
+
+ [ Svc ] ->
+ %% Update the network address, if it differs, and return
+ %% the new service / State as {ok, NSvc, false, NSt}
+ ?debug("schedule:service_unavailable(): Service ~p:~p now has address~p.",
+ [ DataLinkModule, Service, Address ]),
+ update_service_network_address(Svc, Address, St),
+ { _, NSt2 } = try_sending_messages(Svc, NSt1),
+ { noreply, NSt2}
+ end.
+
+handle_cast( {rvi, service_unavailable, [Service, DataLinkModule, Address]},
+ #st { services_tid = SvcTid } = St) ->
+
+ %% Find the service
+ case ets:lookup(SvcTid, {Service, DataLinkModule}) of
+ [] -> %% No service found - no op.
+ {noreply, St};
+
+ [ Svc ] ->
+ %% Delete service if it is unused.
+ case delete_unused_service(SvcTid, Svc) of
+ true -> %% service was deleted
+
+ %% Unsubscribe from service availablility notifications
+ service_discovery_rpc:
+ unsubscribe_to_service_availability({ Service, DataLinkModule},
+ ?MODULE);
+
+ { noreply, St};
+
+ false -> %% Service was not deleted, update its network address to unknown
+ { _, _, _, NSt} =
+ update_service_network_address(Svc, unknown_network_address, St),
+
+ { noreply, NSt }
+ end
+
+ end.
+ { noreply, St};
+
+
handle_cast(Other, St) ->
?warning("schedule:handle_cast(~p): unknown", [ Other ]),
{noreply, St}.
@@ -276,19 +380,20 @@ handle_cast(Other, St) ->
%%--------------------------------------------------------------------
%% Handle timeouts
-handle_info({ rvi_message_timeout, SvcName, TransID},
+handle_info({ rvi_message_timeout, SvcName, DLMod, TransID},
#st { cs = CompSpec, services_tid = SvcTid } = St) ->
- case ets:lookup(SvcTid, SvcName) of
+ case ets:lookup(SvcTid, {SvcName, DLMod}) of
[ Svc ] ->
%% Delete from ets.
case ets:lookup(Svc#service.messages_tid, TransID) of
[ Msg ] ->
?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]),
- do_timeout_callback(CompSpec, Svc, Msg),
- ets:delete(Svc#service.messages_tid, TransID);
+ ets:delete(Svc#service.messages_tid, TransID),
+ queue_message(
_ ->
- ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]),
+ ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing",
+ [ TransID, SvcName]),
ok
end;
_-> ok
@@ -329,70 +434,149 @@ code_change(_OldVsn, St, _Extra) ->
%%%===================================================================
-%% Queue a message for transmission once a service become available.
-%% If the service does not exist, it will be created.
-%% Once the message is queued, an attempt will be made to send all
-%% messages queued in the given service. This attempt will fail
-%% pretty quickly if the service's network address is set to
-%% 'unavailable'
+
+%% No more routes to try
+queue_message(_SvcName,
+ _TransID,
+ _MessageTimeout
+ [ ],
+ _Parameters,
+ _Signature,
+ _Certificate, St) ->
+
+ %% FIXME: Handle route failure
+ { route_failed, St };
+
+
queue_message(SvcName,
- Timeout,
+ TransID,
+ [ { { Pr, PrOp }, { DL, DLOp } } | RemainingRoutes ],
+ MessageTimeout,
Parameters,
Signature,
- Certificate, St) ->
+ Certificate,
+ St) ->
%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]),
%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]),
%% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]),
%% ?info("schedule:sched_msg(): signature: ~p", [Signature]),
%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]),
- %% Create a new transaction ID.
- {NewTransID, NSt1} = create_transaction_id(St),
-
- %% Create a timer to handle message timeout
- TRef = erlang:send_after(calculate_timeout_period(Timeout), self(),
- { rvi_message_timeout, SvcName, NewTransID }),
-
- %% Build the message record.
- Msg = #message {
- transaction_id = NewTransID,
- timeout = Timeout,
- timeout_tref = TRef,
- parameters = Parameters,
- signature = Signature,
- certificate = Certificate
- },
-
- %% Find or create the service
-
- %% If this is a message to a (yet) unknown service, we will create
- %% it to act as a placeholder for its messages until they time out
- %% or the service becomes available.
- %%
- %% If the service does exist, we queue it and try to send it.
- %%
- {ok, Service, IsNewService, NSt2 } = find_or_create_service(SvcName, NSt1),
+ %% Calculate how many msec we have until message times out
+
+ RelativeTimeout = calc_relative_tout(MessageTimeout),
- %% If the service was created by the call above,
- %% We will try to bring up a data link to it.
- case IsNewService of
+ %% Did we run out of time for the message?
+ case RelativeTimeout > 1 of
+ %% Nope
true ->
- bring_up_data_link(St#st.cs, SvcName);
+ {ok, Service, IsNewService, NSt1 } = find_or_create_service({SvcName, DL}, St),
+
+ Msg = #message {
+ service = SvcName,
+ transaction_id = TransID,
+ data_link = { DLMod, DLOp },
+ routes = RemaingRoutes,
+ message_timeout = MessageTimeout,
+ route_timeout_ref = TRef,
+ parameters = Parameters,
+ signature = Signature,
+ certificate = Certificate
+ },
+
+
+ %% Add to ets table
+ TransID = ets:insert(Service#service.messages_tid, Msg),
+
+ %% If this is a new service, subscribe to updates on this service's availablity.
+ %% If service is already available, we will get service_available/3 notification
+ %% immediately.
+ case IsNewService of
+ true ->
+ service_discovery_rpc:
+ subscribe_to_service_availability({SvcName, DLMod},
+ ?MODULE);
+ false ->
+ ok
+ end,
+
+
+ %%
+ %% Bring up the relevant data link for the given route.
+ %% Once up, the data link will invoke service_availble()
+ %% to indicate that the service is available for the given DL.
+ %%
+ case DLMod:setup_data_link(CompSpec, SvcName, DLOp) of
+ { ok, DLTimeout } ->
+ %% Setup a timeout to be
+ TRef = erlang:send_after(min(RelativeMessageTimeout, DLTimeout),
+ self(),
+ { rvi_message_timeout, SvcName, DLMod, TransID });
+
+ %% We failed with this route. Try the next one
+ { error, _Reason} ->
+ queue_message(SvcName,
+ TransID,
+ RelativeTimeout,
+ RemainingRoutes,
+ Parameters,
+ Signature,
+ Certificate,
+ St)
+ end;
+ %% We are out of time
false ->
- ok
- end,
+ do_timeout_callback(CompSpec, Svc, Msg),
+ { timeout, St}
+ end.
- %% Add to ets table
- ets:insert(Service#service.messages_tid, Msg),
- %% Attempt to send the message
- {_, NSt3 } = try_sending_messages(Service, NSt2),
-
- %% Return
- { ok, NewTransID, NSt3}.
+forward_to_protocol(Svc, Msg, St) ->
+ { DataLinkMod, DataLinkOpts } = Msg#message.data_link,
+ { ProtoMod, ProtoOpts } = Msg#messge.protocol,
+
+ case ProtoMod:send_message(
+ St#st.cs,
+ SvcName,
+ Msg#message.timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ NetworkAddress,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate) of
+
+ %% Success
+ [ok] ->
+ %% Send the rest.
+ try_sending_messages(Service, St);
+
+ %% Failed
+ [Err] ->
+ ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p",
+ [ProtocolMod, DataLinkMod, SvcName, NetworkAddress, Err]),
+
+ %% Requeue this message with the next route
+ { _, St1} = queue_message(SvcName,
+ TransID,
+ Msg#message.timeout,
+ Msg#message.routes,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St),
+
+ %% Send the rest of the messgages targeting
+ %% the same service through the same data link
+ try_sending_messages(Svc, St)
+ end.
+
+
+
%% Check if we can send messages queued up under the given service.
try_sending_messages(#service {
name = SvcName,
@@ -401,51 +585,35 @@ try_sending_messages(#service {
?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]),
{not_available, St};
-
try_sending_messages(#service {
name = SvcName,
+ data_link_module = DataLinkModule,
network_address = NetworkAddress,
messages_tid = Tid } = Service, St) ->
?debug("schedule:try_send(): SvcName: ~p", [SvcName]),
?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]),
-
+
%% Extract the first message of the queue.
case ets:first(Tid) of
%% No more messages to send.
'$end_of_table' ->
?debug("schedule:try_send(): Nothing to send"),
- { ok, St};
+ { ok, St };
Key ->
+ St1 = forward_to_protocol
?debug("schedule:try_send(): Sending: ~p", [Key]),
%% Extract first message and try to send it
case ets:lookup(Tid, Key) of
[ Msg ] ->
- case protocol_rpc:send_message(
- St#st.cs,
- SvcName ,
- Msg#message.timeout,
- NetworkAddress,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate) of
-
- [ok] ->
- %% Send successful - delete entry.
- ets:delete(Tid, Key),
-
- %% Delete timeout ref
- erlang:cancel_timer(Msg#message.timeout_tref),
-
- %% Send the rest.
- try_sending_messages(Service, St);
-
- [Err] ->
- ?info("schedule:try_send(): No send: ~p -> ~p : ~p", [SvcName, NetworkAddress, Err]),
- %% Failed to send message, leave in queue and err out.
- { {send_failed, Err}, St}
- end;
+ %% Wipe from ets table and cancel timer
+ ets:delete(Tid, Key),
+ erlang:cancel_timer(Msg#message.timeout_tref),
+
+ %% Forward to protocol and resend
+ forward_to_protocol(Service, Msg, St);
+
_ ->
?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]),
{ ok, St}
@@ -454,60 +622,6 @@ try_sending_messages(#service {
end.
-%% Mark a service as available, as reported by data_link_up
-%% If the service does not exist, it will be created as a host
-%% for future messages destined to that service.
-%%
-%% If the service does exist, its network address will be upadted
-%% with whatever address was reported to this function.
-%%
-%% Once the service has been created or updated, we will attempt
-%% to send any messages queued inside it.
-%%
-service_available(SvcName, NetworkAddress, St) ->
- ?info("schedule:service_available(): service(~p) -> NetworkAddress(~p)", [ SvcName, NetworkAddress ]),
-
- %% Find or create the service.
- {ok, Svc, _, NSt1} = find_or_create_service(SvcName, NetworkAddress, St),
-
- try_sending_messages(Svc, NSt1).
-
-service_unavailable(SvcName, #st { services_tid = SvcTid } = St) ->
- ?info("schedule:service_unavailable(): Service(~p)", [ SvcName ]),
- ets:delete(SvcTid, SvcName),
- { ok, St }.
-
-
-bring_up_data_link(CompSpec, SvcName) ->
- %% Resolve the service to a network address that we can
- %% use to bring up the data link
- case service_discovery_rpc:resolve_remote_service(CompSpec, SvcName) of
-
- [ ok, Address] ->
- %% Tell data link to bring up a communicationc hannel.
- case data_link_bert_rpc_rpc:setup_data_link(CompSpec, Address) of
- [ ok ] ->
- ok;
- [ already_connected ] ->
- already_connected;
- Que ->
- ?info("schedule:bring_up_data_link() Failed:~p.", [Que]),
- ok
- end;
-
- [ not_found ] ->
- ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p."
- " Service not found.",
- [ SvcName ]),
- not_found;
-
- Err ->
- ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p: ~p",
- [ SvcName, Err ]),
- err
- end.
-
-
%%
%% data_link_up has reported that multiple services are now
%% available at NetworkAddress.
@@ -519,7 +633,7 @@ multiple_services_available([], _NetworkAddress, St) ->
{ok, St};
multiple_services_available([ Svc | T], NetworkAddress, St) ->
- {ok, NSt} = service_available(Svc, NetworkAddress, St),
+ {ok, NSt} = service_available(Svc, NetworkAddress, St),
multiple_services_available(T, NetworkAddress, NSt).
multiple_services_unavailable([], St) ->
@@ -555,7 +669,6 @@ find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid
update_service_network_address(Svc2, NetworkAddress, St)
end.
-
%%
%% Catch where no network address update is necessary.
%%
@@ -585,6 +698,7 @@ update_service_network_address(#service {} = Service, NetworkAddress, St) ->
create_service(ServiceName, retain_existing_address, St) ->
create_service(ServiceName, unknown_network_address, St);
+
%% Create a new service and return the new state (not currently modified)
%% and the newly initialized service revord.
%%
@@ -615,20 +729,21 @@ create_transaction_id(St) ->
%% schedulers?
{ ID, St#st { next_transaction_id = ID + 1 }}.
-%% Calculate a relative timeout based on the UTC TS we are provided with.
-calculate_timeout_period(UTC) ->
+%% Calculate a relative timeout based on the UnixTime TS we are provided with.
+calc_relative_tout(UnixTime) ->
{ Mega, Sec, _Micro } = now(),
Now = Mega * 1000000 + Sec,
- ?debug("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p", [ UTC, Now, UTC - Now ]),
+ ?debug("schedule:calc_relative_tout(): Timeout(~p) - Now(~p) = ~p", [ UnixTime, Now, UnixTime - Now ]),
%% Cap the timeout value at something reasonable
TOut =
- case UTC - Now >= 4294967295 of
+ case UnixTime - Now >= 4294967295 of
true ->
- ?info("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p: Truncated to 4294967295", [ UTC, Now, UTC - Now ]),
+ ?info("schedule:calc_relative_tout(): Timeout(~p) - Now(~p) = ~p: "
+ "Truncated to 4294967295", [ UnixTime, Now, UnixTime - Now ]),
4294967295;
- false -> UTC - Now
+ false -> UnixTime - Now
end,
case TOut =< 0 of
@@ -651,3 +766,20 @@ do_timeout_callback(CompSpec, Service,
do_timeout_callback(_,_,_) ->
ok.
+%% Kill off a service that is no longer used.
+%%
+delete_unused_service(SvcTid, Svc) ->
+ %% Do we have messages waiting for this service?
+ case ets:first(Svc#service.messages_tid) of
+ %% Nope.
+ '$end_of_table' ->
+ ets:delete(Svc#messages_tid),
+ ets:delete(SvcTid, { Service, DataLinkModule})
+ %% Update the network address, if it differs, and return
+ %% the new service / State as {ok, NSvc, false, NSt}
+ ?debug("schedule:service_unavailable(): Service ~p:~p now has no address.",
+ [ DataLinkModule, Service ]),
+ true;
+
+ _ -> false
+ end.
diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl
index 238d860..8589df0 100644
--- a/components/schedule/src/schedule_sup.erl
+++ b/components/schedule/src/schedule_sup.erl
@@ -35,5 +35,6 @@ init([]) ->
{ok, { {one_for_one, 5, 10},
[
?CHILD(schedule_rpc, worker)
+ ?CHILD(router, worker)
]} }.