diff options
author | Magnus <mfeuer@jaguarlandrover.com> | 2014-08-19 16:43:55 -0700 |
---|---|---|
committer | Magnus <mfeuer@jaguarlandrover.com> | 2014-08-19 16:43:55 -0700 |
commit | 4f5fdd76a04ececf8d720fa2ac0a9d3eb312ee29 (patch) | |
tree | 47acde3b3353994fa596a29d3cb9ba9a487fade2 | |
parent | 5952b5c6b0d0350972ecaf1cda147eb3197999e4 (diff) | |
download | rvi_core-4f5fdd76a04ececf8d720fa2ac0a9d3eb312ee29.tar.gz |
Removed network addresses from scheduled messages in order to have them JIT resolved by service edge. Now sends all pending messages for a service when it becomes available
Signed-off-by: Magnus <mfeuer@jaguarlandrover.com>
-rw-r--r-- | src/schedule.erl | 245 | ||||
-rw-r--r-- | src/schedule_rpc.erl | 30 |
2 files changed, 116 insertions, 159 deletions
diff --git a/src/schedule.erl b/src/schedule.erl index 088e777..35db723 100644 --- a/src/schedule.erl +++ b/src/schedule.erl @@ -22,9 +22,9 @@ %% API -export([start_link/0]). --export([schedule_message/6, - data_link_up/2, - data_link_down/2]). +-export([schedule_message/5, + register_remote_services/2, + unregister_remote_services/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -43,8 +43,7 @@ }). -record(service, { - target = "", %% Target service, as reported by data_link_up - network_address = unavailable, %% Network address of target, as reported by data_link_up. + target = "", %% Target service, as reported by register_remote_services queue = undefined %% Queue of #messages awaiting target. }). @@ -86,7 +85,6 @@ init([]) -> schedule_message(Target, Timeout, - NetworkAddress, Parameters, Signature, Certificate) -> @@ -94,17 +92,16 @@ schedule_message(Target, schedule_message, Target, Timeout, - NetworkAddress, Parameters, Signature, Certificate }). -data_link_up(NetworkAddress, AvailableServices) -> - gen_server:cast(?SERVER, { data_link_up, NetworkAddress, AvailableServices }). +register_remote_services(NetworkAddress, AvailableServices) -> + gen_server:cast(?SERVER, { register_remote_services, NetworkAddress, AvailableServices }). -data_link_down(NetworkAddress, DiscontinuedServices) -> - gen_server:cast(?SERVER, { data_link_down, NetworkAddress, DiscontinuedServices }). +unregister_remote_services(NetworkAddress, DiscontinuedServices) -> + gen_server:cast(?SERVER, { unregister_remote_services, NetworkAddress, DiscontinuedServices }). %%-------------------------------------------------------------------- %% @private @@ -137,15 +134,13 @@ handle_call(_Request, _From, St) -> handle_cast( { schedule_message, Target, Timeout, - NetworkAddress, Parameters, Signature, Certificate }, St) -> ?debug(" schedule:sched_msg(): target: ~p", [Target]), ?debug(" schedule:sched_msg(): timeout: ~p", [Timeout]), - ?debug(" schedule:sched_msg(): network_address: ~p", [NetworkAddress]), - ?debug(" schedule:sched_msg(): parameters: ~p", [Parameters]), + ?debug(" schedule:sched_msg(): parameters: ~p", [Parameters]), ?debug(" schedule:sched_msg(): signature: ~p", [Signature]), ?debug(" schedule:sched_msg(): certificate: ~p", [Certificate]), ?debug(" schedule:sched_msg(): St: ~p", [St]), @@ -159,44 +154,40 @@ handle_cast( { schedule_message, certificate = Certificate }, - NSt = %% Get the new state from queue_message case queue_message(Target, Msg, St) of { first_message, TmpSt } -> ?debug(" schedule:sched_msg(): Queued message is the first one. " "Bring up data link"), - rvi_common:send_component_request(data_link, setup_data_link, - [ - {service, Target}, - {network_address, NetworkAddress} - ]), + bring_up_data_link(Target), TmpSt; - { send_now, NetworkAddress, TmpSt } -> + { send_now, TmpSt } -> ?debug(" schedule:sched_msg(): Servivce is already available. Send now."), - ok = send_message(NetworkAddress, Target, Timeout, - Parameters, Signature, Certificate), + ok = send_message(Target, Timeout, Parameters, Signature, Certificate), TmpSt; + { ok, TmpSt } -> ?debug(" schedule:sched_msg(): Message queued."), - TmpSt end, {noreply, NSt }; -handle_cast( {data_link_up, NetworkAddress, AvailableServices}, St) -> +handle_cast( {register_remote_services, NetworkAddress, AvailableServices}, St) -> + + ?debug(" schedule:register_remote_services(): NetworkAddress: ~p", [NetworkAddress]), + ?debug(" schedule:register_remote_services(): AvailableService: ~p", [AvailableServices]), + {ok, NSt} = multiple_services_available(AvailableServices, NetworkAddress, St), {noreply, NSt}; - - - -handle_cast( {data_link_down, _NetworkAddress, DiscontinuedServices}, St) -> - ?debug(" schedule:data_link_down(): NetworkAddress: ~p DiscontinuedServices: ~p", - [_NetworkAddress, DiscontinuedServices]), +handle_cast( {unregister_remote_services, NetworkAddress, DiscontinuedServices}, St) -> + ?debug(" schedule:register_remote_services(): NetworkAddress: ~p", [NetworkAddress]), + ?debug(" schedule:register_remote_services(): DiscontinuedServices: ~p", + [DiscontinuedServices]), {ok, NSt } = multiple_services_unavailable(DiscontinuedServices, St), {noreply, NSt }; @@ -262,14 +253,11 @@ code_change(_OldVsn, St, _Extra) -> %% queue, awaiting a data_link_up command from data_link so that they can be sent. %% queue_message(Target, Msg, St) -> - - ?debug(" schedule:sched_msg(): target: ~p", [Target]), ?debug(" schedule:sched_msg(): msg.timeout: ~p", [Msg#message.timeout]), ?debug(" schedule:sched_msg(): msg.parameters: ~p", [Msg#message.parameters]), ?debug(" schedule:sched_msg(): msg.signature: ~p", [Msg#message.signature]), ?debug(" schedule:sched_msg(): msg.certificate: ~p", [Msg#message.certificate]), - %% Does the service even exist? case find_service(Target, St) of @@ -285,43 +273,13 @@ queue_message(Target, Msg, St) -> %% Create a new service with unavailable network address and %% a single element queue with the new message. - { first_message, modify_service(Target, unavailable, Q, St) }; + { first_message, modify_service(Target, Q, St) }; - %% Svc exists, but is unavailable add the element to the queue. - %% - { unavailable, Svc} -> - %% Remove the service from the NSt - - %% Update the queue. - NQ = queue:in(Msg, Svc#service.queue), - - %% Replace the existing service with a new one having the - %% updated queue. - NSt = modify_service(Target, unavailable, NQ, St ), - - %% Check if the exiting queue (before we add Msg) is empty. - %% If so, this is the first element queued, and we should - %% reply with a first_message response - - case queue:is_empty(Svc#service.queue) of - true -> %% Nothing to send - ?debug(" schedule:sched_msg(): Unavailable - first messge"), - { first_message, NSt }; - - false -> %% There are already elements in the queue. - ?debug(" schedule:sched_msg(): Unavailable - additional messsge"), - { ok, NSt } - - end; - - %% Service exists, and has a network address. Do not queue. Send now. - {available, #service { network_address = NetworkAddress } = Svc }-> + {ok, Svc }-> ?debug(" schedule:sched_msg(): SEND NOW: ~p", [Svc]), - ?debug(" schedule:sched_msg(): SEND NOW: ~p", [NetworkAddress]), - { send_now, NetworkAddress, St } + { send_now, St } end. - %% Mark a service as available, as reported by data_link_up @@ -332,39 +290,35 @@ queue_message(Target, Msg, St) -> %% if messages for the given service are now ready to be sent. %% service_available(Target, NetworkAddress, St) -> - ?debug(" schedule:service_available(): target: ~p~n", [ Target ]), - ?debug(" schedule:service_available(): network_address: ~p~n", [ NetworkAddress ]), + ?debug(" schedule:service_available(): target: ~p", [ Target ]), + ?debug(" schedule:service_available(): network_address: ~p", [ NetworkAddress ]), case find_service(Target, St) of not_found -> %% The given service does not exist, create it. - ?debug(" schedule:service_available(): New service. Adding.~n"), - { ok, modify_service(Target, NetworkAddress, St) }; + {ok, St}; - { _, Svc } -> + { ok, Svc } -> %% Check if we have elements ready to transmit case queue:is_empty(Svc#service.queue) of - true -> %% Nothing to send. Add service and return ok - ?debug(" schedule:service_available(): Existing service. No pending messages.~n"), - { ok, modify_service(Target, NetworkAddress, Svc#service.queue, St) }; - - false -> %% We have elements to send. Add service and report send. - ?debug(" schedule:service_available(): Existing service. Pending messages.~n"), - { send_messages, modify_service(Target, NetworkAddress, Svc#service.queue, St) } + true -> %% Nothing to send. + { ok, St }; + + false -> %% We have elements to send. Send them + ?debug(" schedule:service_available(): Existing service. Pending messages."), + NewSvc = send_messages(Svc, NetworkAddress), + %% Update service in state. + {ok, modify_service_rec(NewSvc, St) } end end. -%% Add a service with a fresh queue, or modify an existing one -modify_service(Target, NetworkAddress, St) -> - modify_service(Target, NetworkAddress, queue:new(), St). %% Add or modify a service with a carried over queue. %% Any existing services with the same target name is replaced. -modify_service(Target, NetworkAddress, Queue, St) -> +modify_service(Target, Queue, St) -> ?debug(" schedule:modify_service(): Target: ~p", [ Target]), - ?debug(" schedule:modify_service(): NetworkAddress: ~p", [ NetworkAddress]), ?debug(" schedule:modify_service(): Queue: ~p", [ Queue]), - ?debug(" schedule:modify_service(): St: ~p", [ St]), + ?debug(" schedule:modify_service(): St: ~p", [ St]), %% Delete old service NewSvcs = lists:keydelete(Target, #service.target, St#st.services), @@ -372,14 +326,12 @@ modify_service(Target, NetworkAddress, Queue, St) -> %% Modify state with the new service setup. St#st { services = [ #service { target = Target, - network_address = NetworkAddress, queue = Queue } | NewSvcs ] }. -modify_service(SvcRec, St) -> +modify_service_rec(SvcRec, St) -> ?debug(" schedule:modify_service(rec): Target: ~p", [ SvcRec#service.target]), - ?debug(" schedule:modify_service(rec): NetworkAddress: ~p", [ SvcRec#service.network_address]), ?debug(" schedule:modify_service(rec): Queue: ~p", [ SvcRec#service.queue]), - ?debug(" schedule:modify_service(rec): St: ~p", [ St]), + ?debug(" schedule:modify_service(rec): St: ~p", [ St]), %% Delete old service NewSvcs = lists:keydelete(SvcRec#service.target, #service.target, St#st.services), @@ -387,31 +339,67 @@ modify_service(SvcRec, St) -> %% Modify state with the new service setup. St#st { services = [ SvcRec | NewSvcs ] }. +bring_up_data_link(Target) -> + %% Resolve the target to a network address that we can + %% use to bring up the data link + case rvi_common:send_component_request(service_discovery, + resolve_remote_service, + [ {service, Target} ], + [ network_address ]) of + + { ok, ok, [ NetworkAddress], _SDJSON } -> + %% Tell data link to bring up a communicationc hannel. + rvi_common:send_component_request(data_link, setup_data_link, + [ + {service, Target}, + {network_address, NetworkAddress} + ]), + ok; + + {ok, not_found, _, _} -> + ?debug(" schedule:bring_up_data_link() Failed to resolve remote Service: ~p. Service not found.", + [ Target ]), + not_found; + + Err -> + ?debug(" schedule:bring_up_data_link() Failed to resolve remote Service: ~p: ~p", + [ Target, Err ]), + err + end. +send_message(Target, Timeout, + Parameters, Signature, Certificate) -> + %% Resolve the target to a network address + case rvi_common:send_component_request(service_discovery, + resolve_remote_service, + [ {service, Target} ], + [ network_address ]) of + { ok, ok, [ NetworkAddress], _SDJSON } -> + send_message(NetworkAddress, Target, Timeout, + Parameters, Signature, Certificate); + + {ok, not_found, _, _} -> + ?debug(" schedule:send_message() Failed to resolve remote Service: ~p. Service not found.", + [ Target ]), + not_found; + + Err -> + ?debug(" schedule:send_message() Failed to resolve remote Service: ~p: ~p", + [ Target, Err ]), -%% Send a single message to protocol. -%% Check that we don't get invoked with network_address == unavailable, -%% which means that the service cannot be reached. -send_message(unavailable, _Target, _Timeout, - _Parameters, _Signature, _Certificate) -> + err + end. - ?debug(" schedule:send_message(): network_address: UNAVAILABLE"), - ?debug(" schedule:send_message(): target: ~p", [_Target]), - ?debug(" schedule:send_message(): timeout: ~p", [_Timeout]), - ?debug(" schedule:send_message(): parameters: ~p", [_Parameters]), - ?debug(" schedule:send_message(): signature: ~p", [_Signature]), - ?debug(" schedule:send_message(): certificate: ~p", [_Certificate]), - {error, unavailable}; send_message(NetworkAddress, Target, Timeout, Parameters, Signature, Certificate) -> ?debug(" schedule:send_message(): network_address: ~p", [NetworkAddress]), - ?debug(" schedule:send_message(): target: ~p", [Target]), - ?debug(" schedule:send_message(): timeout: ~p", [Timeout]), - ?debug(" schedule:send_message(): parameters: ~p", [Parameters]), - ?debug(" schedule:send_message(): signature: ~p", [Signature]), - ?debug(" schedule:send_message(): certificate: ~p", [Certificate]), + ?debug(" schedule:send_message(): target: ~p", [Target]), + ?debug(" schedule:send_message(): timeout: ~p", [Timeout]), + ?debug(" schedule:send_message(): parameters: ~p", [Parameters]), + ?debug(" schedule:send_message(): signature: ~p", [Signature]), + ?debug(" schedule:send_message(): certificate: ~p", [Certificate]), case rvi_common:send_component_request( protocol, send_message, @@ -428,9 +416,9 @@ send_message(NetworkAddress, Target, Timeout, end. -send_messages(#service { network_address = NetworkAddress, - target = Target, - queue = Queue } = Svc) -> +send_messages(#service { target = Target, + queue = Queue } = Svc, + NetworkAddress) -> ?debug(" schedule:send_messages(): target: ~p", [Target]), ?debug(" schedule:send_messages(): network_address: ~p", [NetworkAddress]), @@ -446,13 +434,13 @@ send_messages(#service { network_address = NetworkAddress, Msg#message.signature, Msg#message.certificate) of ok -> - send_messages(Svc#service { queue = NQ}); + send_messages(Svc#service { queue = NQ}, NetworkAddress); Err -> ?warning("schedule:send_messages(): Failed: ~p", [ Err ]), - send_messages(Svc#service { queue = NQ}) + send_messages(Svc#service { queue = NQ}, NetworkAddress) end; @@ -474,35 +462,11 @@ multiple_services_available([], _NetworkAddress, St) -> {ok, St}; multiple_services_available([ Svc | T], NetworkAddress, St) -> - - NSt = case service_available(Svc, NetworkAddress, St) of - { send_messages, TmpSt } -> - %% Locate the newly available service record - { _, SvcRec } = find_service(Svc, TmpSt), - - %% Send all messages pending for the service. - NSvcRec = send_messages(SvcRec), - - %% Modify the service with the updated service - %% state, and return new genserv state. - modify_service(NSvcRec, TmpSt); - - - { ok, TmpSt } -> - TmpSt - - end, + {ok, NSt} = service_available(Svc, NetworkAddress, St), multiple_services_available(T, NetworkAddress, NSt). - - -multiple_services_unavailable([], St) -> - {ok, St}; - -multiple_services_unavailable([ Svc | T], St) -> - { _, SvcRec } = find_service(Svc, St), - NSt = modify_service(SvcRec#service.target, unavailable, SvcRec#service.queue, St), - multiple_services_unavailable(T, NSt). +multiple_services_unavailable(_, St) -> + {ok, St}. find_service(Target, #st { services = Svcs } = _St) -> ?debug(" schedule:find_service(): St: ~p", [ _St]), @@ -510,10 +474,7 @@ find_service(Target, #st { services = Svcs } = _St) -> false -> %% The given service does not exist, create it. not_found; - Svc when Svc#service.network_address =:= unavailable -> - { unavailable, Svc }; - Svc -> - { available, Svc } + { ok, Svc } end. diff --git a/src/schedule_rpc.erl b/src/schedule_rpc.erl index ee6e2d1..ce89069 100644 --- a/src/schedule_rpc.erl +++ b/src/schedule_rpc.erl @@ -27,32 +27,30 @@ init() -> end, ok. -schedule_message(Target, Timeout, NetworkAddress, Parameters, Signature, Certificate) -> +schedule_message(Target, Timeout, Parameters, Signature, Certificate) -> ?debug(" schedule_rpc:schedule_request(): target: ~p", [ Target]), ?debug(" schedule_rpc:schedule_request(): timeout: ~p", [ Timeout]), - ?debug(" schedule_rpc:schedule_request(): network_address: ~p", [ NetworkAddress]), ?debug(" schedule_rpc:schedule_request(): parameters: ~p", [Parameters]), ?debug(" schedule_rpc:schedule_request(): signature: ~p", [Signature]), ?debug(" schedule_rpc:schedule_request(): certificate: ~p", [Certificate]), schedule:schedule_message(Target, Timeout, - NetworkAddress, Parameters, Signature, Certificate), {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. -data_link_up(NetworkAddress, AvailableServices) -> - ?debug(" schedule_rpc:data_link_up(): network_address: ~p", [ NetworkAddress]), - ?debug(" schedule_rpc:data_link_up(): services: ~p", [ AvailableServices]), - schedule:data_link_up(NetworkAddress, AvailableServices), +register_remote_services(NetworkAddress, AvailableServices) -> + ?debug(" schedule_rpc:register_remote_services(): network_address: ~p", [ NetworkAddress]), + ?debug(" schedule_rpc:register_remote_services(): services: ~p", [ AvailableServices]), + schedule:register_remote_services(NetworkAddress, AvailableServices), {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. -data_link_down(NetworkAddress, DiscountinuedServices) -> - ?debug(" schedule_rpc:data_link_down(): network_address: ~p", [ NetworkAddress]), - ?debug(" schedule_rpc:data_link_down(): services ~p", [ DiscountinuedServices]), - schedule:data_link_down(NetworkAddress, DiscountinuedServices), +unregister_remote_services(NetworkAddress, DiscountinuedServices) -> + ?debug(" schedule_rpc:unregister_remote_services(): network_address: ~p", [ NetworkAddress]), + ?debug(" schedule_rpc:unregister_remote_services(): services ~p", [ DiscountinuedServices]), + schedule:unregister_remote_services(NetworkAddress, DiscountinuedServices), {ok, [ { status, rvi_common:json_rpc_status(ok)}]}. @@ -61,26 +59,24 @@ data_link_down(NetworkAddress, DiscountinuedServices) -> handle_rpc("schedule_message", Args) -> {ok, Target} = rvi_common:get_json_element(["target"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), - {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), {ok, Signature} = rvi_common:get_json_element(["signature"], Args), {ok, Certificate} = rvi_common:get_json_element(["certificate"], Args), schedule_message(Target, Timeout, - NetworkAddress, Parameters, Signature, Certificate); -handle_rpc("data_link_up", Args) -> +handle_rpc("register_remote_services", Args) -> {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), {ok, AvailableServices} = rvi_common:get_json_element(["services"], Args), - data_link_up(NetworkAddress, AvailableServices); + register_remote_services(NetworkAddress, AvailableServices); -handle_rpc("data_link_down", Args) -> +handle_rpc("unregister_remote_services", Args) -> {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), {ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args), - data_link_down(NetworkAddress, DiscountinuedServices); + unregister_remote_services(NetworkAddress, DiscountinuedServices); handle_rpc(Other, _Args) -> ?debug(" schedule_rpc:handle_rpc(~p): unknown", [ Other ]), |