summaryrefslogtreecommitdiff
path: root/components/schedule
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-10-06 12:56:11 +0200
committerUlf Wiger <ulf@feuerlabs.com>2015-11-20 13:43:07 -0800
commit34aa86b5a2e97650fe6299ccf794d5eb5d052d91 (patch)
treeedfb4bb844c3b90565e7a0bb00f678703d084188 /components/schedule
parente6299ff287e767dae71fb47009f9bf4620cc3d78 (diff)
downloadrvi_core-34aa86b5a2e97650fe6299ccf794d5eb5d052d91.tar.gz
w.i.p. transition to jsx json codec
Diffstat (limited to 'components/schedule')
-rw-r--r--components/schedule/src/schedule_rpc.erl258
1 files changed, 129 insertions, 129 deletions
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index 5dbce1c..bf19dda 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -25,7 +25,7 @@
terminate/2, code_change/3]).
-export([start_json_server/0]).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-export([handle_rpc/2,
@@ -33,7 +33,7 @@
%% Message structure and storage
-%% A message is a piece of
+%% A message is a piece of
%%
%% Service -> ETS -> Messages
-record(service, {
@@ -41,8 +41,8 @@
%% Is this service currently available on the data link module
available = false,
-
- %% Table containing #message records,
+
+ %% Table containing #message records,
%% indexed by their transaction ID (and sequence of delivery)
messages_tid = undefined,
@@ -53,22 +53,22 @@
% A single message to be delivered to a service.
-% Messages are stored in ets tables hosted by a service
+% Messages are stored in ets tables hosted by a service
--record(message, {
+-record(message, {
transaction_id, %% Transaction ID that message is tagged with.
service, %% Target service
timeout, %% Timeout, UTC
data_link, %% Data Link Module to use. { Module, Opts}
protocol, %% Protocol to use. { Module Opts }
- routes, %% Routes retrieved for this
+ routes, %% Routes retrieved for this
timeout_tref, %% Reference to erlang timer associated with this message.
parameters,
signature
}).
--record(st, {
+-record(st, {
next_transaction_id = 1, %% Sequentially incremented transaction id.
services_tid = undefined,
cs %% Service specification
@@ -111,42 +111,42 @@ init([]) ->
CS = rvi_common:get_component_specification(),
service_discovery_rpc:subscribe(CS, ?MODULE),
- {ok, #st{
+ {ok, #st{
cs = CS,
- services_tid = ets:new(rvi_schedule_services,
- [ set, private,
+ services_tid = ets:new(rvi_schedule_services,
+ [ set, private,
{ keypos, #service.key } ])}}.
start_json_server() ->
rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup).
-schedule_message(CompSpec,
- SvcName,
- Timeout,
+schedule_message(CompSpec,
+ SvcName,
+ Timeout,
Parameters,
Signature) ->
-
- rvi_common:request(schedule, ?MODULE,
- schedule_message,
- [{ service, SvcName },
+
+ rvi_common:request(schedule, ?MODULE,
+ schedule_message,
+ [{ service, SvcName },
{ timeout, Timeout },
- { parameters, {struct, Parameters } },
- { signature, Signature }],
+ { parameters, Parameters },
+ { signature, Signature }],
[status, transaction_id], CompSpec).
service_available(CompSpec, SvcName, DataLinkModule) ->
- rvi_common:notification(schedule, ?MODULE,
- service_available,
+ rvi_common:notification(schedule, ?MODULE,
+ service_available,
[{ service, SvcName },
{ data_link_mod, DataLinkModule } ],
CompSpec).
service_unavailable(CompSpec, SvcName, DataLinkModule) ->
- rvi_common:notification(schedule, ?MODULE,
- service_unavailable,
+ rvi_common:notification(schedule, ?MODULE,
+ service_unavailable,
[{ service, SvcName },
{ data_link_mod, DataLinkModule } ],
CompSpec).
@@ -165,7 +165,7 @@ handle_rpc("schedule_message", Args) ->
%% ?debug("schedule_rpc:schedule_request(): parameters: ~p", [Parameters]),
?debug("schedule_rpc:schedule_request(): signature: ~p", [Signature]),
- [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message,
+ [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message,
[ SvcName,
Timeout,
{struct, Parameters},
@@ -185,16 +185,16 @@ handle_notification("service_available", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
- gen_server:cast(?SERVER, { rvi, service_available,
+ gen_server:cast(?SERVER, { rvi, service_available,
[ SvcName,
- list_to_atom(DataLinkModule) ]}),
+ list_to_existing_atom(DataLinkModule) ]}),
ok;
handle_notification("service_unavailable", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
- gen_server:cast(?SERVER, { rvi, service_unavailable,
+ gen_server:cast(?SERVER, { rvi, service_unavailable,
[ SvcName,
list_to_atom(DataLinkModule) ]}),
@@ -205,8 +205,8 @@ handle_notification(Other, _Args) ->
ok.
handle_call( { rvi, schedule_message,
- [SvcName,
- Timeout,
+ [SvcName,
+ Timeout,
Parameters,
Signature] }, _From, St) ->
@@ -220,11 +220,11 @@ handle_call( { rvi, schedule_message,
{ TransID, NSt1} = create_transaction_id(St),
%% Queue the message
- {_, NSt2 }= queue_message(SvcName,
- TransID,
+ {_, NSt2 }= queue_message(SvcName,
+ TransID,
rvi_routing:get_service_routes(SvcName), %% Can be [] (no route)
- Timeout,
- Parameters,
+ Timeout,
+ Parameters,
Signature,
NSt1),
@@ -253,7 +253,7 @@ handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) ->
%% Find or create the service.
?debug("sched:service_available(): ~p:~s", [ DataLinkModule, SvcName ]),
-
+
%% Create a new or update an existing service.
SvcRec = update_service(SvcName, DataLinkModule, available, St),
@@ -267,7 +267,7 @@ handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) ->
{ noreply, NSt2 };
-handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]},
+handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]},
#st { services_tid = SvcTid } = St) ->
%% Grab the service
@@ -275,7 +275,7 @@ handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]},
[] -> %% No service found - no op.
{noreply, St};
- [ SvcRec ] ->
+ [ SvcRec ] ->
%% Delete service if it does not have any pending messages.
case delete_unused_service(SvcTid, SvcRec) of
true -> %% service was deleted
@@ -306,7 +306,7 @@ handle_cast(Other, St) ->
%%--------------------------------------------------------------------
%% Handle timeouts
-handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
+handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
#st { services_tid = SvcTid } = St) ->
?info("sched:timeout(~p:~p): trans_id: ~p", [ DLMod, SvcName, TransID]),
@@ -320,7 +320,7 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
?debug("sched:timeout(~p:~p): Rescheduling", [ DLMod, SvcName]),
ets:delete(SvcRec#service.messages_tid, TransID),
- %% Calculate
+ %% Calculate
TOut = calc_relative_tout(Msg#message.timeout),
%% Has the message itself, not only the current
@@ -331,11 +331,11 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
do_timeout_callback(St#st.cs, SvcName, TransID),
{noreply, St};
false ->
-
+
%% Try to requeue message
- { _Res, NSt } =
- queue_message(SvcName,
- Msg#message.transaction_id,
+ { _Res, NSt } =
+ queue_message(SvcName,
+ Msg#message.transaction_id,
Msg#message.routes,
Msg#message.timeout,
Msg#message.parameters,
@@ -344,8 +344,8 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
{noreply, NSt}
end;
- _ ->
- ?info("sched:timeout(): trans_id(~p) service(~p): Yanked while processing",
+ _ ->
+ ?info("sched:timeout(): trans_id(~p) service(~p): Yanked while processing",
[ TransID, SvcName]),
{noreply, St}
@@ -398,15 +398,15 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) ->
{ SvcName, _ } = SvcRec#service.key,
- TRef = erlang:send_after(RelativeTimeout, self(),
- { rvi_message_timeout,
+ TRef = erlang:send_after(RelativeTimeout, self(),
+ { rvi_message_timeout,
SvcName,
DataLinkMod,
Message#message.transaction_id }),
%% Add message to the service's queue, with an updated
%% timeout ref.
- ets:insert(SvcRec#service.messages_tid,
+ ets:insert(SvcRec#service.messages_tid,
Message#message { timeout_tref = TRef }),
ok.
@@ -418,44 +418,44 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) ->
%%
%% Stash the message in the unknown datalinvariant of the service
%% and opportunistically send it if the service
-queue_message(SvcName,
- TransID,
+queue_message(SvcName,
+ TransID,
[ ],
Timeout,
- Parameters,
- Signature,
+ Parameters,
+ Signature,
St) ->
- TOut = calc_relative_tout(Timeout),
- ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.",
- [ SvcName, TOut / 1000.0]),
- %% Stash in Service / orphaned
+ TOut = calc_relative_tout(Timeout),
+ ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.",
+ [ SvcName, TOut / 1000.0]),
+ %% Stash in Service / orphaned
SvcRec = find_or_create_service(SvcName, orphaned, St),
store_message(SvcRec,
orphaned,
#message {
- transaction_id = TransID,
+ transaction_id = TransID,
service = SvcName,
timeout = Timeout,
data_link = undefined,
protocol = undefined,
routes = [],
timeout_tref = 0,
- parameters = Parameters,
+ parameters = Parameters,
signature = Signature
- },
+ },
TOut),
{ok, St};
-
-
+
+
%% Try to queue message
-queue_message(SvcName,
- TransID,
+queue_message(SvcName,
+ TransID,
[ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ],
- Timeout,
- Parameters,
- Signature,
+ Timeout,
+ Parameters,
+ Signature,
St) ->
?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]),
@@ -469,24 +469,24 @@ queue_message(SvcName,
%% Bring up the relevant data link for the given route.
%% Once up, the data link will invoke service_availble()
%% to indicate that the service is available for the given DL.
- %%
+ %%
Msg = #message {
- transaction_id = TransID,
+ transaction_id = TransID,
service = SvcName,
timeout = Timeout,
data_link = { DLMod, DLOpt },
protocol = { ProtoMod, ProtoOpt },
routes = RemainingRoutes,
timeout_tref = 0,
- parameters = Parameters,
+ parameters = Parameters,
signature = Signature
- },
-
+ },
+
case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of
[ ok, DLTimeout ] ->
TOut = select_timeout(calc_relative_tout(Timeout), DLTimeout),
- ?debug("sched:q(~p:~s): ~p seconds to compe up.",
+ ?debug("sched:q(~p:~s): ~p seconds to compe up.",
[ DLMod, SvcName, TOut / 1000.0]),
store_message(SvcRec, DLMod, Msg, TOut),
@@ -494,11 +494,11 @@ queue_message(SvcName,
[ already_connected, _] ->
%% The service may already be available, give it a shot.
- ?debug("sched:q(~p:~s): already up. Sending.",
- [ DLMod, SvcName]),
+ ?debug("sched:q(~p:~s): already up. Sending.",
+ [ DLMod, SvcName]),
%% Will re-queue message if cannot send.
- { _, NSt } =
+ { _, NSt } =
send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg, St),
{ ok, NSt };
@@ -507,26 +507,26 @@ queue_message(SvcName,
%% We failed to setup a data link. Try the next route.
%%
[ Err, _Reason] ->
- ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]),
+ ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]),
queue_message(SvcName,
TransID,
- RemainingRoutes,
+ RemainingRoutes,
Timeout,
Parameters,
- Signature,
+ Signature,
St)
end.
-
+
%% Send messages to locally connected service
send_message(local, _, _, _, Msg, St) ->
- ?debug("sched:send_msg(local:~s). WIll send to local",
- [ Msg#message.service]),
+ ?debug("sched:send_msg(local:~s). WIll send to local",
+ [ Msg#message.service]),
- service_edge_rpc:handle_remote_message(St#st.cs,
- Msg#message.service,
+ service_edge_rpc:handle_remote_message(St#st.cs,
+ Msg#message.service,
Msg#message.timeout,
Msg#message.parameters,
Msg#message.signature),
@@ -537,12 +537,13 @@ send_message(DataLinkMod, DataLinkOpts,
ProtoMod, ProtoOpts,
Msg, St) ->
- ?debug("sched:send_msg(): ~p:~p:~p",
- [ProtoMod, DataLinkMod, Msg#message.service]),
+ ?debug("sched:send_msg(): ~p:~p:~p",
+ [ProtoMod, DataLinkMod, Msg#message.service]),
%% Send off message to the correct protocol module
case ProtoMod:send_message(
St#st.cs,
+ Msg#message.transaction_id,
Msg#message.service,
Msg#message.timeout,
ProtoOpts,
@@ -552,27 +553,27 @@ send_message(DataLinkMod, DataLinkOpts,
Msg#message.signature) of
%% Success
- [ok] ->
+ [ok] ->
%% Send the rest of the messages associated with this service/dl
{ok, St};
-
+
%% Failed
[Err] ->
- ?info("sched:send_msg(): Failed: ~p:~p:~p -> ~p",
+ ?info("sched:send_msg(): Failed: ~p:~p:~p -> ~p",
[ProtoMod, DataLinkMod, Msg#message.service, Err]),
-
+
%% Requeue this message with the next route
queue_message(Msg#message.service,
Msg#message.transaction_id,
Msg#message.routes,
- Msg#message.timeout,
+ Msg#message.timeout,
Msg#message.parameters,
Msg#message.signature,
St)
end.
%% The service is not available
-send_queued_messages(#service {
+send_queued_messages(#service {
key = { SvcName, _ },
available = unavailable,
messages_tid = _Tid } = _SvcRec, St) ->
@@ -580,7 +581,7 @@ send_queued_messages(#service {
?info("sched:send(): SvcName: ~p: Not available", [SvcName]),
{ not_available, St };
-send_queued_messages(#service {
+send_queued_messages(#service {
key = { SvcName, DataLinkMod },
available = available,
messages_tid = Tid } = SvcRec, St) ->
@@ -593,8 +594,8 @@ send_queued_messages(#service {
?debug("sched:send(): Nothing to send"),
{ ok, St };
- yanked ->
- ?info("sched:send(): Message was yanked while trying to send: ~p",
+ yanked ->
+ ?info("sched:send(): Message was yanked while trying to send: ~p",
[SvcRec#service.key]),
{ ok, St};
@@ -617,7 +618,7 @@ send_queued_messages(#service {
%% where messages are placed while waiting for a final timeout after
%% all routes have failed.
%%
-%% If messages exist in the orphaned queue for SvcName,
+%% If messages exist in the orphaned queue for SvcName,
%% we will try to send them using the DataLink/Protocol combo
%% provided on the command line.
send_orphaned_messages(SvcName, local, St) ->
@@ -634,7 +635,7 @@ send_orphaned_messages(SvcName, local, St) ->
%% the given data link
%% Start chugging out messages
SvcRec ->
- send_orphaned_messages_(undefined, undefined,
+ send_orphaned_messages_(undefined, undefined,
local, undefined,
SvcRec, St)
end;
@@ -643,7 +644,7 @@ send_orphaned_messages(SvcName, local, St) ->
%% are placed while waiting for a final timeout after all routes
%% have failed.
%%
-%% If messages exist in the orphaned queue for SvcName,
+%% If messages exist in the orphaned queue for SvcName,
%% we will try to send them using the DataLink/Protocol combo
%% provided on the command line.
send_orphaned_messages(SvcName, DataLinkMod, St) ->
@@ -658,26 +659,26 @@ send_orphaned_messages(SvcName, DataLinkMod, St) ->
%% We have messages waiting for the service, but no protocol has been configured
%% for transmitting them over the given data link module
- { _, [] } ->
+ { _, [] } ->
?debug("sched:send_orph(~p:~p): No protocol configured. Skipped",
[DataLinkMod, SvcName]),
St;
-
+
%% We have orphaned messages for the service and
%% we have at least one protocol that we can use over
%% the given data link
%% Start chugging out messages
{ SvcRec, [{ Proto, ProtoOpts, DataLinkOpts } | _]} ->
- send_orphaned_messages_(Proto, ProtoOpts,
+ send_orphaned_messages_(Proto, ProtoOpts,
DataLinkMod, DataLinkOpts,
SvcRec, St)
end.
-
-
+
+
send_orphaned_messages_(Protocol, ProtocolOpts,
DataLinkMod, DataLinkOpts,
- #service {
+ #service {
key = { SvcName, _ },
messages_tid = Tid } = SvcRec, St) ->
@@ -685,15 +686,15 @@ send_orphaned_messages_(Protocol, ProtocolOpts,
%% Extract the first message of the queue.
case first_service_message(SvcRec) of
empty ->
- ?debug("sched:send_orph(~p:~p): Nothing to send",
+ ?debug("sched:send_orph(~p:~p): Nothing to send",
[DataLinkMod, SvcName ]),
St;
- yanked ->
- ?info("sched:send_orph(~p:~p): Message was yanked while processing",
+ yanked ->
+ ?info("sched:send_orph(~p:~p): Message was yanked while processing",
[DataLinkMod, SvcName ]),
send_orphaned_messages_(DataLinkMod, DataLinkOpts,
- Protocol, ProtocolOpts,
+ Protocol, ProtocolOpts,
SvcRec, St);
Msg->
@@ -702,16 +703,16 @@ send_orphaned_messages_(Protocol, ProtocolOpts,
erlang:cancel_timer(Msg#message.timeout_tref),
?debug("sched:send_orph(~p:~p): Sending Trans(~p) Pr(~p) PrOp(~p) DlOp(~p)",
- [DataLinkMod, SvcName,
- Msg#message.transaction_id,
+ [DataLinkMod, SvcName,
+ Msg#message.transaction_id,
Protocol, ProtocolOpts, DataLinkOpts]),
{ _, NSt} = send_message( DataLinkMod, DataLinkOpts,
- Protocol, ProtocolOpts,
+ Protocol, ProtocolOpts,
Msg, St),
send_orphaned_messages_(DataLinkMod, DataLinkOpts,
- Protocol, ProtocolOpts,
+ Protocol, ProtocolOpts,
SvcRec, NSt)
end.
@@ -736,7 +737,7 @@ find_or_create_service(SvcName, DataLinkMod, St) ->
?debug("sched:find_or_create_service(): Creating new ~p", [ SvcName]),
update_service(SvcName, DataLinkMod, unavailable, St);
- SvcRec ->
+ SvcRec ->
%% Update the network address, if it differs, and return
%% the new service / State as {ok, NSvcRec, false, NSt}
?debug("sched:find_or_create_service(): Updating existing ~p", [ SvcName]),
@@ -747,22 +748,22 @@ find_or_create_service(SvcName, DataLinkMod, St) ->
%% Create a new service.
%% Warning: Will overwrite existing service (and its message table reference).
-%%
-update_service(SvcName, DataLinkMod, Available,
+%%
+update_service(SvcName, DataLinkMod, Available,
#st { services_tid = SvcsTid, cs = CS }) ->
- MsgTID =
+ MsgTID =
case ets:lookup(SvcsTid, { SvcName, DataLinkMod }) of
[] -> %% The given service does not exist, create a new message TID
- ?debug("sched:update_service(~p:~p): ~p - Creating new",
+ ?debug("sched:update_service(~p:~p): ~p - Creating new",
[ DataLinkMod, SvcName, Available]),
- ets:new(rvi_messages,
- [ ordered_set, private,
+ ets:new(rvi_messages,
+ [ ordered_set, private,
{ keypos, #message.transaction_id } ]);
- [ TmpSvcRec ] ->
+ [ TmpSvcRec ] ->
%% Grab the existing messagae table ID
- ?debug("sched:update_service(~p:~p): ~p - Updating existing",
+ ?debug("sched:update_service(~p:~p): ~p - Updating existing",
[ DataLinkMod, SvcName, Available]),
#service { messages_tid = TID } = TmpSvcRec,
@@ -771,15 +772,15 @@ update_service(SvcName, DataLinkMod, Available,
%% Insert new service to ets table.
- SvcRec = #service {
+ SvcRec = #service {
key = { SvcName, DataLinkMod },
- available = Available,
+ available = Available,
messages_tid = MsgTID,
cs = CS
},
ets:insert(SvcsTid, SvcRec),
- SvcRec.
+ SvcRec.
@@ -797,7 +798,7 @@ create_transaction_id(St) ->
calc_relative_tout(UnixTimeMS) ->
{ Mega, Sec, Micro } = now(),
Now = Mega * 1000000000 + Sec * 1000 + trunc(Micro / 1000) ,
- ?debug("sched:calc_relative_tout(): TimeoutUnixMS(~p) - Now(~p) = ~p",
+ ?debug("sched:calc_relative_tout(): TimeoutUnixMS(~p) - Now(~p) = ~p",
[ UnixTimeMS, Now, UnixTimeMS - Now ]),
@@ -831,7 +832,7 @@ delete_unused_service(SvcTid, SvcRec) ->
%% Update the network address, if it differs, and return
%% the new service / State as {ok, NSvcRec, false, NSt}
- ?debug("sched:service_unavailable(): Service ~p now has no address.",
+ ?debug("sched:service_unavailable(): Service ~p now has no address.",
[ SvcRec#service.key ]),
true;
@@ -842,14 +843,14 @@ first_service_message(#service { messages_tid = Tid }) ->
case ets:first(Tid) of
'$end_of_table' ->
empty;
-
+
Key ->
case ets:lookup(Tid, Key) of
[ Msg ] -> Msg;
[] -> yanked
end
end.
-
+
%% A timeout of -1 means 'does not apply'
%%
@@ -868,4 +869,3 @@ select_timeout(TimeOut1, TimeOut2) ->
{ false, false } -> min(TimeOut1, TimeOut2)
end.
-