diff options
author | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-03-18 20:02:21 -0700 |
---|---|---|
committer | Magnus Feuer <mfeuer@jaguarlandrover.com> | 2015-03-18 20:02:21 -0700 |
commit | d0bb358a78076972e1aaeaa769c93486b5f83110 (patch) | |
tree | 371101db4fd211a14c236e7a2c9757dbf6eb1486 /components/schedule | |
parent | 0a24e9c82235bdfb64ce03382e0073af63aba719 (diff) | |
download | rvi_core-d0bb358a78076972e1aaeaa769c93486b5f83110.tar.gz |
Rewrite to adapt to gen_server model
Diffstat (limited to 'components/schedule')
-rw-r--r-- | components/schedule/src/rvi_schedule.erl | 4 | ||||
-rw-r--r-- | components/schedule/src/schedule.app.src | 2 | ||||
-rw-r--r-- | components/schedule/src/schedule.erl | 644 | ||||
-rw-r--r-- | components/schedule/src/schedule_app.erl | 6 | ||||
-rw-r--r-- | components/schedule/src/schedule_rpc.erl | 654 | ||||
-rw-r--r-- | components/schedule/src/schedule_sup.erl | 2 |
6 files changed, 650 insertions, 662 deletions
diff --git a/components/schedule/src/rvi_schedule.erl b/components/schedule/src/rvi_schedule.erl index c5ae66b..5d039df 100644 --- a/components/schedule/src/rvi_schedule.erl +++ b/components/schedule/src/rvi_schedule.erl @@ -16,9 +16,9 @@ Certificate :: string()) -> Result::tuple(). --callback register_remote_service(NetworkAddress :: string(), +-callback register_remote_services(NetworkAddress :: string(), AvailableServices :: string()) -> Result::tuple(). --callback unregister_remote_service(ServiceNames :: string()) -> Result::tuple(). +-callback unregister_remote_services(ServiceNames :: string()) -> Result::tuple(). diff --git a/components/schedule/src/schedule.app.src b/components/schedule/src/schedule.app.src index 810f140..cf31f0d 100644 --- a/components/schedule/src/schedule.app.src +++ b/components/schedule/src/schedule.app.src @@ -19,5 +19,5 @@ rvi_common ]}, {mod, { schedule_app, []}}, - {start_phases, [{init, []}]} + {start_phases, [{json_rpc, []}]} ]}. diff --git a/components/schedule/src/schedule.erl b/components/schedule/src/schedule.erl deleted file mode 100644 index 558c4e7..0000000 --- a/components/schedule/src/schedule.erl +++ /dev/null @@ -1,644 +0,0 @@ -%% -%% Copyright (C) 2014, Jaguar Land Rover -%% -%% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the -%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ -%% - --module(schedule). - - --behaviour(gen_server). --behaviour(rvi_schedule). --include_lib("lager/include/log.hrl"). - -%% API --export([start_link/0]). --export([schedule_message/6, - register_remote_services/2, - unregister_remote_services/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% Message structure and storage -%% A message is a piece of -%% -%% Service -> ETS -> Messages --record(service, { - name = "", %% Fully qualified service name. - %% Set to { IP, Port} when service is available, else unknown_network_address. - network_address = unknown_network_address, - - %% Table containing #message records, - %% indexed by their transaction ID (and sequence of delivery) - messages_tid = undefined - }). - - -% A single message to be delivered to a service. -% Messages are stored in ets tables hosted by a service - --record(message, { - transaction_id, %% Transaction ID that message is tagged with. - timeout, %% Timeout, UTC - timeout_tref, %% Reference to erlang timer associated with this message. - timeout_cb, %% Callback to invoke when timeout occurs. - parameters, - signature, - certificate - }). - - --record(st, { - next_transaction_id = 1, %% Sequentially incremented transaction id. - services_tid = undefined %% Known services. - }). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} -%% @end -%%-------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, St} | -%% {ok, St, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- -init([]) -> - %% Setup the relevant ets tables. - {ok, #st{ services_tid = ets:new(rvi_schedule_services, - [ set, private, - { keypos, #service.name } ])}}. - - -schedule_message(SvcName, - Timeout, - Callback, - Parameters, - Signature, - Certificate) -> - - gen_server:call(?SERVER, { - schedule_message, - SvcName, - Timeout, - Callback, - Parameters, - Signature, - Certificate - }). - -register_remote_services(NetworkAddress, AvailableServices) -> - gen_server:call(?SERVER, { register_remote_services, NetworkAddress, AvailableServices }). - -unregister_remote_services(ServiceNames) -> - gen_server:call(?SERVER, { unregister_remote_services, ServiceNames }). - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @spec handle_call(Request, From, St) -> -%% {reply, Reply, St} | -%% {reply, Reply, St, Timeout} | -%% {noreply, St} | -%% {noreply, St, Timeout} | -%% {stop, Reason, Reply, St} | -%% {stop, Reason, St} -%% @end -%%-------------------------------------------------------------------- -handle_call( { schedule_message, - SvcName, - Timeout, - Callback, - Parameters, - Signature, - Certificate - }, _From, St) -> - - ?debug("schedule:sched_msg(): service_name: ~p", [SvcName]), - ?debug("schedule:sched_msg(): timeout: ~p", [Timeout]), - ?debug("schedule:sched_msg(): callback: ~p", [Callback]), - ?debug("schedule:sched_msg(): parameters: ~p", [Parameters]), - ?debug("schedule:sched_msg(): signature: ~p", [Signature]), - ?debug("schedule:sched_msg(): certificate: ~p", [Certificate]), - ?debug("schedule:sched_msg(): St: ~p", [St]), - - - { ok, TransID, NSt } = queue_message(SvcName, - Timeout, - Callback, - Parameters, - Signature, - Certificate, - St), - - { reply, {ok, TransID}, NSt }; - -handle_call({rvi_call, schedule_message, Args}, From, St) -> - {_, SvcName} = lists:keyfind(service_name, 1, Args), - {_, Timeout} = lists:keyfind(timeout, 1, Args), - {_, Parameters} = lists:keyfind(parameters, 1, Args), - {_, Signature} = lists:keyfind(signature, 1, Args), - {_, Certificate} = lists:keyfind(certificate, 1, Args), - { reply, { ok, TransID}, NSt} = - handle_call( { schedule_message, - SvcName, - Timeout, - no_callback, - Parameters, - Signature, - Certificate - }, From, St), - { reply, { ok, [ - { status, rvi_common:json_rpc_status(ok)}, - { transaction_id, TransID }]}, NSt}; - - - - -handle_call( {register_remote_services, NetworkAddress, AvailableServices}, _From, St) -> - ?info("schedule:register_remote_services(): services(~p) -> ~p", [AvailableServices, NetworkAddress]), - {ok, NSt} = multiple_services_available(AvailableServices, NetworkAddress, St), - {reply, ok, NSt}; - - -handle_call({rvi_call, register_remote_services, Args}, From, St) -> - {_, NetworkAddress } = lists:keyfind(network_address, 1, Args), - {_, AvailableServices } = lists:keyfind(services, 1, Args), - {reply, ok, NSt } = handle_call({ register_remote_services, - NetworkAddress, - AvailableServices}, From, St), - - { reply, { ok, [{ status, rvi_common:json_rpc_status(ok) }] } , NSt}; - -handle_call( {unregister_remote_services, ServiceNames}, _From, St) -> - ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]), - {ok, NSt} = multiple_services_unavailable(ServiceNames, St), - {reply, ok, NSt }; - -handle_call({rvi_call, unregister_remote_services, Args}, From, St) -> - {_, Services } = lists:keyfind(services, 1, Args), - {reply, ok, NSt} = handle_call({ unregister_remote_services, Services}, From, St), - { reply, { ok, [{ status, rvi_common:json_rpc_status(ok) } ]}, NSt}; - - - - -handle_call(_Request, _From, St) -> - Reply = ok, - {reply, Reply, St}. - - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @spec handle_cast(Msg, St) -> {noreply, St} | -%% {noreply, St, Timeout} | -%% {stop, Reason, St} -%% @end -%%-------------------------------------------------------------------- - -handle_cast(_Msg, St) -> - {noreply, St}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, St) -> {noreply, St} | -%% {noreply, St, Timeout} | -%% {stop, Reason, St} -%% @end -%%-------------------------------------------------------------------- - -%% Handle timeouts -handle_info({ rvi_message_timeout, SvcName, TransID}, #st { services_tid = SvcTid } = St) -> - - case ets:lookup(SvcTid, SvcName) of - [ Svc ] -> - %% Delete from ets. - case ets:lookup(Svc#service.messages_tid, TransID) of - [ Msg ] -> - ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]), - do_timeout_callback(Svc, Msg), - ets:delete(Svc#service.messages_tid, TransID); - _ -> - ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), - ok - end; - _-> ok - end, - {noreply, St}; - - -handle_info(_Info, St) -> - {noreply, St}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, St) -> void() -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _St) -> - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process st when code is changed -%% -%% @spec code_change(OldVsn, St, Extra) -> {ok, NewSt} -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, St, _Extra) -> - {ok, St}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - - -%% Queue a message for transmission once a service become available. -%% If the service does not exist, it will be created. -%% Once the message is queued, an attempt will be made to send all -%% messages queued in the given service. This attempt will fail -%% pretty quickly if the service's network address is set to -%% 'unavailable' -queue_message(SvcName, - Timeout, - Callback, - Parameters, - Signature, - Certificate, St) -> -%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]), -%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]), -%% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]), -%% ?info("schedule:sched_msg(): signature: ~p", [Signature]), -%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), - - %% Create a new transaction ID. - {NewTransID, NSt1} = create_transaction_id(St), - - %% Create a timer to handle message timeout - TRef = erlang:send_after(calculate_timeout_period(Timeout), self(), - { rvi_message_timeout, SvcName, NewTransID }), - - %% Build the message record. - Msg = #message { - transaction_id = NewTransID, - timeout = Timeout, - timeout_tref = TRef, - timeout_cb = Callback, - parameters = Parameters, - signature = Signature, - certificate = Certificate - }, - - %% Find or create the service - - %% If this is a message to a (yet) unknown service, we will create - %% it to act as a placeholder for its messages until they time out - %% or the service becomes available. - %% - %% If the service does exist, we queue it and try to send it. - %% - {ok, Service, IsNewService, NSt2 } = find_or_create_service(SvcName, NSt1), - - %% If the service was created by the call above, - %% We will try to bring up a data link to it. - case IsNewService of - true -> - bring_up_data_link(SvcName); - - false -> - ok - end, - - %% Add to ets table - ets:insert(Service#service.messages_tid, Msg), - - %% Attempt to send the message - {_, NSt3 } = try_sending_messages(Service, NSt2), - - %% Return - { ok, NewTransID, NSt3}. - - -%% Check if we can send messages queued up under the given service. -try_sending_messages(#service { - name = SvcName, - network_address = unknown_network_address, - messages_tid = _Tid } = _Service, St) -> - ?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]), - {not_available, St}; - - -try_sending_messages(#service { - name = SvcName, - network_address = NetworkAddress, - messages_tid = Tid } = Service, St) -> - - ?debug("schedule:try_send(): SvcName: ~p", [SvcName]), - ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]), - - %% Extract the first message of the queue. - case ets:first(Tid) of - %% No more messages to send. - '$end_of_table' -> - ?debug("schedule:try_send(): Nothing to send"), - { ok, St}; - - Key -> - ?debug("schedule:try_send(): Sending: ~p", [Key]), - %% Extract first message and try to send it - case ets:lookup(Tid, Key) of - [ Msg ] -> - case send_message(NetworkAddress, - SvcName, - Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature, - Msg#message.certificate) of - ok -> - %% Send successful - delete entry. - ets:delete(Tid, Key), - - %% Delete timeout ref - erlang:cancel_timer(Msg#message.timeout_tref), - - %% Send the rest. - try_sending_messages(Service, St); - - Err -> - ?info("schedule:try_send(): No send: ~p -> ~p : ~p", [SvcName, NetworkAddress, Err]), - %% Failed to send message, leave in queue and err out. - { {send_failed, Err}, St} - end; - _ -> - ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]), - { ok, St} - - end - end. - - -%% Mark a service as available, as reported by data_link_up -%% If the service does not exist, it will be created as a host -%% for future messages destined to that service. -%% -%% If the service does exist, its network address will be upadted -%% with whatever address was reported to this function. -%% -%% Once the service has been created or updated, we will attempt -%% to send any messages queued inside it. -%% -service_available(SvcName, NetworkAddress, St) -> - ?info("schedule:service_available(): service(~p) -> NetworkAddress(~p)", [ SvcName, NetworkAddress ]), - - %% Find or create the service. - {ok, Svc, _, NSt1} = find_or_create_service(SvcName, NetworkAddress, St), - - try_sending_messages(Svc, NSt1). - -service_unavailable(SvcName, #st { services_tid = SvcTid } = St) -> - ?info("schedule:service_unavailable(): Service(~p)", [ SvcName ]), - ets:delete(SvcTid, SvcName), - { ok, St }. - - -bring_up_data_link(SvcName) -> - %% Resolve the service to a network address that we can - %% use to bring up the data link - case rvi_common:send_component_request(service_discovery, - resolve_remote_service, - [ {service, SvcName} ], - [ network_address ]) of - - { ok, ok, [ NetworkAddress] } -> - %% Tell data link to bring up a communicationc hannel. - case rvi_common:send_component_request(data_link, setup_data_link, - [ - {service, SvcName}, - {network_address, NetworkAddress} - ]) of - {ok, already_connected } -> - already_connected; - Que -> - ?info("schedule:bring_up_data_link() Failed:~p.", [Que]), - ok - end; - - {ok, not_found, _ } -> - ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p." - " Service not found.", - [ SvcName ]), - not_found; - - Err -> - ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p: ~p", - [ SvcName, Err ]), - err - end. - -send_message(NetworkAddress, SvcName, Timeout, - Parameters, Signature, Certificate) -> - - ?info("schedule:send_message(): service(~p) -> addr(~p) ", [SvcName, NetworkAddress]), -%% ?info("schedule:send_message(): network_address: ~p", [NetworkAddress]), -%% ?info("schedule:send_message(): timeout: ~p", [Timeout]), -%% ?info("schedule:send_message(): parameters: ~p", [Parameters]), -%% ?info("schedule:send_message(): signature: ~p", [Signature]), -%% ?info("schedule:send_message(): certificate: ~p", [Certificate]), - - case rvi_common:send_component_request( - protocol, send_message, - [ - { service_name, SvcName }, - { timeout, Timeout}, - { network_address, NetworkAddress}, - { parameters, Parameters}, - { signature, Signature}, - { certificate, Certificate } - ]) of - { ok, _ } -> ok; - Err -> Err - end. - - - -%% -%% data_link_up has reported that multiple services are now -%% available at NetworkAddress. -%% Iterate through all services and mark them as available, -%% possibly sending any pending message targeting the newly activated -%% service. -%% -multiple_services_available([], _NetworkAddress, St) -> - {ok, St}; - -multiple_services_available([ Svc | T], NetworkAddress, St) -> - {ok, NSt} = service_available(Svc, NetworkAddress, St), - multiple_services_available(T, NetworkAddress, NSt). - -multiple_services_unavailable([], St) -> - {ok, St}; - -multiple_services_unavailable([ SvcName | T], St) -> - {ok, NSt} = service_unavailable(SvcName, St), - multiple_services_unavailable(T, NSt). - - -find_or_create_service(ServiceName, St) -> - %% Invoke with retain to keep any exising network addresses already - %% in place in an existing service. - find_or_create_service(ServiceName, retain_existing_address, St). - -find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid } = St) -> - ?debug("schedule:find_or_create_service(): SvcName: ~p", [ ServiceName]), - - case ets:lookup(SvcTid, ServiceName) of - [] -> %% The given service does not exist, create it. - ?debug("schedule:find_or_create_service(): Creating new ~p", [ ServiceName]), - create_service(ServiceName, NetworkAddress, St); - - [ Svc1 ] when NetworkAddress =:= retain_existing_address -> - %% We found a service, and are instructed not to touch - %% its network address. Return it. - { ok, Svc1, false, St}; - - [ Svc2 ] -> - %% Update the network address, if it differs, and return - %% the new service / State as {ok, NSvc, false, NSt} - ?debug("schedule:find_or_create_service(): Updating existing ~p", [ ServiceName]), - update_service_network_address(Svc2, NetworkAddress, St) - end. - - -%% -%% Catch where no network address update is necessary. -%% -update_service_network_address(#service { network_address = NetworkAddress } = Service, - NetworkAddress, St) -> - { ok, Service, false, St}; %% False indicates that the service exists. - - -%% -%% Update the service in the ets table with a new network address. -%% -update_service_network_address(#service {} = Service, NetworkAddress, St) -> - %% Create a new service. - NewService = Service#service { network_address = NetworkAddress}, - - %% Replace existing serviceo in the ets table of services. - ets:insert(St#st.services_tid, [ NewService ]), - - { ok, NewService, false, St}. %% False indicates that the service exists. - - - - -%% If we are creating a new service, we need to remap the -%% retain_existing_address to unknown_network_address in order to get the correct -%% initial state, which is a created service with an unknown network address. -create_service(ServiceName, retain_existing_address, St) -> - create_service(ServiceName, unknown_network_address, St); - -%% Create a new service and return the new state (not currently modified) -%% and the newly initialized service revord. -%% -create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) -> - Svc = #service { - name = ServiceName, - network_address = NetworkAddress, - messages_tid = ets:new(rvi_messages, - [ ordered_set, private, - { keypos, #message.transaction_id } ]) - }, - - %% Insert new service to ets table. - ets:insert(SvcsTid, Svc), - - %% Return new service and existing state. - ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]), - ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]), - { ok, Svc, true, St}. %% True indicates that the service is newly created. - - -%% Create a new and unique transaction id -create_transaction_id(St) -> - ?debug("schedule:create_transaction_id(): St: ~p", [ St ]), - ID = St#st.next_transaction_id, - - %% FIXME: Maybe interate pid into transaction to handle multiple - %% schedulers? - { ID, St#st { next_transaction_id = ID + 1 }}. - -%% Calculate a relative timeout based on the UTC TS we are provided with. -calculate_timeout_period(UTC) -> - { Mega, Sec, _Micro } = now(), - Now = Mega * 1000000 + Sec, - ?debug("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p", [ UTC, Now, UTC - Now ]), - - %% Cap the timeout value at something reasonable - TOut = - case UTC - Now >= 4294967295 of - true -> - ?info("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p: Truncated to 4294967295", [ UTC, Now, UTC - Now ]), - 4294967295; - - false -> UTC - Now - end, - - case TOut =< 0 of - true -> - 1; %% One millisec is the smallest value we will time out on - - false -> - TOut * 1000 - end. - -%% Handle a callback for a timed out message. - -do_timeout_callback(Service, #message { timeout_cb = { M, F, A}, - transaction_id = TransID}) -> - M:F(Service, TransID, A); - -%% callback element of #message is not an {M,F,A} format, ignore. -do_timeout_callback(_,_) -> - ok. - diff --git a/components/schedule/src/schedule_app.erl b/components/schedule/src/schedule_app.erl index 664ebe8..aa20fa7 100644 --- a/components/schedule/src/schedule_app.erl +++ b/components/schedule/src/schedule_app.erl @@ -25,8 +25,12 @@ start(_StartType, _StartArgs) -> start_phase(init, _, _) -> schedule_rpc:init(), - ok. + ok; + +start_phase(json_rpc, _, _) -> + schedule_rpc:start_json_server(), + ok. stop(_State) -> ok. diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index 2f2a30d..09ca2a4 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -10,23 +10,650 @@ -module(schedule_rpc). +-behaviour(gen_server). +-behaviour(rvi_schedule). + +-include_lib("lager/include/log.hrl"). + +%% API +-export([start_link/0]). +-export([schedule_message/6, + register_remote_services/2, + unregister_remote_services/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([start_json_server/0]). +-define(SERVER, ?MODULE). -export([handle_rpc/2]). --export([init/0]). --include_lib("lager/include/log.hrl"). -init() -> - case rvi_common:get_component_config(schedule, exo_http_opts) of - { ok, ExoHttpOpts } -> - exoport_exo_http:instance(schedule_sup, - schedule_rpc, - ExoHttpOpts); +%% Message structure and storage +%% A message is a piece of +%% +%% Service -> ETS -> Messages +-record(service, { + name = "", %% Fully qualified service name. + %% Set to { IP, Port} when service is available, else unknown_network_address. + network_address = unknown_network_address, + + %% Table containing #message records, + %% indexed by their transaction ID (and sequence of delivery) + messages_tid = undefined + }). + + +% A single message to be delivered to a service. +% Messages are stored in ets tables hosted by a service + +-record(message, { + transaction_id, %% Transaction ID that message is tagged with. + timeout, %% Timeout, UTC + timeout_tref, %% Reference to erlang timer associated with this message. + timeout_cb, %% Callback to invoke when timeout occurs. + parameters, + signature, + certificate + }). + + +-record(st, { + next_transaction_id = 1, %% Sequentially incremented transaction id. + services_tid = undefined %% Known services. + }). + + + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, St} | +%% {ok, St, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + %% Setup the relevant ets tables. + {ok, #st{ services_tid = ets:new(rvi_schedule_services, + [ set, private, + { keypos, #service.name } ])}}. + +start_json_server() -> + rvi_common:start_json_rpc_server(schedule, ?MODULE, schedule_sup). + +schedule_message(SvcName, + Timeout, + Callback, + Parameters, + Signature, + Certificate) -> + + gen_server:call(?SERVER, { + schedule_message, + SvcName, + Timeout, + Callback, + Parameters, + Signature, + Certificate + }). + +register_remote_services(NetworkAddress, AvailableServices) -> + gen_server:call(?SERVER, { register_remote_services, NetworkAddress, AvailableServices }). + +unregister_remote_services(ServiceNames) -> + gen_server:call(?SERVER, { unregister_remote_services, ServiceNames }). + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, St) -> +%% {reply, Reply, St} | +%% {reply, Reply, St, Timeout} | +%% {noreply, St} | +%% {noreply, St, Timeout} | +%% {stop, Reason, Reply, St} | +%% {stop, Reason, St} +%% @end +%%-------------------------------------------------------------------- +handle_call( { schedule_message, + SvcName, + Timeout, + Callback, + Parameters, + Signature, + Certificate + }, _From, St) -> + + ?debug("schedule:sched_msg(): service_name: ~p", [SvcName]), + ?debug("schedule:sched_msg(): timeout: ~p", [Timeout]), + ?debug("schedule:sched_msg(): callback: ~p", [Callback]), + ?debug("schedule:sched_msg(): parameters: ~p", [Parameters]), + ?debug("schedule:sched_msg(): signature: ~p", [Signature]), + ?debug("schedule:sched_msg(): certificate: ~p", [Certificate]), + ?debug("schedule:sched_msg(): St: ~p", [St]), + + + { ok, TransID, NSt } = queue_message(SvcName, + Timeout, + Callback, + Parameters, + Signature, + Certificate, + St), + + { reply, {ok, TransID}, NSt }; + +handle_call({rvi_call, schedule_message, Args}, From, St) -> + {_, SvcName} = lists:keyfind(service_name, 1, Args), + {_, Timeout} = lists:keyfind(timeout, 1, Args), + {_, Parameters} = lists:keyfind(parameters, 1, Args), + {_, Signature} = lists:keyfind(signature, 1, Args), + {_, Certificate} = lists:keyfind(certificate, 1, Args), + { reply, { ok, TransID}, NSt} = + handle_call( { schedule_message, + SvcName, + Timeout, + no_callback, + Parameters, + Signature, + Certificate + }, From, St), + { reply, { ok, [ + { status, rvi_common:json_rpc_status(ok)}, + { transaction_id, TransID }]}, NSt}; + + + + +handle_call( {register_remote_services, NetworkAddress, AvailableServices}, _From, St) -> + ?info("schedule:register_remote_services(): services(~p) -> ~p", [AvailableServices, NetworkAddress]), + {ok, NSt} = multiple_services_available(AvailableServices, NetworkAddress, St), + {reply, ok, NSt}; + + +handle_call({rvi_call, register_remote_services, Args}, From, St) -> + {_, NetworkAddress } = lists:keyfind(network_address, 1, Args), + {_, AvailableServices } = lists:keyfind(services, 1, Args), + {reply, ok, NSt } = handle_call({ register_remote_services, + NetworkAddress, + AvailableServices}, From, St), + + { reply, { ok, [{ status, rvi_common:json_rpc_status(ok) }] } , NSt}; + +handle_call( {unregister_remote_services, ServiceNames}, _From, St) -> + ?info("schedule:unregister_remote_services(): Services(~p)", [ServiceNames]), + {ok, NSt} = multiple_services_unavailable(ServiceNames, St), + {reply, ok, NSt }; + +handle_call({rvi_call, unregister_remote_services, Args}, From, St) -> + {_, Services } = lists:keyfind(services, 1, Args), + {reply, ok, NSt} = handle_call({ unregister_remote_services, Services}, From, St), + { reply, { ok, [{ status, rvi_common:json_rpc_status(ok) } ]}, NSt}; + + + + +handle_call(_Request, _From, St) -> + Reply = ok, + {reply, Reply, St}. + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, St) -> {noreply, St} | +%% {noreply, St, Timeout} | +%% {stop, Reason, St} +%% @end +%%-------------------------------------------------------------------- + +handle_cast(_Msg, St) -> + {noreply, St}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, St) -> {noreply, St} | +%% {noreply, St, Timeout} | +%% {stop, Reason, St} +%% @end +%%-------------------------------------------------------------------- + +%% Handle timeouts +handle_info({ rvi_message_timeout, SvcName, TransID}, #st { services_tid = SvcTid } = St) -> + + case ets:lookup(SvcTid, SvcName) of + [ Svc ] -> + %% Delete from ets. + case ets:lookup(Svc#service.messages_tid, TransID) of + [ Msg ] -> + ?info("schedule:timeout(): trans_id(~p) service(~p)", [ TransID, SvcName]), + do_timeout_callback(Svc, Msg), + ets:delete(Svc#service.messages_tid, TransID); + _ -> + ?info("schedule:timeout(): trans_id(~p) service(~p): Yanked while processing", [ TransID, SvcName]), + ok + end; + _-> ok + end, + {noreply, St}; + + +handle_info(_Info, St) -> + {noreply, St}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, St) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _St) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process st when code is changed +%% +%% @spec code_change(OldVsn, St, Extra) -> {ok, NewSt} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + + +%% Queue a message for transmission once a service become available. +%% If the service does not exist, it will be created. +%% Once the message is queued, an attempt will be made to send all +%% messages queued in the given service. This attempt will fail +%% pretty quickly if the service's network address is set to +%% 'unavailable' +queue_message(SvcName, + Timeout, + Callback, + Parameters, + Signature, + Certificate, St) -> +%% ?info("schedule:sched_msg(): service(~p) timeout(~p)", [SvcName, Timeout]), +%% ?info("schedule:sched_msg(): timeout (~p)", [Timeout]), +%% ?info("schedule:sched_msg(): parameters: ~p", [Parameters]), +%% ?info("schedule:sched_msg(): signature: ~p", [Signature]), +%% ?info("schedule:sched_msg(): certificate: ~p", [Certificate]), + + %% Create a new transaction ID. + {NewTransID, NSt1} = create_transaction_id(St), + + %% Create a timer to handle message timeout + TRef = erlang:send_after(calculate_timeout_period(Timeout), self(), + { rvi_message_timeout, SvcName, NewTransID }), + + %% Build the message record. + Msg = #message { + transaction_id = NewTransID, + timeout = Timeout, + timeout_tref = TRef, + timeout_cb = Callback, + parameters = Parameters, + signature = Signature, + certificate = Certificate + }, + + %% Find or create the service + + %% If this is a message to a (yet) unknown service, we will create + %% it to act as a placeholder for its messages until they time out + %% or the service becomes available. + %% + %% If the service does exist, we queue it and try to send it. + %% + {ok, Service, IsNewService, NSt2 } = find_or_create_service(SvcName, NSt1), + + %% If the service was created by the call above, + %% We will try to bring up a data link to it. + case IsNewService of + true -> + bring_up_data_link(SvcName); + + false -> + ok + end, + + %% Add to ets table + ets:insert(Service#service.messages_tid, Msg), + + %% Attempt to send the message + {_, NSt3 } = try_sending_messages(Service, NSt2), + + %% Return + { ok, NewTransID, NSt3}. + + +%% Check if we can send messages queued up under the given service. +try_sending_messages(#service { + name = SvcName, + network_address = unknown_network_address, + messages_tid = _Tid } = _Service, St) -> + ?info("schedule:try_send(): SvcName: ~p: Not available", [SvcName]), + {not_available, St}; + + +try_sending_messages(#service { + name = SvcName, + network_address = NetworkAddress, + messages_tid = Tid } = Service, St) -> + + ?debug("schedule:try_send(): SvcName: ~p", [SvcName]), + ?debug("schedule:try_send(): Network Address: ~p", [NetworkAddress]), + + %% Extract the first message of the queue. + case ets:first(Tid) of + %% No more messages to send. + '$end_of_table' -> + ?debug("schedule:try_send(): Nothing to send"), + { ok, St}; + + Key -> + ?debug("schedule:try_send(): Sending: ~p", [Key]), + %% Extract first message and try to send it + case ets:lookup(Tid, Key) of + [ Msg ] -> + case send_message(NetworkAddress, + SvcName, + Msg#message.timeout, + Msg#message.parameters, + Msg#message.signature, + Msg#message.certificate) of + ok -> + %% Send successful - delete entry. + ets:delete(Tid, Key), + + %% Delete timeout ref + erlang:cancel_timer(Msg#message.timeout_tref), + + %% Send the rest. + try_sending_messages(Service, St); + + Err -> + ?info("schedule:try_send(): No send: ~p -> ~p : ~p", [SvcName, NetworkAddress, Err]), + %% Failed to send message, leave in queue and err out. + { {send_failed, Err}, St} + end; + _ -> + ?info("schedule:try_send(): Message was yanked while trying to send: ~p", [Key]), + { ok, St} + + end + end. + + +%% Mark a service as available, as reported by data_link_up +%% If the service does not exist, it will be created as a host +%% for future messages destined to that service. +%% +%% If the service does exist, its network address will be upadted +%% with whatever address was reported to this function. +%% +%% Once the service has been created or updated, we will attempt +%% to send any messages queued inside it. +%% +service_available(SvcName, NetworkAddress, St) -> + ?info("schedule:service_available(): service(~p) -> NetworkAddress(~p)", [ SvcName, NetworkAddress ]), + + %% Find or create the service. + {ok, Svc, _, NSt1} = find_or_create_service(SvcName, NetworkAddress, St), + + try_sending_messages(Svc, NSt1). + +service_unavailable(SvcName, #st { services_tid = SvcTid } = St) -> + ?info("schedule:service_unavailable(): Service(~p)", [ SvcName ]), + ets:delete(SvcTid, SvcName), + { ok, St }. + + +bring_up_data_link(SvcName) -> + %% Resolve the service to a network address that we can + %% use to bring up the data link + case rvi_common:send_component_request(service_discovery, + resolve_remote_service, + [ {service, SvcName} ], + [ network_address ]) of + + { ok, ok, [ NetworkAddress] } -> + %% Tell data link to bring up a communicationc hannel. + case rvi_common:send_component_request(data_link, setup_data_link, + [ + {service, SvcName}, + {network_address, NetworkAddress} + ]) of + {ok, already_connected } -> + already_connected; + Que -> + ?info("schedule:bring_up_data_link() Failed:~p.", [Que]), + ok + end; + + {ok, not_found, _ } -> + ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p." + " Service not found.", + [ SvcName ]), + not_found; + + Err -> + ?info("schedule:bring_up_data_link() Failed to resolve remote Service: ~p: ~p", + [ SvcName, Err ]), + err + end. + +send_message(NetworkAddress, SvcName, Timeout, + Parameters, Signature, Certificate) -> + + ?info("schedule:send_message(): service(~p) -> addr(~p) ", [SvcName, NetworkAddress]), +%% ?info("schedule:send_message(): network_address: ~p", [NetworkAddress]), +%% ?info("schedule:send_message(): timeout: ~p", [Timeout]), +%% ?info("schedule:send_message(): parameters: ~p", [Parameters]), +%% ?info("schedule:send_message(): signature: ~p", [Signature]), +%% ?info("schedule:send_message(): certificate: ~p", [Certificate]), + + case rvi_common:send_component_request( + protocol, send_message, + [ + { service_name, SvcName }, + { timeout, Timeout}, + { network_address, NetworkAddress}, + { parameters, Parameters}, + { signature, Signature}, + { certificate, Certificate } + ]) of + { ok, _ } -> ok; Err -> Err + end. + + + +%% +%% data_link_up has reported that multiple services are now +%% available at NetworkAddress. +%% Iterate through all services and mark them as available, +%% possibly sending any pending message targeting the newly activated +%% service. +%% +multiple_services_available([], _NetworkAddress, St) -> + {ok, St}; + +multiple_services_available([ Svc | T], NetworkAddress, St) -> + {ok, NSt} = service_available(Svc, NetworkAddress, St), + multiple_services_available(T, NetworkAddress, NSt). + +multiple_services_unavailable([], St) -> + {ok, St}; + +multiple_services_unavailable([ SvcName | T], St) -> + {ok, NSt} = service_unavailable(SvcName, St), + multiple_services_unavailable(T, NSt). + + +find_or_create_service(ServiceName, St) -> + %% Invoke with retain to keep any exising network addresses already + %% in place in an existing service. + find_or_create_service(ServiceName, retain_existing_address, St). + +find_or_create_service(ServiceName, NetworkAddress, #st { services_tid = SvcTid } = St) -> + ?debug("schedule:find_or_create_service(): SvcName: ~p", [ ServiceName]), + + case ets:lookup(SvcTid, ServiceName) of + [] -> %% The given service does not exist, create it. + ?debug("schedule:find_or_create_service(): Creating new ~p", [ ServiceName]), + create_service(ServiceName, NetworkAddress, St); + + [ Svc1 ] when NetworkAddress =:= retain_existing_address -> + %% We found a service, and are instructed not to touch + %% its network address. Return it. + { ok, Svc1, false, St}; + + [ Svc2 ] -> + %% Update the network address, if it differs, and return + %% the new service / State as {ok, NSvc, false, NSt} + ?debug("schedule:find_or_create_service(): Updating existing ~p", [ ServiceName]), + update_service_network_address(Svc2, NetworkAddress, St) + end. + + +%% +%% Catch where no network address update is necessary. +%% +update_service_network_address(#service { network_address = NetworkAddress } = Service, + NetworkAddress, St) -> + { ok, Service, false, St}; %% False indicates that the service exists. + + +%% +%% Update the service in the ets table with a new network address. +%% +update_service_network_address(#service {} = Service, NetworkAddress, St) -> + %% Create a new service. + NewService = Service#service { network_address = NetworkAddress}, + + %% Replace existing serviceo in the ets table of services. + ets:insert(St#st.services_tid, [ NewService ]), + + { ok, NewService, false, St}. %% False indicates that the service exists. + + + + +%% If we are creating a new service, we need to remap the +%% retain_existing_address to unknown_network_address in order to get the correct +%% initial state, which is a created service with an unknown network address. +create_service(ServiceName, retain_existing_address, St) -> + create_service(ServiceName, unknown_network_address, St); + +%% Create a new service and return the new state (not currently modified) +%% and the newly initialized service revord. +%% +create_service(ServiceName, NetworkAddress, #st { services_tid = SvcsTid } = St) -> + Svc = #service { + name = ServiceName, + network_address = NetworkAddress, + messages_tid = ets:new(rvi_messages, + [ ordered_set, private, + { keypos, #message.transaction_id } ]) + }, + + %% Insert new service to ets table. + ets:insert(SvcsTid, Svc), + + %% Return new service and existing state. + ?debug("schedule:create_service(): service(~p) -> NetworkAddress(~p)", [ ServiceName, NetworkAddress]), + ?debug("schedule:create_service(): MessageTID: ~p", [ Svc#service.messages_tid]), + { ok, Svc, true, St}. %% True indicates that the service is newly created. + + +%% Create a new and unique transaction id +create_transaction_id(St) -> + ?debug("schedule:create_transaction_id(): St: ~p", [ St ]), + ID = St#st.next_transaction_id, + + %% FIXME: Maybe interate pid into transaction to handle multiple + %% schedulers? + { ID, St#st { next_transaction_id = ID + 1 }}. + +%% Calculate a relative timeout based on the UTC TS we are provided with. +calculate_timeout_period(UTC) -> + { Mega, Sec, _Micro } = now(), + Now = Mega * 1000000 + Sec, + ?debug("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p", [ UTC, Now, UTC - Now ]), + + %% Cap the timeout value at something reasonable + TOut = + case UTC - Now >= 4294967295 of + true -> + ?info("schedule:calculate_timeout_period(): Timeout(~p) - Now(~p) = ~p: Truncated to 4294967295", [ UTC, Now, UTC - Now ]), + 4294967295; + + false -> UTC - Now end, + + case TOut =< 0 of + true -> + 1; %% One millisec is the smallest value we will time out on + + false -> + TOut * 1000 + end. + +%% Handle a callback for a timed out message. + +do_timeout_callback(Service, #message { timeout_cb = { M, F, A}, + transaction_id = TransID}) -> + M:F(Service, TransID, A); + +%% callback element of #message is not an {M,F,A} format, ignore. +do_timeout_callback(_,_) -> ok. + %% JSON-RPC entry point %% CAlled by local exo http server handle_rpc("schedule_message", Args) -> @@ -49,7 +676,7 @@ handle_rpc("schedule_message", Args) -> Signature, Certificate), {ok, [ { status, rvi_common:json_rpc_status(ok)}, - { transaction_id, TransID } ] }. + { transaction_id, TransID } ] }; handle_rpc("register_remote_services", Args) -> @@ -57,14 +684,15 @@ handle_rpc("register_remote_services", Args) -> {ok, AvailableServices} = rvi_common:get_json_element(["services"], Args), ?debug("schedule_rpc:register_remote_services(): network_address: ~p", [ NetworkAddress]), ?debug("schedule_rpc:register_remote_services(): services: ~p", [ AvailableServices]), - schedule:register_remote_services(NetworkAddress, AvailableServices), - {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. + register_remote_services(NetworkAddress, AvailableServices), + {ok, [ { status, rvi_common:json_rpc_status(ok)}]}; handle_rpc("unregister_remote_services", Args) -> {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args), ?debug("schedule_rpc:unregister_remote_services(): services ~p", [ DiscountinuedServices]), - schedule:unregister_remote_services(DiscountinuedServices), - unregister_remote_services(DiscountinuedServices); + unregister_remote_services(DiscountinuedServices), + {ok, [ { status, rvi_common:json_rpc_status(ok)}]}; + handle_rpc(Other, _Args) -> ?debug("schedule_rpc:handle_rpc(~p): unknown", [ Other ]), diff --git a/components/schedule/src/schedule_sup.erl b/components/schedule/src/schedule_sup.erl index a6080e7..238d860 100644 --- a/components/schedule/src/schedule_sup.erl +++ b/components/schedule/src/schedule_sup.erl @@ -34,6 +34,6 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ - ?CHILD(schedule, worker) + ?CHILD(schedule_rpc, worker) ]} }. |