summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus <mfeuer@jaguarlandrover.com>2014-08-19 16:43:55 -0700
committerMagnus <mfeuer@jaguarlandrover.com>2014-08-19 16:43:55 -0700
commit4f5fdd76a04ececf8d720fa2ac0a9d3eb312ee29 (patch)
tree47acde3b3353994fa596a29d3cb9ba9a487fade2
parent5952b5c6b0d0350972ecaf1cda147eb3197999e4 (diff)
downloadrvi_core-4f5fdd76a04ececf8d720fa2ac0a9d3eb312ee29.tar.gz
Removed network addresses from scheduled messages in order to have them JIT resolved by service edge. Now sends all pending messages for a service when it becomes available
Signed-off-by: Magnus <mfeuer@jaguarlandrover.com>
-rw-r--r--src/schedule.erl245
-rw-r--r--src/schedule_rpc.erl30
2 files changed, 116 insertions, 159 deletions
diff --git a/src/schedule.erl b/src/schedule.erl
index 088e777..35db723 100644
--- a/src/schedule.erl
+++ b/src/schedule.erl
@@ -22,9 +22,9 @@
%% API
-export([start_link/0]).
--export([schedule_message/6,
- data_link_up/2,
- data_link_down/2]).
+-export([schedule_message/5,
+ register_remote_services/2,
+ unregister_remote_services/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -43,8 +43,7 @@
}).
-record(service, {
- target = "", %% Target service, as reported by data_link_up
- network_address = unavailable, %% Network address of target, as reported by data_link_up.
+ target = "", %% Target service, as reported by register_remote_services
queue = undefined %% Queue of #messages awaiting target.
}).
@@ -86,7 +85,6 @@ init([]) ->
schedule_message(Target,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate) ->
@@ -94,17 +92,16 @@ schedule_message(Target,
schedule_message,
Target,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate
}).
-data_link_up(NetworkAddress, AvailableServices) ->
- gen_server:cast(?SERVER, { data_link_up, NetworkAddress, AvailableServices }).
+register_remote_services(NetworkAddress, AvailableServices) ->
+ gen_server:cast(?SERVER, { register_remote_services, NetworkAddress, AvailableServices }).
-data_link_down(NetworkAddress, DiscontinuedServices) ->
- gen_server:cast(?SERVER, { data_link_down, NetworkAddress, DiscontinuedServices }).
+unregister_remote_services(NetworkAddress, DiscontinuedServices) ->
+ gen_server:cast(?SERVER, { unregister_remote_services, NetworkAddress, DiscontinuedServices }).
%%--------------------------------------------------------------------
%% @private
@@ -137,15 +134,13 @@ handle_call(_Request, _From, St) ->
handle_cast( { schedule_message,
Target,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate
}, St) ->
?debug(" schedule:sched_msg(): target: ~p", [Target]),
?debug(" schedule:sched_msg(): timeout: ~p", [Timeout]),
- ?debug(" schedule:sched_msg(): network_address: ~p", [NetworkAddress]),
- ?debug(" schedule:sched_msg(): parameters: ~p", [Parameters]),
+ ?debug(" schedule:sched_msg(): parameters: ~p", [Parameters]),
?debug(" schedule:sched_msg(): signature: ~p", [Signature]),
?debug(" schedule:sched_msg(): certificate: ~p", [Certificate]),
?debug(" schedule:sched_msg(): St: ~p", [St]),
@@ -159,44 +154,40 @@ handle_cast( { schedule_message,
certificate = Certificate
},
-
NSt = %% Get the new state from queue_message
case queue_message(Target, Msg, St) of
{ first_message, TmpSt } ->
?debug(" schedule:sched_msg(): Queued message is the first one. "
"Bring up data link"),
- rvi_common:send_component_request(data_link, setup_data_link,
- [
- {service, Target},
- {network_address, NetworkAddress}
- ]),
+ bring_up_data_link(Target),
TmpSt;
- { send_now, NetworkAddress, TmpSt } ->
+ { send_now, TmpSt } ->
?debug(" schedule:sched_msg(): Servivce is already available. Send now."),
- ok = send_message(NetworkAddress, Target, Timeout,
- Parameters, Signature, Certificate),
+ ok = send_message(Target, Timeout, Parameters, Signature, Certificate),
TmpSt;
+
{ ok, TmpSt } ->
?debug(" schedule:sched_msg(): Message queued."),
-
TmpSt
end,
{noreply, NSt };
-handle_cast( {data_link_up, NetworkAddress, AvailableServices}, St) ->
+handle_cast( {register_remote_services, NetworkAddress, AvailableServices}, St) ->
+
+ ?debug(" schedule:register_remote_services(): NetworkAddress: ~p", [NetworkAddress]),
+ ?debug(" schedule:register_remote_services(): AvailableService: ~p", [AvailableServices]),
+
{ok, NSt} = multiple_services_available(AvailableServices, NetworkAddress, St),
{noreply, NSt};
-
-
-
-handle_cast( {data_link_down, _NetworkAddress, DiscontinuedServices}, St) ->
- ?debug(" schedule:data_link_down(): NetworkAddress: ~p DiscontinuedServices: ~p",
- [_NetworkAddress, DiscontinuedServices]),
+handle_cast( {unregister_remote_services, NetworkAddress, DiscontinuedServices}, St) ->
+ ?debug(" schedule:register_remote_services(): NetworkAddress: ~p", [NetworkAddress]),
+ ?debug(" schedule:register_remote_services(): DiscontinuedServices: ~p",
+ [DiscontinuedServices]),
{ok, NSt } = multiple_services_unavailable(DiscontinuedServices, St),
{noreply, NSt };
@@ -262,14 +253,11 @@ code_change(_OldVsn, St, _Extra) ->
%% queue, awaiting a data_link_up command from data_link so that they can be sent.
%%
queue_message(Target, Msg, St) ->
-
-
?debug(" schedule:sched_msg(): target: ~p", [Target]),
?debug(" schedule:sched_msg(): msg.timeout: ~p", [Msg#message.timeout]),
?debug(" schedule:sched_msg(): msg.parameters: ~p", [Msg#message.parameters]),
?debug(" schedule:sched_msg(): msg.signature: ~p", [Msg#message.signature]),
?debug(" schedule:sched_msg(): msg.certificate: ~p", [Msg#message.certificate]),
-
%% Does the service even exist?
case find_service(Target, St) of
@@ -285,43 +273,13 @@ queue_message(Target, Msg, St) ->
%% Create a new service with unavailable network address and
%% a single element queue with the new message.
- { first_message, modify_service(Target, unavailable, Q, St) };
+ { first_message, modify_service(Target, Q, St) };
- %% Svc exists, but is unavailable add the element to the queue.
- %%
- { unavailable, Svc} ->
- %% Remove the service from the NSt
-
- %% Update the queue.
- NQ = queue:in(Msg, Svc#service.queue),
-
- %% Replace the existing service with a new one having the
- %% updated queue.
- NSt = modify_service(Target, unavailable, NQ, St ),
-
- %% Check if the exiting queue (before we add Msg) is empty.
- %% If so, this is the first element queued, and we should
- %% reply with a first_message response
-
- case queue:is_empty(Svc#service.queue) of
- true -> %% Nothing to send
- ?debug(" schedule:sched_msg(): Unavailable - first messge"),
- { first_message, NSt };
-
- false -> %% There are already elements in the queue.
- ?debug(" schedule:sched_msg(): Unavailable - additional messsge"),
- { ok, NSt }
-
- end;
-
-
%% Service exists, and has a network address. Do not queue. Send now.
- {available, #service { network_address = NetworkAddress } = Svc }->
+ {ok, Svc }->
?debug(" schedule:sched_msg(): SEND NOW: ~p", [Svc]),
- ?debug(" schedule:sched_msg(): SEND NOW: ~p", [NetworkAddress]),
- { send_now, NetworkAddress, St }
+ { send_now, St }
end.
-
%% Mark a service as available, as reported by data_link_up
@@ -332,39 +290,35 @@ queue_message(Target, Msg, St) ->
%% if messages for the given service are now ready to be sent.
%%
service_available(Target, NetworkAddress, St) ->
- ?debug(" schedule:service_available(): target: ~p~n", [ Target ]),
- ?debug(" schedule:service_available(): network_address: ~p~n", [ NetworkAddress ]),
+ ?debug(" schedule:service_available(): target: ~p", [ Target ]),
+ ?debug(" schedule:service_available(): network_address: ~p", [ NetworkAddress ]),
case find_service(Target, St) of
not_found -> %% The given service does not exist, create it.
- ?debug(" schedule:service_available(): New service. Adding.~n"),
- { ok, modify_service(Target, NetworkAddress, St) };
+ {ok, St};
- { _, Svc } ->
+ { ok, Svc } ->
%% Check if we have elements ready to transmit
case queue:is_empty(Svc#service.queue) of
- true -> %% Nothing to send. Add service and return ok
- ?debug(" schedule:service_available(): Existing service. No pending messages.~n"),
- { ok, modify_service(Target, NetworkAddress, Svc#service.queue, St) };
-
- false -> %% We have elements to send. Add service and report send.
- ?debug(" schedule:service_available(): Existing service. Pending messages.~n"),
- { send_messages, modify_service(Target, NetworkAddress, Svc#service.queue, St) }
+ true -> %% Nothing to send.
+ { ok, St };
+
+ false -> %% We have elements to send. Send them
+ ?debug(" schedule:service_available(): Existing service. Pending messages."),
+ NewSvc = send_messages(Svc, NetworkAddress),
+ %% Update service in state.
+ {ok, modify_service_rec(NewSvc, St) }
end
end.
-%% Add a service with a fresh queue, or modify an existing one
-modify_service(Target, NetworkAddress, St) ->
- modify_service(Target, NetworkAddress, queue:new(), St).
%% Add or modify a service with a carried over queue.
%% Any existing services with the same target name is replaced.
-modify_service(Target, NetworkAddress, Queue, St) ->
+modify_service(Target, Queue, St) ->
?debug(" schedule:modify_service(): Target: ~p", [ Target]),
- ?debug(" schedule:modify_service(): NetworkAddress: ~p", [ NetworkAddress]),
?debug(" schedule:modify_service(): Queue: ~p", [ Queue]),
- ?debug(" schedule:modify_service(): St: ~p", [ St]),
+ ?debug(" schedule:modify_service(): St: ~p", [ St]),
%% Delete old service
NewSvcs = lists:keydelete(Target, #service.target, St#st.services),
@@ -372,14 +326,12 @@ modify_service(Target, NetworkAddress, Queue, St) ->
%% Modify state with the new service setup.
St#st { services = [ #service {
target = Target,
- network_address = NetworkAddress,
queue = Queue } | NewSvcs ] }.
-modify_service(SvcRec, St) ->
+modify_service_rec(SvcRec, St) ->
?debug(" schedule:modify_service(rec): Target: ~p", [ SvcRec#service.target]),
- ?debug(" schedule:modify_service(rec): NetworkAddress: ~p", [ SvcRec#service.network_address]),
?debug(" schedule:modify_service(rec): Queue: ~p", [ SvcRec#service.queue]),
- ?debug(" schedule:modify_service(rec): St: ~p", [ St]),
+ ?debug(" schedule:modify_service(rec): St: ~p", [ St]),
%% Delete old service
NewSvcs = lists:keydelete(SvcRec#service.target, #service.target, St#st.services),
@@ -387,31 +339,67 @@ modify_service(SvcRec, St) ->
%% Modify state with the new service setup.
St#st { services = [ SvcRec | NewSvcs ] }.
+bring_up_data_link(Target) ->
+ %% Resolve the target to a network address that we can
+ %% use to bring up the data link
+ case rvi_common:send_component_request(service_discovery,
+ resolve_remote_service,
+ [ {service, Target} ],
+ [ network_address ]) of
+
+ { ok, ok, [ NetworkAddress], _SDJSON } ->
+ %% Tell data link to bring up a communicationc hannel.
+ rvi_common:send_component_request(data_link, setup_data_link,
+ [
+ {service, Target},
+ {network_address, NetworkAddress}
+ ]),
+ ok;
+
+ {ok, not_found, _, _} ->
+ ?debug(" schedule:bring_up_data_link() Failed to resolve remote Service: ~p. Service not found.",
+ [ Target ]),
+ not_found;
+
+ Err ->
+ ?debug(" schedule:bring_up_data_link() Failed to resolve remote Service: ~p: ~p",
+ [ Target, Err ]),
+ err
+ end.
+send_message(Target, Timeout,
+ Parameters, Signature, Certificate) ->
+ %% Resolve the target to a network address
+ case rvi_common:send_component_request(service_discovery,
+ resolve_remote_service,
+ [ {service, Target} ],
+ [ network_address ]) of
+ { ok, ok, [ NetworkAddress], _SDJSON } ->
+ send_message(NetworkAddress, Target, Timeout,
+ Parameters, Signature, Certificate);
+
+ {ok, not_found, _, _} ->
+ ?debug(" schedule:send_message() Failed to resolve remote Service: ~p. Service not found.",
+ [ Target ]),
+ not_found;
+
+ Err ->
+ ?debug(" schedule:send_message() Failed to resolve remote Service: ~p: ~p",
+ [ Target, Err ]),
-%% Send a single message to protocol.
-%% Check that we don't get invoked with network_address == unavailable,
-%% which means that the service cannot be reached.
-send_message(unavailable, _Target, _Timeout,
- _Parameters, _Signature, _Certificate) ->
+ err
+ end.
- ?debug(" schedule:send_message(): network_address: UNAVAILABLE"),
- ?debug(" schedule:send_message(): target: ~p", [_Target]),
- ?debug(" schedule:send_message(): timeout: ~p", [_Timeout]),
- ?debug(" schedule:send_message(): parameters: ~p", [_Parameters]),
- ?debug(" schedule:send_message(): signature: ~p", [_Signature]),
- ?debug(" schedule:send_message(): certificate: ~p", [_Certificate]),
- {error, unavailable};
send_message(NetworkAddress, Target, Timeout,
Parameters, Signature, Certificate) ->
?debug(" schedule:send_message(): network_address: ~p", [NetworkAddress]),
- ?debug(" schedule:send_message(): target: ~p", [Target]),
- ?debug(" schedule:send_message(): timeout: ~p", [Timeout]),
- ?debug(" schedule:send_message(): parameters: ~p", [Parameters]),
- ?debug(" schedule:send_message(): signature: ~p", [Signature]),
- ?debug(" schedule:send_message(): certificate: ~p", [Certificate]),
+ ?debug(" schedule:send_message(): target: ~p", [Target]),
+ ?debug(" schedule:send_message(): timeout: ~p", [Timeout]),
+ ?debug(" schedule:send_message(): parameters: ~p", [Parameters]),
+ ?debug(" schedule:send_message(): signature: ~p", [Signature]),
+ ?debug(" schedule:send_message(): certificate: ~p", [Certificate]),
case rvi_common:send_component_request(
protocol, send_message,
@@ -428,9 +416,9 @@ send_message(NetworkAddress, Target, Timeout,
end.
-send_messages(#service { network_address = NetworkAddress,
- target = Target,
- queue = Queue } = Svc) ->
+send_messages(#service { target = Target,
+ queue = Queue } = Svc,
+ NetworkAddress) ->
?debug(" schedule:send_messages(): target: ~p", [Target]),
?debug(" schedule:send_messages(): network_address: ~p", [NetworkAddress]),
@@ -446,13 +434,13 @@ send_messages(#service { network_address = NetworkAddress,
Msg#message.signature,
Msg#message.certificate) of
ok ->
- send_messages(Svc#service { queue = NQ});
+ send_messages(Svc#service { queue = NQ}, NetworkAddress);
Err ->
?warning("schedule:send_messages(): Failed: ~p",
[ Err ]),
- send_messages(Svc#service { queue = NQ})
+ send_messages(Svc#service { queue = NQ}, NetworkAddress)
end;
@@ -474,35 +462,11 @@ multiple_services_available([], _NetworkAddress, St) ->
{ok, St};
multiple_services_available([ Svc | T], NetworkAddress, St) ->
-
- NSt = case service_available(Svc, NetworkAddress, St) of
- { send_messages, TmpSt } ->
- %% Locate the newly available service record
- { _, SvcRec } = find_service(Svc, TmpSt),
-
- %% Send all messages pending for the service.
- NSvcRec = send_messages(SvcRec),
-
- %% Modify the service with the updated service
- %% state, and return new genserv state.
- modify_service(NSvcRec, TmpSt);
-
-
- { ok, TmpSt } ->
- TmpSt
-
- end,
+ {ok, NSt} = service_available(Svc, NetworkAddress, St),
multiple_services_available(T, NetworkAddress, NSt).
-
-
-multiple_services_unavailable([], St) ->
- {ok, St};
-
-multiple_services_unavailable([ Svc | T], St) ->
- { _, SvcRec } = find_service(Svc, St),
- NSt = modify_service(SvcRec#service.target, unavailable, SvcRec#service.queue, St),
- multiple_services_unavailable(T, NSt).
+multiple_services_unavailable(_, St) ->
+ {ok, St}.
find_service(Target, #st { services = Svcs } = _St) ->
?debug(" schedule:find_service(): St: ~p", [ _St]),
@@ -510,10 +474,7 @@ find_service(Target, #st { services = Svcs } = _St) ->
false -> %% The given service does not exist, create it.
not_found;
- Svc when Svc#service.network_address =:= unavailable ->
- { unavailable, Svc };
-
Svc ->
- { available, Svc }
+ { ok, Svc }
end.
diff --git a/src/schedule_rpc.erl b/src/schedule_rpc.erl
index ee6e2d1..ce89069 100644
--- a/src/schedule_rpc.erl
+++ b/src/schedule_rpc.erl
@@ -27,32 +27,30 @@ init() ->
end,
ok.
-schedule_message(Target, Timeout, NetworkAddress, Parameters, Signature, Certificate) ->
+schedule_message(Target, Timeout, Parameters, Signature, Certificate) ->
?debug(" schedule_rpc:schedule_request(): target: ~p", [ Target]),
?debug(" schedule_rpc:schedule_request(): timeout: ~p", [ Timeout]),
- ?debug(" schedule_rpc:schedule_request(): network_address: ~p", [ NetworkAddress]),
?debug(" schedule_rpc:schedule_request(): parameters: ~p", [Parameters]),
?debug(" schedule_rpc:schedule_request(): signature: ~p", [Signature]),
?debug(" schedule_rpc:schedule_request(): certificate: ~p", [Certificate]),
schedule:schedule_message(Target,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate),
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
-data_link_up(NetworkAddress, AvailableServices) ->
- ?debug(" schedule_rpc:data_link_up(): network_address: ~p", [ NetworkAddress]),
- ?debug(" schedule_rpc:data_link_up(): services: ~p", [ AvailableServices]),
- schedule:data_link_up(NetworkAddress, AvailableServices),
+register_remote_services(NetworkAddress, AvailableServices) ->
+ ?debug(" schedule_rpc:register_remote_services(): network_address: ~p", [ NetworkAddress]),
+ ?debug(" schedule_rpc:register_remote_services(): services: ~p", [ AvailableServices]),
+ schedule:register_remote_services(NetworkAddress, AvailableServices),
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
-data_link_down(NetworkAddress, DiscountinuedServices) ->
- ?debug(" schedule_rpc:data_link_down(): network_address: ~p", [ NetworkAddress]),
- ?debug(" schedule_rpc:data_link_down(): services ~p", [ DiscountinuedServices]),
- schedule:data_link_down(NetworkAddress, DiscountinuedServices),
+unregister_remote_services(NetworkAddress, DiscountinuedServices) ->
+ ?debug(" schedule_rpc:unregister_remote_services(): network_address: ~p", [ NetworkAddress]),
+ ?debug(" schedule_rpc:unregister_remote_services(): services ~p", [ DiscountinuedServices]),
+ schedule:unregister_remote_services(NetworkAddress, DiscountinuedServices),
{ok, [ { status, rvi_common:json_rpc_status(ok)}]}.
@@ -61,26 +59,24 @@ data_link_down(NetworkAddress, DiscountinuedServices) ->
handle_rpc("schedule_message", Args) ->
{ok, Target} = rvi_common:get_json_element(["target"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
- {ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
{ok, Signature} = rvi_common:get_json_element(["signature"], Args),
{ok, Certificate} = rvi_common:get_json_element(["certificate"], Args),
schedule_message(Target,
Timeout,
- NetworkAddress,
Parameters,
Signature,
Certificate);
-handle_rpc("data_link_up", Args) ->
+handle_rpc("register_remote_services", Args) ->
{ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
{ok, AvailableServices} = rvi_common:get_json_element(["services"], Args),
- data_link_up(NetworkAddress, AvailableServices);
+ register_remote_services(NetworkAddress, AvailableServices);
-handle_rpc("data_link_down", Args) ->
+handle_rpc("unregister_remote_services", Args) ->
{ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args),
{ok, DiscountinuedServices} = rvi_common:get_json_element(["services"], Args),
- data_link_down(NetworkAddress, DiscountinuedServices);
+ unregister_remote_services(NetworkAddress, DiscountinuedServices);
handle_rpc(Other, _Args) ->
?debug(" schedule_rpc:handle_rpc(~p): unknown", [ Other ]),