diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-10-06 12:56:11 +0200 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-11-20 13:43:07 -0800 |
commit | 34aa86b5a2e97650fe6299ccf794d5eb5d052d91 (patch) | |
tree | edfb4bb844c3b90565e7a0bb00f678703d084188 /components/schedule | |
parent | e6299ff287e767dae71fb47009f9bf4620cc3d78 (diff) | |
download | rvi_core-34aa86b5a2e97650fe6299ccf794d5eb5d052d91.tar.gz |
w.i.p. transition to jsx json codec
Diffstat (limited to 'components/schedule')
-rw-r--r-- | components/schedule/src/schedule_rpc.erl | 258 |
1 files changed, 129 insertions, 129 deletions
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 5dbce1c..bf19dda 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -25,7 +25,7 @@ terminate/2, code_change/3]). -export([start_json_server/0]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -export([handle_rpc/2, @@ -33,7 +33,7 @@ %% Message structure and storage -%% A message is a piece of +%% A message is a piece of %% %% Service -> ETS -> Messages -record(service, { @@ -41,8 +41,8 @@ %% Is this service currently available on the data link module available = false, - - %% Table containing #message records, + + %% Table containing #message records, %% indexed by their transaction ID (and sequence of delivery) messages_tid = undefined, @@ -53,22 +53,22 @@ % A single message to be delivered to a service. -% Messages are stored in ets tables hosted by a service +% Messages are stored in ets tables hosted by a service --record(message, { +-record(message, { transaction_id, %% Transaction ID that message is tagged with. service, %% Target service timeout, %% Timeout, UTC data_link, %% Data Link Module to use. { Module, Opts} protocol, %% Protocol to use. { Module Opts } - routes, %% Routes retrieved for this + routes, %% Routes retrieved for this timeout_tref, %% Reference to erlang timer associated with this message. parameters, signature }). --record(st, { +-record(st, { next_transaction_id = 1, %% Sequentially incremented transaction id. services_tid = undefined, cs %% Service specification @@ -111,42 +111,42 @@ init([]) -> CS = rvi_common:get_component_specification(), service_discovery_rpc:subscribe(CS, ?MODULE), - {ok, #st{ + {ok, #st{ cs = CS, - services_tid = ets:new(rvi_schedule_services, - [ set, private, + services_tid = ets:new(rvi_schedule_services, + [ set, private, { keypos, #service.key } ])}}. start_json_server() -> rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup). -schedule_message(CompSpec, - SvcName, - Timeout, +schedule_message(CompSpec, + SvcName, + Timeout, Parameters, Signature) -> - - rvi_common:request(schedule, ?MODULE, - schedule_message, - [{ service, SvcName }, + + rvi_common:request(schedule, ?MODULE, + schedule_message, + [{ service, SvcName }, { timeout, Timeout }, - { parameters, {struct, Parameters } }, - { signature, Signature }], + { parameters, Parameters }, + { signature, Signature }], [status, transaction_id], CompSpec). service_available(CompSpec, SvcName, DataLinkModule) -> - rvi_common:notification(schedule, ?MODULE, - service_available, + rvi_common:notification(schedule, ?MODULE, + service_available, [{ service, SvcName }, { data_link_mod, DataLinkModule } ], CompSpec). service_unavailable(CompSpec, SvcName, DataLinkModule) -> - rvi_common:notification(schedule, ?MODULE, - service_unavailable, + rvi_common:notification(schedule, ?MODULE, + service_unavailable, [{ service, SvcName }, { data_link_mod, DataLinkModule } ], CompSpec). @@ -165,7 +165,7 @@ handle_rpc("schedule_message", Args) -> %% ?debug("schedule_rpc:schedule_request(): parameters: ~p", [Parameters]), ?debug("schedule_rpc:schedule_request(): signature: ~p", [Signature]), - [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message, + [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message, [ SvcName, Timeout, {struct, Parameters}, @@ -185,16 +185,16 @@ handle_notification("service_available", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), - gen_server:cast(?SERVER, { rvi, service_available, + gen_server:cast(?SERVER, { rvi, service_available, [ SvcName, - list_to_atom(DataLinkModule) ]}), + list_to_existing_atom(DataLinkModule) ]}), ok; handle_notification("service_unavailable", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), - gen_server:cast(?SERVER, { rvi, service_unavailable, + gen_server:cast(?SERVER, { rvi, service_unavailable, [ SvcName, list_to_atom(DataLinkModule) ]}), @@ -205,8 +205,8 @@ handle_notification(Other, _Args) -> ok. handle_call( { rvi, schedule_message, - [SvcName, - Timeout, + [SvcName, + Timeout, Parameters, Signature] }, _From, St) -> @@ -220,11 +220,11 @@ handle_call( { rvi, schedule_message, { TransID, NSt1} = create_transaction_id(St), %% Queue the message - {_, NSt2 }= queue_message(SvcName, - TransID, + {_, NSt2 }= queue_message(SvcName, + TransID, rvi_routing:get_service_routes(SvcName), %% Can be [] (no route) - Timeout, - Parameters, + Timeout, + Parameters, Signature, NSt1), @@ -253,7 +253,7 @@ handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) -> %% Find or create the service. ?debug("sched:service_available(): ~p:~s", [ DataLinkModule, SvcName ]), - + %% Create a new or update an existing service. SvcRec = update_service(SvcName, DataLinkModule, available, St), @@ -267,7 +267,7 @@ handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) -> { noreply, NSt2 }; -handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]}, +handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]}, #st { services_tid = SvcTid } = St) -> %% Grab the service @@ -275,7 +275,7 @@ handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]}, [] -> %% No service found - no op. {noreply, St}; - [ SvcRec ] -> + [ SvcRec ] -> %% Delete service if it does not have any pending messages. case delete_unused_service(SvcTid, SvcRec) of true -> %% service was deleted @@ -306,7 +306,7 @@ handle_cast(Other, St) -> %%-------------------------------------------------------------------- %% Handle timeouts -handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, +handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, #st { services_tid = SvcTid } = St) -> ?info("sched:timeout(~p:~p): trans_id: ~p", [ DLMod, SvcName, TransID]), @@ -320,7 +320,7 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, ?debug("sched:timeout(~p:~p): Rescheduling", [ DLMod, SvcName]), ets:delete(SvcRec#service.messages_tid, TransID), - %% Calculate + %% Calculate TOut = calc_relative_tout(Msg#message.timeout), %% Has the message itself, not only the current @@ -331,11 +331,11 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, do_timeout_callback(St#st.cs, SvcName, TransID), {noreply, St}; false -> - + %% Try to requeue message - { _Res, NSt } = - queue_message(SvcName, - Msg#message.transaction_id, + { _Res, NSt } = + queue_message(SvcName, + Msg#message.transaction_id, Msg#message.routes, Msg#message.timeout, Msg#message.parameters, @@ -344,8 +344,8 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, {noreply, NSt} end; - _ -> - ?info("sched:timeout(): trans_id(~p) service(~p): Yanked while processing", + _ -> + ?info("sched:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), {noreply, St} @@ -398,15 +398,15 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) -> { SvcName, _ } = SvcRec#service.key, - TRef = erlang:send_after(RelativeTimeout, self(), - { rvi_message_timeout, + TRef = erlang:send_after(RelativeTimeout, self(), + { rvi_message_timeout, SvcName, DataLinkMod, Message#message.transaction_id }), %% Add message to the service's queue, with an updated %% timeout ref. - ets:insert(SvcRec#service.messages_tid, + ets:insert(SvcRec#service.messages_tid, Message#message { timeout_tref = TRef }), ok. @@ -418,44 +418,44 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) -> %% %% Stash the message in the unknown datalinvariant of the service %% and opportunistically send it if the service -queue_message(SvcName, - TransID, +queue_message(SvcName, + TransID, [ ], Timeout, - Parameters, - Signature, + Parameters, + Signature, St) -> - TOut = calc_relative_tout(Timeout), - ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.", - [ SvcName, TOut / 1000.0]), - %% Stash in Service / orphaned + TOut = calc_relative_tout(Timeout), + ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.", + [ SvcName, TOut / 1000.0]), + %% Stash in Service / orphaned SvcRec = find_or_create_service(SvcName, orphaned, St), store_message(SvcRec, orphaned, #message { - transaction_id = TransID, + transaction_id = TransID, service = SvcName, timeout = Timeout, data_link = undefined, protocol = undefined, routes = [], timeout_tref = 0, - parameters = Parameters, + parameters = Parameters, signature = Signature - }, + }, TOut), {ok, St}; - - + + %% Try to queue message -queue_message(SvcName, - TransID, +queue_message(SvcName, + TransID, [ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ], - Timeout, - Parameters, - Signature, + Timeout, + Parameters, + Signature, St) -> ?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]), @@ -469,24 +469,24 @@ queue_message(SvcName, %% Bring up the relevant data link for the given route. %% Once up, the data link will invoke service_availble() %% to indicate that the service is available for the given DL. - %% + %% Msg = #message { - transaction_id = TransID, + transaction_id = TransID, service = SvcName, timeout = Timeout, data_link = { DLMod, DLOpt }, protocol = { ProtoMod, ProtoOpt }, routes = RemainingRoutes, timeout_tref = 0, - parameters = Parameters, + parameters = Parameters, signature = Signature - }, - + }, + case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of [ ok, DLTimeout ] -> TOut = select_timeout(calc_relative_tout(Timeout), DLTimeout), - ?debug("sched:q(~p:~s): ~p seconds to compe up.", + ?debug("sched:q(~p:~s): ~p seconds to compe up.", [ DLMod, SvcName, TOut / 1000.0]), store_message(SvcRec, DLMod, Msg, TOut), @@ -494,11 +494,11 @@ queue_message(SvcName, [ already_connected, _] -> %% The service may already be available, give it a shot. - ?debug("sched:q(~p:~s): already up. Sending.", - [ DLMod, SvcName]), + ?debug("sched:q(~p:~s): already up. Sending.", + [ DLMod, SvcName]), %% Will re-queue message if cannot send. - { _, NSt } = + { _, NSt } = send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg, St), { ok, NSt }; @@ -507,26 +507,26 @@ queue_message(SvcName, %% We failed to setup a data link. Try the next route. %% [ Err, _Reason] -> - ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]), + ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]), queue_message(SvcName, TransID, - RemainingRoutes, + RemainingRoutes, Timeout, Parameters, - Signature, + Signature, St) end. - + %% Send messages to locally connected service send_message(local, _, _, _, Msg, St) -> - ?debug("sched:send_msg(local:~s). WIll send to local", - [ Msg#message.service]), + ?debug("sched:send_msg(local:~s). WIll send to local", + [ Msg#message.service]), - service_edge_rpc:handle_remote_message(St#st.cs, - Msg#message.service, + service_edge_rpc:handle_remote_message(St#st.cs, + Msg#message.service, Msg#message.timeout, Msg#message.parameters, Msg#message.signature), @@ -537,12 +537,13 @@ send_message(DataLinkMod, DataLinkOpts, ProtoMod, ProtoOpts, Msg, St) -> - ?debug("sched:send_msg(): ~p:~p:~p", - [ProtoMod, DataLinkMod, Msg#message.service]), + ?debug("sched:send_msg(): ~p:~p:~p", + [ProtoMod, DataLinkMod, Msg#message.service]), %% Send off message to the correct protocol module case ProtoMod:send_message( St#st.cs, + Msg#message.transaction_id, Msg#message.service, Msg#message.timeout, ProtoOpts, @@ -552,27 +553,27 @@ send_message(DataLinkMod, DataLinkOpts, Msg#message.signature) of %% Success - [ok] -> + [ok] -> %% Send the rest of the messages associated with this service/dl {ok, St}; - + %% Failed [Err] -> - ?info("sched:send_msg(): Failed: ~p:~p:~p -> ~p", + ?info("sched:send_msg(): Failed: ~p:~p:~p -> ~p", [ProtoMod, DataLinkMod, Msg#message.service, Err]), - + %% Requeue this message with the next route queue_message(Msg#message.service, Msg#message.transaction_id, Msg#message.routes, - Msg#message.timeout, + Msg#message.timeout, Msg#message.parameters, Msg#message.signature, St) end. %% The service is not available -send_queued_messages(#service { +send_queued_messages(#service { key = { SvcName, _ }, available = unavailable, messages_tid = _Tid } = _SvcRec, St) -> @@ -580,7 +581,7 @@ send_queued_messages(#service { ?info("sched:send(): SvcName: ~p: Not available", [SvcName]), { not_available, St }; -send_queued_messages(#service { +send_queued_messages(#service { key = { SvcName, DataLinkMod }, available = available, messages_tid = Tid } = SvcRec, St) -> @@ -593,8 +594,8 @@ send_queued_messages(#service { ?debug("sched:send(): Nothing to send"), { ok, St }; - yanked -> - ?info("sched:send(): Message was yanked while trying to send: ~p", + yanked -> + ?info("sched:send(): Message was yanked while trying to send: ~p", [SvcRec#service.key]), { ok, St}; @@ -617,7 +618,7 @@ send_queued_messages(#service { %% where messages are placed while waiting for a final timeout after %% all routes have failed. %% -%% If messages exist in the orphaned queue for SvcName, +%% If messages exist in the orphaned queue for SvcName, %% we will try to send them using the DataLink/Protocol combo %% provided on the command line. send_orphaned_messages(SvcName, local, St) -> @@ -634,7 +635,7 @@ send_orphaned_messages(SvcName, local, St) -> %% the given data link %% Start chugging out messages SvcRec -> - send_orphaned_messages_(undefined, undefined, + send_orphaned_messages_(undefined, undefined, local, undefined, SvcRec, St) end; @@ -643,7 +644,7 @@ send_orphaned_messages(SvcName, local, St) -> %% are placed while waiting for a final timeout after all routes %% have failed. %% -%% If messages exist in the orphaned queue for SvcName, +%% If messages exist in the orphaned queue for SvcName, %% we will try to send them using the DataLink/Protocol combo %% provided on the command line. send_orphaned_messages(SvcName, DataLinkMod, St) -> @@ -658,26 +659,26 @@ send_orphaned_messages(SvcName, DataLinkMod, St) -> %% We have messages waiting for the service, but no protocol has been configured %% for transmitting them over the given data link module - { _, [] } -> + { _, [] } -> ?debug("sched:send_orph(~p:~p): No protocol configured. Skipped", [DataLinkMod, SvcName]), St; - + %% We have orphaned messages for the service and %% we have at least one protocol that we can use over %% the given data link %% Start chugging out messages { SvcRec, [{ Proto, ProtoOpts, DataLinkOpts } | _]} -> - send_orphaned_messages_(Proto, ProtoOpts, + send_orphaned_messages_(Proto, ProtoOpts, DataLinkMod, DataLinkOpts, SvcRec, St) end. - - + + send_orphaned_messages_(Protocol, ProtocolOpts, DataLinkMod, DataLinkOpts, - #service { + #service { key = { SvcName, _ }, messages_tid = Tid } = SvcRec, St) -> @@ -685,15 +686,15 @@ send_orphaned_messages_(Protocol, ProtocolOpts, %% Extract the first message of the queue. case first_service_message(SvcRec) of empty -> - ?debug("sched:send_orph(~p:~p): Nothing to send", + ?debug("sched:send_orph(~p:~p): Nothing to send", [DataLinkMod, SvcName ]), St; - yanked -> - ?info("sched:send_orph(~p:~p): Message was yanked while processing", + yanked -> + ?info("sched:send_orph(~p:~p): Message was yanked while processing", [DataLinkMod, SvcName ]), send_orphaned_messages_(DataLinkMod, DataLinkOpts, - Protocol, ProtocolOpts, + Protocol, ProtocolOpts, SvcRec, St); Msg-> @@ -702,16 +703,16 @@ send_orphaned_messages_(Protocol, ProtocolOpts, erlang:cancel_timer(Msg#message.timeout_tref), ?debug("sched:send_orph(~p:~p): Sending Trans(~p) Pr(~p) PrOp(~p) DlOp(~p)", - [DataLinkMod, SvcName, - Msg#message.transaction_id, + [DataLinkMod, SvcName, + Msg#message.transaction_id, Protocol, ProtocolOpts, DataLinkOpts]), { _, NSt} = send_message( DataLinkMod, DataLinkOpts, - Protocol, ProtocolOpts, + Protocol, ProtocolOpts, Msg, St), send_orphaned_messages_(DataLinkMod, DataLinkOpts, - Protocol, ProtocolOpts, + Protocol, ProtocolOpts, SvcRec, NSt) end. @@ -736,7 +737,7 @@ find_or_create_service(SvcName, DataLinkMod, St) -> ?debug("sched:find_or_create_service(): Creating new ~p", [ SvcName]), update_service(SvcName, DataLinkMod, unavailable, St); - SvcRec -> + SvcRec -> %% Update the network address, if it differs, and return %% the new service / State as {ok, NSvcRec, false, NSt} ?debug("sched:find_or_create_service(): Updating existing ~p", [ SvcName]), @@ -747,22 +748,22 @@ find_or_create_service(SvcName, DataLinkMod, St) -> %% Create a new service. %% Warning: Will overwrite existing service (and its message table reference). -%% -update_service(SvcName, DataLinkMod, Available, +%% +update_service(SvcName, DataLinkMod, Available, #st { services_tid = SvcsTid, cs = CS }) -> - MsgTID = + MsgTID = case ets:lookup(SvcsTid, { SvcName, DataLinkMod }) of [] -> %% The given service does not exist, create a new message TID - ?debug("sched:update_service(~p:~p): ~p - Creating new", + ?debug("sched:update_service(~p:~p): ~p - Creating new", [ DataLinkMod, SvcName, Available]), - ets:new(rvi_messages, - [ ordered_set, private, + ets:new(rvi_messages, + [ ordered_set, private, { keypos, #message.transaction_id } ]); - [ TmpSvcRec ] -> + [ TmpSvcRec ] -> %% Grab the existing messagae table ID - ?debug("sched:update_service(~p:~p): ~p - Updating existing", + ?debug("sched:update_service(~p:~p): ~p - Updating existing", [ DataLinkMod, SvcName, Available]), #service { messages_tid = TID } = TmpSvcRec, @@ -771,15 +772,15 @@ update_service(SvcName, DataLinkMod, Available, %% Insert new service to ets table. - SvcRec = #service { + SvcRec = #service { key = { SvcName, DataLinkMod }, - available = Available, + available = Available, messages_tid = MsgTID, cs = CS }, ets:insert(SvcsTid, SvcRec), - SvcRec. + SvcRec. @@ -797,7 +798,7 @@ create_transaction_id(St) -> calc_relative_tout(UnixTimeMS) -> { Mega, Sec, Micro } = now(), Now = Mega * 1000000000 + Sec * 1000 + trunc(Micro / 1000) , - ?debug("sched:calc_relative_tout(): TimeoutUnixMS(~p) - Now(~p) = ~p", + ?debug("sched:calc_relative_tout(): TimeoutUnixMS(~p) - Now(~p) = ~p", [ UnixTimeMS, Now, UnixTimeMS - Now ]), @@ -831,7 +832,7 @@ delete_unused_service(SvcTid, SvcRec) -> %% Update the network address, if it differs, and return %% the new service / State as {ok, NSvcRec, false, NSt} - ?debug("sched:service_unavailable(): Service ~p now has no address.", + ?debug("sched:service_unavailable(): Service ~p now has no address.", [ SvcRec#service.key ]), true; @@ -842,14 +843,14 @@ first_service_message(#service { messages_tid = Tid }) -> case ets:first(Tid) of '$end_of_table' -> empty; - + Key -> case ets:lookup(Tid, Key) of [ Msg ] -> Msg; [] -> yanked end end. - + %% A timeout of -1 means 'does not apply' %% @@ -868,4 +869,3 @@ select_timeout(TimeOut1, TimeOut2) -> { false, false } -> min(TimeOut1, TimeOut2) end. - |