summaryrefslogtreecommitdiff
path: root/components/schedule
diff options
context:
space:
mode:
authorMagnus Feuer <mfeuer@jaguarlandrover.com>2015-04-10 18:57:50 -0700
committerMagnus Feuer <mfeuer@jaguarlandrover.com>2015-04-10 18:57:50 -0700
commitffe60f97c1ec46aae1b62f0867b755095cf659ec (patch)
treef7cd64100cbd6082a4c51f3dcad8394cf4ad6657 /components/schedule
parentf3f5e6b2929c53f70d2efca7d03b7e58931a21f3 (diff)
downloadrvi_core-ffe60f97c1ec46aae1b62f0867b755095cf659ec.tar.gz
Temp
Diffstat (limited to 'components/schedule')
-rw-r--r--components/schedule/src/rvi_routing.erl17
-rw-r--r--components/schedule/src/rvi_schedule.erl1
-rw-r--r--components/schedule/src/schedule_rpc.erl623
-rw-r--r--components/schedule/src/schedule_sup.erl4
4 files changed, 280 insertions, 365 deletions
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
index d5b0548..0e6dfbe 100644
--- a/components/schedule/src/rvi_routing.erl
+++ b/components/schedule/src/rvi_routing.erl
@@ -9,6 +9,9 @@
-behaviour(gen_server).
+
+-include_lib("lager/include/log.hrl").
+
%% API
-export([get_service_routes/1]).
-export([start_link/0]).
@@ -88,7 +91,7 @@ init([]) ->
%% @end
%%--------------------------------------------------------------------
handle_call( { rvi_get_service_routes, Service }, _From, St) ->
- {reply, normalize_routes_(find_routes_(Service, St#st.routes), []), t};
+ {reply, normalize_routes_(find_routes_( St#st.routes, Service), []), t};
handle_call(_Request, _From, St) ->
Reply = ok,
@@ -179,8 +182,13 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen )
false ->
%% Continue with the old routes and matching len installed
find_routes_(T, Service, CurRoutes, CurMatchLen)
- end.
+ end;
+find_routes_(Rt, Svc, CurRoutes, CurMatchLen) ->
+ ?info("----------------Failed on ~p", [Rt]),
+ { x, y}.
+
+
find_routes_(Routes, Service) ->
case find_routes_(Routes, Service, undefined, 0) of
{ undefined, 0 } ->
@@ -203,6 +211,5 @@ normalize_routes_({ServicePrefix, [ { Pr, { DL, DLOp }} | Rem ]}, Acc) ->
normalize_routes_({ServicePrefix, [ {{ Pr, PrOp}, DL } | Rem ]}, Acc) ->
normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
-normalize_routes_({ServicePrefix, [ {{ Pr, PrOp }, DL} | Rem ]}, Acc) ->
- normalize_routes_({ServicePrefix, Rem}, [ { {Pr, PrOp}, { DL, [] } } | Acc]).
-
+normalize_routes_({ServicePrefix, [ {Pr, DL} | Rem ]}, Acc) ->
+ normalize_routes_({ServicePrefix, Rem}, [ { {Pr, []}, { DL, [] } } | Acc]).
diff --git a/components/schedule/src/rvi_schedule.erl b/components/schedule/src/rvi_schedule.erl
index 00a142a..2b4606e 100644
--- a/components/schedule/src/rvi_schedule.erl
+++ b/components/schedule/src/rvi_schedule.erl
@@ -7,6 +7,7 @@
%%
-module(rvi_schedule).
+
-include_lib("rvi_common/include/rvi_common.hrl").
-callback schedule_message(CompSpec :: #component_spec{},
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index 310823c..7a1b23a 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -8,20 +8,17 @@
-module(schedule_rpc).
-behaviour(gen_server).
--behaviour(rvi_schedule).
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
%% API
-export([start_link/0]).
--export([schedule_message/6,
- register_remote_services/3,
- unregister_remote_services/2]).
+-export([schedule_message/6]).
%% Invoked by service discovery
%% FIXME: Should be rvi_service_discovery behavior
-export([service_available/3,
- service_unavailable/3]
+ service_unavailable/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -40,12 +37,10 @@
%%
%% Service -> ETS -> Messages
-record(service, {
- name = "", %% Fully qualified service name.
- %% Set to { IP, Port} when service is available, else unknown_network_address.
- address = unknown,
+ key = { "", unknown }, %% Service name and data link modu
- %% Targeted data link module
- data_link_module = unknown
+ %% Is this service currently available on the data link module
+ available = false,
%% Table containing #message records,
%% indexed by their transaction ID (and sequence of delivery)
@@ -117,7 +112,7 @@ init([]) ->
cs = rvi_common:get_component_specification(),
services_tid = ets:new(rvi_schedule_services,
[ set, private,
- { keypos, #service.name } ])}}.
+ { keypos, #service.key } ])}}.
start_json_server() ->
rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup).
@@ -139,35 +134,19 @@ schedule_message(CompSpec,
[status, transaction_id], CompSpec).
-register_remote_services(CompSpec, NetworkAddress, Services) ->
- rvi_common:notification(schedule, ?MODULE,
- register_remote_services,
- [{ network_address, NetworkAddress} ,
- { services, Services }],
- CompSpec).
-
-
-unregister_remote_services(CompSpec, ServiceNames) ->
- rvi_common:notification(schedule, ?MODULE,
- unregister_remote_services,
- [{ services, ServiceNames }],
- CompSpec).
-
-service_available(Service, DataLinkModule, Address) ->
+service_available(CompSpec, SvcName, DataLinkModule) ->
rvi_common:notification(schedule, ?MODULE,
service_available,
- [{ service, Service },
- { data_link_module, DataLinkModule },
- { address, Address }],
+ [{ service, SvcName },
+ { data_link_mod, DataLinkModule } ],
CompSpec).
-service_unavailable(Service, DataLinkModule, Address) ->
+service_unavailable(CompSpec, SvcName, DataLinkModule) ->
rvi_common:notification(schedule, ?MODULE,
service_unavailable,
- [{ service, Service },
- { data_link_module, DataLinkModule },
- { address, Address }],
+ [{ service, SvcName },
+ { data_link_mod, DataLinkModule } ],
CompSpec).
%% JSON-RPC entry point
@@ -197,54 +176,28 @@ handle_rpc("schedule_message", Args) ->
{ transaction_id, TransID } ] };
-
-
handle_rpc(Other, _Args) ->
?debug("schedule_rpc:handle_rpc(~p): unknown", [ Other ]),
{ok, [ {status, rvi_common:json_rpc_status(invalid_command)}]}.
-handle_notification("register_remote_services", Args) ->
- {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
- {ok, Services} = rvi_common:get_json_element(["services"], Args),
- ?debug("schedule_notification:register_remote_services(): network_address: ~p", [ NetworkAddress]),
- ?debug("schedule_notification:register_remote_services(): services: ~p", [ Services]),
-
- gen_server:cast(?SERVER, { rvi, register_remote_services,
- [ NetworkAddress,
- Services ]}),
-
- ok;
-
-handle_notification("unregister_remote_services", Args) ->
- {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args),
- ?debug("schedule_notification:unregister_remote_services(): services ~p",
- [ DiscountinuedServices]),
-
- gen_server:cast(?SERVER, { rvi, unregister_remote_services,
- [ DiscountinuedServices ]}),
- ok;
handle_notification("service_available", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
+ {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
gen_server:cast(?SERVER, { rvi, service_available,
- [ Service,
- DataLinkModule,
- Address ]}),
+ [ SvcName,
+ DataLinkModule ]}),
ok;
handle_notification("service_unavailable", Args) ->
- {ok, Service} = rvi_common:get_json_element(["service"], Args),
- {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- {ok, Address} = rvi_common:get_json_element(["address"], Args),
+ {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
gen_server:cast(?SERVER, { rvi, service_unavailable,
- [ Service,
- DataLinkModule,
- Address ]}),
+ [ SvcName,
+ DataLinkModule ]}),
ok;
@@ -264,20 +217,28 @@ handle_call( { rvi, schedule_message,
?debug("schedule:sched_msg(): parameters: ~p", [Parameters]),
?debug("schedule:sched_msg(): signature: ~p", [Signature]),
?debug("schedule:sched_msg(): certificate ~p", [Certificate]),
- ?debug("schedule:sched_msg(): St: ~p", [St]),
+ %%?debug("schedule:sched_msg(): St: ~p", [St]),
%%
%% Retrieve the routes that we should try for this message
%%
- case rvi_routing:get_service_routing(SvcName) of
+ case rvi_routing:get_service_routes(SvcName) of
not_found ->
{reply, [ no_route ], St };
Routes ->
- { NewTransID, NSt1} = create_transaction_id(St),
- NTS2 = queue_message(SvcName, Routes, Timeout, Parameter, Signature);
-
- { reply, [ok, TransID], NS2 }
+ { TransID, NSt1} = create_transaction_id(St),
+ NSt2 = queue_message(SvcName,
+ TransID,
+ Routes,
+ Timeout,
+ calc_relative_tout(Timeout),
+ Parameters,
+ Signature,
+ Certificate,
+ NSt1),
+
+ { reply, [ok, TransID], NSt2 }
end;
@@ -297,71 +258,40 @@ handle_call(Other, _From, St) ->
%% @end
%%--------------------------------------------------------------------
-handle_cast( {rvi, register_remote_services,
- [ NetworkAddress, Services]}, St) ->
-
- ?info("schedule:register_remote_services(): services(~p) -> ~p",
- [Services, NetworkAddress]),
-
- {ok, NSt} = multiple_services_available(Services, NetworkAddress, St),
- {noreply, NSt};
+handle_cast( {rvi, service_available, [SvcName, DataLinkModule]}, St) ->
+ %% Find or create the service.
+ ?debug("schedule:service_available(): ~p:~p", [ DataLinkModule, SvcName ]),
-handle_cast( {rvi, unregister_remote_services, [ServiceNames]}, St) ->
- ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]),
- {ok, NSt} = multiple_services_unavailable(ServiceNames, St),
- {noreply, NSt };
+ SvcRec = update_service(SvcName, DataLinkModule, available, St),
-handle_cast( {rvi, service_available, [Service, DataLinkModule, Address]}, St) ->
- %% Find the service
- case ets:lookup(SvcTid, {Service, DataLinkModule}) of
- [] -> %% No service found - Just unsubscribe. Shouldn't really happen
- service_discovery_rpc:
- unsubscribe_to_service_availability({Service, DataLinkModule},
- ?MODULE);
-
- { noreply, St };
+ %% Try to send any pending messages.
+ { _, NSt} = try_sending_messages(SvcRec, St),
- [ Svc ] ->
- %% Update the network address, if it differs, and return
- %% the new service / State as {ok, NSvc, false, NSt}
- ?debug("schedule:service_unavailable(): Service ~p:~p now has address~p.",
- [ DataLinkModule, Service, Address ]),
- update_service_network_address(Svc, Address, St),
- { _, NSt2 } = try_sending_messages(Svc, NSt1),
- { noreply, NSt2}
- end.
+ { noreply, NSt};
-handle_cast( {rvi, service_unavailable, [Service, DataLinkModule, Address]},
+handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]},
#st { services_tid = SvcTid } = St) ->
- %% Find the service
- case ets:lookup(SvcTid, {Service, DataLinkModule}) of
+ %% Grab the service
+ case ets:lookup(SvcTid, {SvcName, DataLinkModule}) of
[] -> %% No service found - no op.
{noreply, St};
- [ Svc ] ->
- %% Delete service if it is unused.
- case delete_unused_service(SvcTid, Svc) of
+ [ SvcRec ] ->
+ %% Delete service if it does not have any pending messages.
+ case delete_unused_service(SvcTid, SvcRec) of
true -> %% service was deleted
-
- %% Unsubscribe from service availablility notifications
- service_discovery_rpc:
- unsubscribe_to_service_availability({ Service, DataLinkModule},
- ?MODULE);
-
{ noreply, St};
- false -> %% Service was not deleted, update its network address to unknown
- { _, _, _, NSt} =
- update_service_network_address(Svc, unknown_network_address, St),
-
- { noreply, NSt }
+ false -> %% SvcName was not deleted, set it to not available
+ update_service(SvcName, DataLinkModule, unavailable, St),
+
+ { noreply, St }
end
- end.
- { noreply, St};
+ end;
handle_cast(Other, St) ->
@@ -381,24 +311,41 @@ handle_cast(Other, St) ->
%% Handle timeouts
handle_info({ rvi_message_timeout, SvcName, DLMod, TransID},
- #st { cs = CompSpec, services_tid = SvcTid } = St) ->
+ #st { services_tid = SvcTid } = St) ->
+ %% Look up the service / DataLink mod
case ets:lookup(SvcTid, {SvcName, DLMod}) of
- [ Svc ] ->
- %% Delete from ets.
- case ets:lookup(Svc#service.messages_tid, TransID) of
+ [ SvcRec ] -> %% Found service for specific data link
+
+ %% Delete message from service queue
+ case ets:lookup(SvcRec#service.messages_tid, TransID) of
[ Msg ] ->
?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]),
- ets:delete(Svc#service.messages_tid, TransID),
- queue_message(
+ ets:delete(SvcRec#service.messages_tid, TransID),
+
+ %% Try to requeue message
+ { _Res, NSt } =
+ queue_message(SvcName,
+ Msg#message.transaction_id,
+ Msg#message.routes,
+ Msg#message.timeout,
+ calc_relative_tout(Msg#message.timeout),
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St),
+ {noreply, NSt};
+
_ ->
?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing",
[ TransID, SvcName]),
- ok
+
+ {noreply, St}
+
end;
- _-> ok
- end,
- {noreply, St};
+ _ -> {noreply, St}
+
+ end;
handle_info(_Info, St) ->
@@ -435,289 +382,234 @@ code_change(_OldVsn, St, _Extra) ->
+%%
%% No more routes to try
-queue_message(_SvcName,
+%%
+queue_message(SvcName,
_TransID,
- _MessageTimeout
[ ],
+ _MessageTimeout,
+ _RelativeTimeout,
_Parameters,
_Signature,
_Certificate, St) ->
%% FIXME: Handle route failure
+ ?notice("schedule:queue_message(): Ran out of routes to try for ~p", [SvcName]),
{ route_failed, St };
+%%
+%% The message has timed out
+%%
+queue_message(SvcName,
+ TransID,
+ _Routes,
+ _MessageTimeout,
+ -1, %% We are timed out
+ _Parameters,
+ _Signature,
+ _Certificate,
+ St) ->
+ do_timeout_callback(St#st.cs, SvcName, TransID),
+ { timeout, St };
+
+
+%% Try to queue message
queue_message(SvcName,
TransID,
- [ { { Pr, PrOp }, { DL, DLOp } } | RemainingRoutes ],
- MessageTimeout,
+ [ { { _Pr, _PrOp }, { DLMod, DLOp } } | RemainingRoutes ],
+ Timeout,
+ RelativeTimeout,
Parameters,
Signature,
Certificate,
St) ->
+
%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]),
%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]),
%% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]),
%% ?info("schedule:sched_msg(): signature: ~p", [Signature]),
%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]),
- %% Calculate how many msec we have until message times out
+ SvcRec = find_or_create_service(SvcName, DLMod, St),
- RelativeTimeout = calc_relative_tout(MessageTimeout),
-
- %% Did we run out of time for the message?
- case RelativeTimeout > 1 of
- %% Nope
- true ->
- {ok, Service, IsNewService, NSt1 } = find_or_create_service({SvcName, DL}, St),
-
- Msg = #message {
- service = SvcName,
- transaction_id = TransID,
- data_link = { DLMod, DLOp },
- routes = RemaingRoutes,
- message_timeout = MessageTimeout,
- route_timeout_ref = TRef,
- parameters = Parameters,
- signature = Signature,
- certificate = Certificate
- },
-
-
- %% Add to ets table
- TransID = ets:insert(Service#service.messages_tid, Msg),
-
- %% If this is a new service, subscribe to updates on this service's availablity.
- %% If service is already available, we will get service_available/3 notification
- %% immediately.
- case IsNewService of
- true ->
- service_discovery_rpc:
- subscribe_to_service_availability({SvcName, DLMod},
- ?MODULE);
- false ->
- ok
- end,
+ %% The service may already be available, give it a shot.
+ case try_sending_messages(SvcRec, St) of
+ { ok, NSt } -> %% We managed to send the message. Done.
+ { ok, NSt };
+ { _ , NSt } -> %% Failed to send message. Setup data link
+
%%
%% Bring up the relevant data link for the given route.
%% Once up, the data link will invoke service_availble()
%% to indicate that the service is available for the given DL.
%%
- case DLMod:setup_data_link(CompSpec, SvcName, DLOp) of
+ case DLMod:setup_data_link(NSt#st.cs, SvcName, DLOp) of
{ ok, DLTimeout } ->
- %% Setup a timeout to be
- TRef = erlang:send_after(min(RelativeMessageTimeout, DLTimeout),
+
+ %% Setup a timeout that triggers on whatever
+ %% comes first of the message's general timeout
+ %% or the timeout of the data link bringup
+ TRef = erlang:send_after(min(RelativeTimeout, DLTimeout),
self(),
- { rvi_message_timeout, SvcName, DLMod, TransID });
-
+ { rvi_message_timeout, SvcName, DLMod, TransID }),
+
+ %% Setup a message to be added to the service / dl table.
+ Msg = #message {
+ transaction_id = TransID,
+ data_link = { DLMod, DLOp },
+ routes = RemainingRoutes,
+ timeout = Timeout,
+ timeout_tref = TRef,
+ parameters = Parameters,
+ signature = Signature,
+ certificate = Certificate
+ },
+
+
+ %% Add to ets table
+ TransID = ets:insert(SvcRec#service.messages_tid, Msg),
+ {ok, NSt};
+
%% We failed with this route. Try the next one
{ error, _Reason} ->
queue_message(SvcName,
TransID,
- RelativeTimeout,
+ Timeout,
+ calc_relative_tout(Timeout),
RemainingRoutes,
Parameters,
Signature,
Certificate,
- St)
- end;
-
- %% We are out of time
- false ->
- do_timeout_callback(CompSpec, Svc, Msg),
- { timeout, St}
+ NSt)
+ end
end.
-forward_to_protocol(Svc, Msg, St) ->
- { DataLinkMod, DataLinkOpts } = Msg#message.data_link,
- { ProtoMod, ProtoOpts } = Msg#messge.protocol,
-
- case ProtoMod:send_message(
- St#st.cs,
- SvcName,
- Msg#message.timeout,
- ProtoOpts,
- DataLinkMod,
- DataLinkOpts,
- NetworkAddress,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate) of
-
- %% Success
- [ok] ->
- %% Send the rest.
- try_sending_messages(Service, St);
-
- %% Failed
- [Err] ->
- ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p",
- [ProtocolMod, DataLinkMod, SvcName, NetworkAddress, Err]),
-
- %% Requeue this message with the next route
- { _, St1} = queue_message(SvcName,
- TransID,
- Msg#message.timeout,
- Msg#message.routes,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate,
- St),
-
- %% Send the rest of the messgages targeting
- %% the same service through the same data link
- try_sending_messages(Svc, St)
- end.
-
-
-
-%% Check if we can send messages queued up under the given service.
+%% The service is not available
try_sending_messages(#service {
- name = SvcName,
- network_address = unknown_network_address,
- messages_tid = _Tid } = _Service, St) ->
+ key = { SvcName, _ },
+ available = false,
+ messages_tid = _Tid } = _SvcRec, St) ->
+
?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]),
- {not_available, St};
+ { not_available, St };
try_sending_messages(#service {
- name = SvcName,
- data_link_module = DataLinkModule,
- network_address = NetworkAddress,
- messages_tid = Tid } = Service, St) ->
+ key = { SvcName, DataLinkMod },
+ available = true,
+ messages_tid = Tid } = SvcRec, St) ->
- ?debug("schedule:try_send(): SvcName: ~p", [SvcName]),
- ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]),
+ ?debug("schedule:try_send(): Service: ~p:~p", [DataLinkMod, SvcName]),
%% Extract the first message of the queue.
- case ets:first(Tid) of
- %% No more messages to send.
- '$end_of_table' ->
+ case first_service_message(SvcRec) of
+ empty ->
?debug("schedule:try_send(): Nothing to send"),
{ ok, St };
- Key ->
- St1 = forward_to_protocol
- ?debug("schedule:try_send(): Sending: ~p", [Key]),
- %% Extract first message and try to send it
- case ets:lookup(Tid, Key) of
- [ Msg ] ->
- %% Wipe from ets table and cancel timer
- ets:delete(Tid, Key),
- erlang:cancel_timer(Msg#message.timeout_tref),
-
- %% Forward to protocol and resend
- forward_to_protocol(Service, Msg, St);
-
- _ ->
- ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]),
- { ok, St}
-
+ yanked ->
+ ?info("schedule:try_send(): Message was yanked while trying to send: ~p",
+ [SvcRec#service.key]),
+ { ok, St};
+
+ Msg ->
+ %% Wipe from ets table and cancel timer
+ ets:delete(Tid, Msg#message.transaction_id),
+
+ erlang:cancel_timer(Msg#message.timeout_tref),
+
+ %% Forward to protocol.
+ { _, DataLinkOpts } = Msg#message.data_link,
+ { ProtoMod, ProtoOpts } = Msg#message.protocol,
+
+ %% Send off message to the correct protocol module
+ case ProtoMod:send_message(
+ St#st.cs,
+ SvcName,
+ Msg#message.timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate) of
+
+ %% Success
+ [ok] ->
+ %% Send the rest of the messages associated with this service/dl
+ try_sending_messages(SvcRec, St);
+
+ %% Failed
+ [Err] ->
+ ?info("schedule:try_send(): No send: ~p:~p:~p -> ~p : ~p",
+ [ProtoMod, DataLinkMod, SvcName, Err]),
+
+ %% Requeue this message with the next route
+ { _, St1} = queue_message(SvcName,
+ Msg#message.transaction_id,
+ Msg#message.timeout,
+ calc_relative_tout(Msg#message.timeout),
+ Msg#message.routes,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St),
+
+ %% Send the rest of the messgages targeting
+ %% the same service through the same data link
+ %% If they all fail due to some data link error,
+ %% they will all be requeued to the next route.
+ try_sending_messages(SvcRec, St1)
end
end.
-%%
-%% data_link_up has reported that multiple services are now
-%% available at NetworkAddress.
-%% Iterate through all services and mark them as available,
-%% possibly sending any pending message targeting the newly activated
-%% service.
-%%
-multiple_services_available([], _NetworkAddress, St) ->
- {ok, St};
-
-multiple_services_available([ Svc | T], NetworkAddress, St) ->
- {ok, NSt} = service_available(Svc, NetworkAddress, St),
- multiple_services_available(T, NetworkAddress, NSt).
-
-multiple_services_unavailable([], St) ->
- {ok, St};
-
-multiple_services_unavailable([ SvcName | T], St) ->
- {ok, NSt} = service_unavailable(SvcName, St),
- multiple_services_unavailable(T, NSt).
+find_or_create_service(SvcName, DataLinkMod, #st { services_tid = SvcTid } = St) ->
+ ?debug("schedule:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]),
-
-find_or_create_service(ServiceName, St) ->
- %% Invoke with retain to keep any exising network addresses already
- %% in place in an existing service.
- find_or_create_service(ServiceName, retain_existing_address, St).
-
-find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid } = St) ->
- ?debug("schedule:find_or_create_service(): SvcName: ~p", [ ServiceName]),
-
- case ets:lookup(SvcTid, ServiceName) of
+ case ets:lookup(SvcTid, { SvcName, DataLinkMod }) of
[] -> %% The given service does not exist, create it.
- ?debug("schedule:find_or_create_service(): Creating new ~p", [ ServiceName]),
- create_service(ServiceName, NetworkAddress, St);
-
- [ Svc1 ] when NetworkAddress =:= retain_existing_address ->
- %% We found a service, and are instructed not to touch
- %% its network address. Return it.
- { ok, Svc1, false, St};
+ ?debug("schedule:find_or_create_service(): Creating new ~p", [ SvcName]),
+ update_service(SvcName, DataLinkMod, unavailable, St);
- [ Svc2 ] ->
+ [ SvcRec ] ->
%% Update the network address, if it differs, and return
- %% the new service / State as {ok, NSvc, false, NSt}
- ?debug("schedule:find_or_create_service(): Updating existing ~p", [ ServiceName]),
- update_service_network_address(Svc2, NetworkAddress, St)
+ %% the new service / State as {ok, NSvcRec, false, NSt}
+ ?debug("schedule:find_or_create_service(): Updating existing ~p", [ SvcName]),
+ SvcRec
end.
-%%
-%% Catch where no network address update is necessary.
-%%
-update_service_network_address(#service { network_address = NetworkAddress } = Service,
- NetworkAddress, St) ->
- { ok, Service, false, St}; %% False indicates that the service exists.
-
-
-%%
-%% Update the service in the ets table with a new network address.
-%%
-update_service_network_address(#service {} = Service, NetworkAddress, St) ->
- %% Create a new service.
- NewService = Service#service { network_address = NetworkAddress},
- %% Replace existing serviceo in the ets table of services.
- ets:insert(St#st.services_tid, [ NewService ]),
- { ok, NewService, false, St}. %% False indicates that the service exists.
-
-
-
-
-%% If we are creating a new service, we need to remap the
-%% retain_existing_address to unknown_network_address in order to get the correct
-%% initial state, which is a created service with an unknown network address.
-create_service(ServiceName, retain_existing_address, St) ->
- create_service(ServiceName, unknown_network_address, St);
-
-
-%% Create a new service and return the new state (not currently modified)
-%% and the newly initialized service revord.
+%% Create a new service, or update an existing one, and return the new
+%% state (not currently modified) and the newly initialized service
+%% revord.
%%
-create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) ->
- Svc = #service {
- name = ServiceName,
- network_address = NetworkAddress,
- messages_tid = ets:new(rvi_messages,
- [ ordered_set, private,
- { keypos, #message.transaction_id } ])
- },
+update_service(SvcName, DataLinkMod, Available,
+ #st { services_tid = SvcsTid, cs = CS }) ->
+ %% Return new service and existing state.
+ ?debug("schedule:create_service(): ~p:~p ", [ DataLinkMod, SvcName]),
+ SvcRec = #service {
+ key = { SvcName, DataLinkMod },
+ available = Available,
+ messages_tid = ets:new(rvi_messages,
+ [ ordered_set, private,
+ { keypos, #message.transaction_id } ])
+ },
%% Insert new service to ets table.
- ets:insert(SvcsTid, Svc),
-
- %% Return new service and existing state.
- ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]),
- ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]),
- { ok, Svc, true, St}. %% True indicates that the service is newly created.
+ ets:insert(SvcsTid, SvcRec),
+
+ %% Subscribe to updates on the availbility of this service.
+ service_discovery_rpc:subscribe(CS, SvcName, DataLinkMod),
+
+ SvcRec.
%% Create a new and unique transaction id
@@ -748,7 +640,7 @@ calc_relative_tout(UnixTime) ->
case TOut =< 0 of
true ->
- 1; %% One millisec is the smallest value we will time out on
+ -1; %% We have timed out
false ->
TOut * 1000
@@ -756,30 +648,45 @@ calc_relative_tout(UnixTime) ->
%% Handle a callback for a timed out message.
-do_timeout_callback(CompSpec, Service,
- #message {transaction_id = TransID}) ->
- service_edge_rpc:handle_local_timeout(CompSpec, Service, TransID),
- ok;
+do_timeout_callback(CompSpec, SvcName, TransID) ->
+ service_edge_rpc:handle_local_timeout(CompSpec, SvcName, TransID),
+ ok.
-%% callback element of #message is not an {M,F,A} format, ignore.
-do_timeout_callback(_,_,_) ->
- ok.
%% Kill off a service that is no longer used.
%%
-delete_unused_service(SvcTid, Svc) ->
+delete_unused_service(SvcTid, SvcRec) ->
%% Do we have messages waiting for this service?
- case ets:first(Svc#service.messages_tid) of
+ case ets:first(SvcRec#service.messages_tid) of
%% Nope.
'$end_of_table' ->
- ets:delete(Svc#messages_tid),
- ets:delete(SvcTid, { Service, DataLinkModule})
+ ets:delete(SvcRec#service.messages_tid),
+ ets:delete(SvcTid, SvcRec#service.key),
+
+ { SvcName, DataLinkMod} = SvcRec#service.key,
+ %% Unsubscribe from service availablility notifications
+ service_discovery_rpc:
+ unsubscribe_to_service_availability(SvcName, DataLinkMod, ?MODULE),
%% Update the network address, if it differs, and return
- %% the new service / State as {ok, NSvc, false, NSt}
+ %% the new service / State as {ok, NSvcRec, false, NSt}
?debug("schedule:service_unavailable(): Service ~p:~p now has no address.",
- [ DataLinkModule, Service ]),
+ [ SvcRec#service.key ]),
true;
_ -> false
end.
+
+first_service_message(#service { messages_tid = Tid }) ->
+ case ets:first(Tid) of
+ '$end_of_table' ->
+ empty;
+
+ Key ->
+ case ets:lookup(Tid, Key) of
+ [ Msg ] -> Msg;
+ [] -> yanked
+ end
+ end.
+
+
diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl
index 8589df0..76c403e 100644
--- a/components/schedule/src/schedule_sup.erl
+++ b/components/schedule/src/schedule_sup.erl
@@ -34,7 +34,7 @@ start_link() ->
init([]) ->
{ok, { {one_for_one, 5, 10},
[
- ?CHILD(schedule_rpc, worker)
- ?CHILD(router, worker)
+ ?CHILD(schedule_rpc, worker),
+ ?CHILD(rvi_routing, worker)
]} }.