summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMagnus Feuer <mfeuer@jaguarlandrover.com>2015-04-16 18:32:25 -0700
committerMagnus Feuer <mfeuer@jaguarlandrover.com>2015-04-16 18:32:25 -0700
commitd20136a1149989857cf0727141c60fa83dc38e8c (patch)
tree1386a9ea5d42936e214487bb8972f62a97392403
parent2d720d9341c3ca4ad50e7b27bb65019cb4555705 (diff)
downloadrvi_core-d20136a1149989857cf0727141c60fa83dc38e8c.tar.gz
More stuff
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl45
-rw-r--r--components/schedule/src/rvi_routing.erl38
-rw-r--r--components/schedule/src/schedule_rpc.erl481
-rw-r--r--rvi_backend.config2
-rw-r--r--rvi_sample.config2
-rw-r--r--test/test_plan.md4
-rw-r--r--tizen.config4
7 files changed, 381 insertions, 195 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index eaf6978..16ed4ed 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -520,25 +520,32 @@ handle_cast(Other, St) ->
handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
- case proplists:get_value(target, Opts, undefined) of
- undefined ->
- ?info("dlink_tcp:setup_data_link(~p) Failed: no target given in options.",
- [Service]),
- { reply, [ok, -1 ], St };
-
- Addr ->
- [ Address, Port] = string:tokens(Addr, ":"),
-
- case connect_remote(Address, list_to_integer(Port), St#st.cs) of
- ok ->
- { reply, [ok, 2000], St }; %% 2 second timeout
-
- already_connected ->
- { reply, [already_connected, 2000], St }; %% 2 second timeout to send message
-
- Err ->
- { reply, [Err, 0], St }
- end
+ %% Do we already have a connection that support service?
+ case get_connections_by_service(Service) of
+ [] -> %% Nop[e
+ case proplists:get_value(target, Opts, undefined) of
+ undefined ->
+ ?info("dlink_tcp:setup_data_link(~p) Failed: no target given in options.",
+ [Service]),
+ { reply, [ok, -1 ], St };
+
+ Addr ->
+ [ Address, Port] = string:tokens(Addr, ":"),
+
+ case connect_remote(Address, list_to_integer(Port), St#st.cs) of
+ ok ->
+ { reply, [ok, 2000], St }; %% 2 second timeout
+
+ already_connected -> %% We are already connected
+ { reply, [already_connected, -1], St };
+
+ Err ->
+ { reply, [Err, 0], St }
+ end
+ end;
+
+ _ -> %% Yes - We do have a connection that knows of service
+ { reply, [already_connected, -1], St }
end;
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
index f870569..d5bf645 100644
--- a/components/schedule/src/rvi_routing.erl
+++ b/components/schedule/src/rvi_routing.erl
@@ -14,11 +14,9 @@
%% API
-export([get_service_routes/1]).
+-export([get_service_protocols/2]).
-export([start_link/0]).
--export([find_routes_/2,
- normalize_routes_/2]).
-
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -42,6 +40,10 @@
get_service_routes(Service) ->
gen_server:call(?SERVER, { rvi_get_service_routes, Service }).
+%%
+%% Retrieve all protocols matching the service / data link pair
+get_service_protocols(Service, DataLink) ->
+ gen_server:call(?SERVER, { rvi_get_protocols, Service, DataLink }).
%%--------------------------------------------------------------------
@@ -94,7 +96,10 @@ init([]) ->
%% @end
%%--------------------------------------------------------------------
handle_call( { rvi_get_service_routes, Service }, _From, St) ->
- {reply, find_routes_( St#st.routes, Service), St};
+ {reply, find_routes( St#st.routes, Service), St};
+
+handle_call( { rvi_get_protocols, Service, DataLink }, _From, St) ->
+ {reply, find_protocols( St#st.routes, Service, DataLink), St};
handle_call(_Request, _From, St) ->
Reply = ok,
@@ -200,7 +205,7 @@ find_routes_(Rt, _Svc, CurRoutes, CurMatchLen) ->
{ CurRoutes, CurMatchLen }.
-find_routes_(Routes, Service) ->
+find_routes(Routes, Service) ->
case find_routes_(Routes, Service, undefined, 0) of
{ undefined, 0 } ->
?debug("rvi_routing(): ~p -> unknown", [ Service]),
@@ -231,3 +236,26 @@ normalize_routes_([ {{ Pr, PrOp}, DL } | Rem ], Acc) ->
normalize_routes_([ {Pr, DL} | Rem ], Acc) ->
normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]).
+
+find_protocols_(_DataLink, [], Acc ) ->
+ lists:reverse(Acc);
+
+
+%% Matching data link. This is an allowed protocol
+find_protocols_(DataLink, [ {{ Pr, PrOp }, { DL, DLOp }} | T],
+ Acc) when DataLink =:= DL ->
+
+ find_protocols_(DataLink, T, [ { Pr, PrOp, DLOp } | Acc ]);
+
+
+%% No match
+find_protocols_(DataLink, [ {{ _Pr, _PrOp }, { _DL, _DLOp }} | T], Acc) ->
+ find_protocols_(DataLink, T, Acc ).
+
+
+find_protocols(AllRoutes, Service, DataLink) ->
+ SvcRoutes = find_routes(AllRoutes, Service),
+ Res = find_protocols_(DataLink, SvcRoutes, []),
+ ?debug("find_protocols(~p:~p): -> ~p", [ DataLink, Service, Res]),
+ Res.
+
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index 35cc965..4b09250 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -37,11 +37,11 @@
%%
%% Service -> ETS -> Messages
-record(service, {
- key = { "", unknown }, %% Service name and data link modu
+ key = { "", unknown }, %% Service name and data link module
%% Is this service currently available on the data link module
available = false,
-
+
%% Table containing #message records,
%% indexed by their transaction ID (and sequence of delivery)
messages_tid = undefined,
@@ -57,12 +57,12 @@
-record(message, {
transaction_id, %% Transaction ID that message is tagged with.
+ service, %% Target service
timeout, %% Timeout, UTC
data_link, %% Data Link Module to use. { Module, Opts}
protocol, %% Protocol to use. { Module Opts }
routes, %% Routes retrieved for this
timeout_tref, %% Reference to erlang timer associated with this message.
- timeout_cb, %% Callback to invoke when timeout occurs.
parameters,
signature,
certificate
@@ -232,7 +232,6 @@ handle_call( { rvi, schedule_message,
TransID,
rvi_routing:get_service_routes(SvcName), %% Can be no_route
Timeout,
- calc_relative_tout(Timeout),
Parameters,
Signature,
Certificate,
@@ -259,7 +258,7 @@ handle_call(Other, _From, St) ->
%%--------------------------------------------------------------------
-handle_cast( {rvi, service_available, [SvcName, DataLinkModule]}, St) ->
+handle_cast( {rvi, service_available, [ SvcName, DataLinkModule ]}, St) ->
%% Find or create the service.
?debug("sched:service_available(): ~p:~s", [ DataLinkModule, SvcName ]),
@@ -267,10 +266,15 @@ handle_cast( {rvi, service_available, [SvcName, DataLinkModule]}, St) ->
%% Create a new or update an existing service.
SvcRec = update_service(SvcName, DataLinkModule, available, St),
- %% Try to send any pending messages.
- { _, NSt} = try_sending_messages(SvcRec, St),
+ %% Try to send any pending messages waiting for this
+ %% service / data link combo.
+ { _, NSt1} = send_queued_messages(SvcRec, St),
+
+ %% Send any orphaned messages waiting for the service
+ %% to come up
+ NSt2 = send_orphaned_messages(SvcName, DataLinkModule, NSt1),
+ { noreply, NSt2 };
- { noreply, NSt};
handle_cast( {rvi, service_unavailable, [SvcName, DataLinkModule]},
#st { services_tid = SvcTid } = St) ->
@@ -325,18 +329,30 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
?debug("sched:timeout(~p:~p): Rescheduling", [ DLMod, SvcName]),
ets:delete(SvcRec#service.messages_tid, TransID),
- %% Try to requeue message
- { _Res, NSt } =
- queue_message(SvcName,
- Msg#message.transaction_id,
- Msg#message.routes,
- Msg#message.timeout,
- calc_relative_tout(Msg#message.timeout),
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate,
- St),
- {noreply, NSt};
+ %% Calculate
+ TOut = calc_relative_tout(Msg#message.timeout),
+
+ %% Has the message itself, not only the current
+ %% data link attempt, timed out?
+ case TOut =:= -1 of
+ true ->
+ %% Yes!
+ do_timeout_callback(St#st.cs, SvcName, TransID),
+ {noreply, St};
+ false ->
+
+ %% Try to requeue message
+ { _Res, NSt } =
+ queue_message(SvcName,
+ Msg#message.transaction_id,
+ Msg#message.routes,
+ Msg#message.timeout,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St),
+ {noreply, NSt}
+ end;
_ ->
?info("sched:timeout(): trans_id(~p) service(~p): Yanked while processing",
@@ -388,22 +404,23 @@ code_change(_OldVsn, St, _Extra) ->
+store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) ->
+ { SvcName, _ } = SvcRec#service.key,
+
+ TRef = erlang:send_after(RelativeTimeout, self(),
+ { rvi_message_timeout,
+ SvcName,
+ DataLinkMod,
+ Message#message.transaction_id }),
+
+ %% Add message to the service's queue, with an updated
+ %% timeout ref.
+ ets:insert(SvcRec#service.messages_tid,
+ Message#message { timeout_tref = TRef }),
+
+ ok.
-%%
-%% The message has timed out
-%%
-queue_message(SvcName,
- TransID,
- _Routes,
- _MessageTimeout,
- -1, %% We are timed out
- _Parameters,
- _Signature,
- _Certificate,
- St) ->
- do_timeout_callback(St#st.cs, SvcName, TransID),
- { timeout, St };
%%
%% No more routes to try, or no routes found at all
@@ -412,25 +429,43 @@ queue_message(SvcName,
%% Stash the message in the unknown datalinvariant of the service
%% and opportunistically send it if the service
queue_message(SvcName,
- _TransID,
+ TransID,
[ ],
- _MessageTimeout,
- _RelativeTimeout,
- _Parameters,
- _Signature,
- _Certificate, St) ->
-
- %% FIXME: Handle route failure
- ?notice("sched:queue_message(): Ran out of routes to try for ~p", [SvcName]),
- { route_failed, St };
+ Timeout,
+ Parameters,
+ Signature,
+ Certificate,
+ St) ->
+ TOut = calc_relative_tout(Timeout),
+ ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.",
+ [ SvcName, TOut / 1000.0]),
+ %% Stash in Service / orphaned
+ SvcRec = find_or_create_service(SvcName, orphaned, St),
+
+ store_message(SvcRec,
+ orphaned,
+ #message {
+ transaction_id = TransID,
+ service = SvcName,
+ timeout = Timeout,
+ data_link = undefined,
+ protocol = undefined,
+ routes = [],
+ timeout_tref = 0,
+ parameters = Parameters,
+ signature = Signature,
+ certificate = Certificate
+ },
+ TOut),
+ {ok, St};
+
%% Try to queue message
queue_message(SvcName,
TransID,
- [ { { ProtoMod, ProtoOpt }, { DLMod, DLOp } } | RemainingRoutes ],
+ [ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ],
Timeout,
- RelativeTimeout,
Parameters,
Signature,
Certificate,
@@ -443,81 +478,69 @@ queue_message(SvcName,
SvcRec = find_or_create_service(SvcName, DLMod, St),
+
%%
%% Bring up the relevant data link for the given route.
%% Once up, the data link will invoke service_availble()
%% to indicate that the service is available for the given DL.
%%
- case DLMod:setup_data_link(St#st.cs, SvcName, DLOp) of
+ case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of
[ ok, DLTimeout ] ->
-
- TOut = select_timeout(RelativeTimeout, DLTimeout),
- %% Setup a timeout that triggers on whatever
- %% comes first of the message's general timeout
- %% or the timeout of the data link bringup
- %%
- ?debug("sched:q(~p:~s): ~p seconds for link up.",
- [ DLMod, SvcName, DLTimeout / 1000.0]),
-
- TRef = erlang:send_after(TOut, self(),
- { rvi_message_timeout, SvcName, DLMod, TransID }),
-
-
- %% Setup a message to be added to the service / dl table.
- Msg = #message {
- transaction_id = TransID,
- data_link = { DLMod, DLOp },
- routes = RemainingRoutes,
- protocol = { ProtoMod, ProtoOpt },
- timeout = Timeout,
- timeout_tref = TRef,
- parameters = Parameters,
- signature = Signature,
- certificate = Certificate
- },
-
- %% Add to ets table
- ets:insert(SvcRec#service.messages_tid, Msg),
+ TOut = select_timeout(calc_relative_tout(Timeout), DLTimeout),
+ ?debug("sched:q(~p:~s): ~p seconds to compe up.",
+ [ DLMod, SvcName, TOut / 1000.0]),
+
+ store_message(SvcRec,
+ DLMod,
+ #message {
+ transaction_id = TransID,
+ service = SvcName,
+ timeout = Timeout,
+ data_link = { DLMod, DLOpt },
+ protocol = { ProtoMod, ProtoOpt },
+ routes = RemainingRoutes,
+ timeout_tref = 0,
+ parameters = Parameters,
+ signature = Signature,
+ certificate = Certificate
+ },
+ TOut),
{ok, St};
- [ already_connected, DLTimeout] ->
+ [ already_connected, _] ->
%% The service may already be available, give it a shot.
- ?debug("sched:q(~p:~s): already up ~p seconds to send.",
- [ DLMod, SvcName, DLTimeout / 1000.0]),
-
- TOut = select_timeout(RelativeTimeout, DLTimeout),
-
- TRef = erlang:send_after(TOut, self(),
- { rvi_message_timeout, SvcName, DLMod, TransID }),
-
-
- Msg = #message {
- transaction_id = TransID,
- data_link = { DLMod, DLOp },
- routes = RemainingRoutes,
- protocol = { ProtoMod, ProtoOpt },
- timeout = Timeout,
- timeout_tref = TRef,
- parameters = Parameters,
- signature = Signature,
- certificate = Certificate
- },
-
- %% Add to ets table
- ets:insert(SvcRec#service.messages_tid, Msg),
- { _, NSt } = try_sending_messages(SvcRec, St) ,
+ ?debug("sched:q(~p:~s): already up. Sending.",
+ [ DLMod, SvcName]),
+
+ %% Will re-queue message if cannot send.
+ { _, NSt } =
+ send_message(DLMod, DLOpt,
+ ProtoMod, ProtoOpt,
+ #message {
+ transaction_id = TransID,
+ timeout = Timeout,
+ service = SvcName,
+ data_link = { DLMod, DLOpt },
+ protocol = { ProtoMod, ProtoOpt },
+ routes = RemainingRoutes,
+ timeout_tref = 0,
+ parameters = Parameters,
+ signature = Signature,
+ certificate = Certificate
+ }, St),
{ ok, NSt };
- %% We failed with this route. Try the next one
- [ error, _Reason] ->
- ?debug("sched:q(~p:~s): failed to setup.", [ DLMod, SvcName]),
+ %%
+ %% We failed to setup a data link. Try the next route.
+ %%
+ [ Err, _Reason] ->
+ ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]),
queue_message(SvcName,
TransID,
+ RemainingRoutes,
Timeout,
- calc_relative_tout(Timeout),
- RemainingRoutes,
Parameters,
Signature,
Certificate,
@@ -526,99 +549,227 @@ queue_message(SvcName,
-
+%% Send messages to locally connected service
+send_message(local, _, _, _, Msg, St) ->
+
+ ?debug("sched:send_msg(local:~s). WIll send to local",
+ [ Msg#message.service]),
+
+ service_edge_rpc:handle_remote_message(St#st.cs,
+ Msg#message.service,
+ Msg#message.timeout,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate),
+ {ok, St};
+
+%% Forward message to protocol.
+send_message(DataLinkMod, DataLinkOpts,
+ ProtoMod, ProtoOpts,
+ Msg, St) ->
+
+ ?debug("sched:send_msg(): ~p:~p:~p",
+ [ProtoMod, DataLinkMod, Msg#message.service]),
+
+ %% Send off message to the correct protocol module
+ case ProtoMod:send_message(
+ St#st.cs,
+ Msg#message.service,
+ Msg#message.timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate) of
+
+ %% Success
+ [ok] ->
+ %% Send the rest of the messages associated with this service/dl
+ {ok, St};
+
+ %% Failed
+ [Err] ->
+ ?info("sched:send_msg(): Failed: ~p:~p:~p -> ~p",
+ [ProtoMod, DataLinkMod, Msg#message.service, Err]),
+
+ %% Requeue this message with the next route
+ queue_message(Msg#message.service,
+ Msg#message.transaction_id,
+ Msg#message.routes,
+ Msg#message.timeout,
+ Msg#message.parameters,
+ Msg#message.signature,
+ Msg#message.certificate,
+ St)
+ end.
%% The service is not available
-try_sending_messages(#service {
- key = { SvcName, _ },
- available = unavailable,
- messages_tid = _Tid } = _SvcRec, St) ->
+send_queued_messages(#service {
+ key = { SvcName, _ },
+ available = unavailable,
+ messages_tid = _Tid } = _SvcRec, St) ->
- ?info("sched:try_send(): SvcName: ~p: Not available", [SvcName]),
+ ?info("sched:send(): SvcName: ~p: Not available", [SvcName]),
{ not_available, St };
-try_sending_messages(#service {
+send_queued_messages(#service {
key = { SvcName, DataLinkMod },
available = available,
messages_tid = Tid } = SvcRec, St) ->
- ?debug("sched:try_send(): Service: ~p:~s", [DataLinkMod, SvcName]),
+ ?debug("sched:send(): Service: ~p:~s", [DataLinkMod, SvcName]),
%% Extract the first message of the queue.
case first_service_message(SvcRec) of
empty ->
- ?debug("sched:try_send(): Nothing to send"),
+ ?debug("sched:send(): Nothing to send"),
{ ok, St };
yanked ->
- ?info("sched:try_send(): Message was yanked while trying to send: ~p",
+ ?info("sched:send(): Message was yanked while trying to send: ~p",
[SvcRec#service.key]),
{ ok, St};
- Msg ->
+ Msg->
%% Wipe from ets table and cancel timer
ets:delete(Tid, Msg#message.transaction_id),
+ erlang:cancel_timer(Msg#message.timeout_tref),
+ %% Extract the protocol / data link to use
+ { DataLink, DataLinkOpts } = Msg#message.data_link,
+ { Proto, ProtoOpts } = Msg#message.protocol,
+ { _, NSt} = send_message(DataLink, DataLinkOpts,
+ Proto, ProtoOpts,
+ Msg, St),
+
+ send_queued_messages(SvcRec, NSt)
+ end.
+
+
+%% Check in the orphaned queue for our locally connected service,
+%% where messages are placed while waiting for a final timeout after
+%% all routes have failed.
+%%
+%% If messages exist in the orphaned queue for SvcName,
+%% we will try to send them using the DataLink/Protocol combo
+%% provided on the command line.
+send_orphaned_messages(SvcName, local, St) ->
+
+ %% See if there is an orphaned queue for SvcName
+ case find_service(SvcName, orphaned, St) of
+ not_found ->
+ ?debug("sched:send_orph(~p:~p): No orphaned messages waiting",
+ [ SvcName]),
+ St;
+
+ %% We have orphaned messages for the service and
+ %% we have at least one protocol that we can use over
+ %% the given data link
+ %% Start chugging out messages
+ SvcRec ->
+ send_orphaned_messages_(undefined, undefined,
+ local, undefined,
+ SvcRec, St)
+ end;
+
+%% Check in the orphaned queue for the given service, where messages
+%% are placed while waiting for a final timeout after all routes
+%% have failed.
+%%
+%% If messages exist in the orphaned queue for SvcName,
+%% we will try to send them using the DataLink/Protocol combo
+%% provided on the command line.
+send_orphaned_messages(SvcName, DataLinkMod, St) ->
+
+ %% See if there is an orphaned queue for SvcName
+ case { find_service(SvcName, orphaned, St),
+ rvi_routing:get_service_protocols(SvcName, DataLinkMod) } of
+ { not_found, _ } -> %% No orphaned messages destined for the service
+ ?debug("sched:send_orph(~p:~p): No orphaned messages waiting",
+ [DataLinkMod, SvcName]),
+ St;
+
+ %% We have messages waiting for the service, but no protocol has been configured
+ %% for transmitting them over the given data link module
+ { _, [] } ->
+ ?debug("sched:send_orph(~p:~p): No protocol configured. Skipped",
+ [DataLinkMod, SvcName]),
+ St;
+
+ %% We have orphaned messages for the service and
+ %% we have at least one protocol that we can use over
+ %% the given data link
+ %% Start chugging out messages
+ { SvcRec, [{ Proto, ProtoOpts, DataLinkOpts } | _]} ->
+ send_orphaned_messages_(Proto, ProtoOpts,
+ DataLinkMod, DataLinkOpts,
+ SvcRec, St)
+ end.
+
+
+
+send_orphaned_messages_(Protocol, ProtocolOpts,
+ DataLinkMod, DataLinkOpts,
+ #service {
+ key = { SvcName, _ },
+ messages_tid = Tid } = SvcRec, St) ->
+
+
+ %% Extract the first message of the queue.
+ case first_service_message(SvcRec) of
+ empty ->
+ ?debug("sched:send_orph(~p:~p): Nothing to send",
+ [DataLinkMod, SvcName ]),
+ St;
+ yanked ->
+ ?info("sched:send_orph(~p:~p): Message was yanked while processing",
+ [DataLinkMod, SvcName ]),
+ send_orphaned_messages_(DataLinkMod, DataLinkOpts,
+ Protocol, ProtocolOpts,
+ SvcRec, St);
+
+ Msg->
+ %% Wipe from ets table and cancel timer
+ ets:delete(Tid, Msg#message.transaction_id),
erlang:cancel_timer(Msg#message.timeout_tref),
- %% Forward to protocol.
- ?debug("sched:try_send(): DataLink: ~p", [Msg#message.data_link]),
- { _, DataLinkOpts } = Msg#message.data_link,
-
- ?debug("sched:try_send(): Proto: ~p", [Msg#message.protocol]),
- { ProtoMod, ProtoOpts } = Msg#message.protocol,
-
- %% Send off message to the correct protocol module
- case ProtoMod:send_message(
- St#st.cs,
- SvcName,
- Msg#message.timeout,
- ProtoOpts,
- DataLinkMod,
- DataLinkOpts,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate) of
-
- %% Success
- [ok] ->
- %% Send the rest of the messages associated with this service/dl
- try_sending_messages(SvcRec, St);
-
- %% Failed
- [Err] ->
- ?info("sched:try_send(): No send: ~p:~p:~p -> ~p : ~p",
- [ProtoMod, DataLinkMod, SvcName, Err]),
-
- %% Requeue this message with the next route
- { _, St1} = queue_message(SvcName,
- Msg#message.transaction_id,
- Msg#message.timeout,
- calc_relative_tout(Msg#message.timeout),
- Msg#message.routes,
- Msg#message.parameters,
- Msg#message.signature,
- Msg#message.certificate,
- St),
+ ?debug("sched:send_orph(~p:~p): Sending Trans(~p) Pr(~p) PrOp(~p) DlOp(~p)",
+ [DataLinkMod, SvcName,
+ Msg#message.transaction_id,
+ Protocol, ProtocolOpts, DataLinkOpts]),
- %% Send the rest of the messgages targeting
- %% the same service through the same data link
- %% If they all fail due to some data link error,
- %% they will all be requeued to the next route.
- try_sending_messages(SvcRec, St1)
- end
+ { _, NSt} = send_message( DataLinkMod, DataLinkOpts,
+ Protocol, ProtocolOpts,
+ Msg, St),
+
+ send_orphaned_messages_(DataLinkMod, DataLinkOpts,
+ Protocol, ProtocolOpts,
+ SvcRec, NSt)
end.
-find_or_create_service(SvcName, DataLinkMod, #st { services_tid = SvcTid } = St) ->
+
+find_service(SvcName, DataLinkMod, #st { services_tid = SvcTid }) ->
?debug("sched:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]),
case ets:lookup(SvcTid, { SvcName, DataLinkMod }) of
- [] -> %% The given service does not exist, create it.
+ [] -> %% The given service does not exist, return not found
+ not_found;
+
+ [ SvcRec ] -> %%
+ SvcRec
+ end.
+
+find_or_create_service(SvcName, DataLinkMod, St) ->
+ ?debug("sched:find_or_create_service(): ~p:~p", [ DataLinkMod, SvcName]),
+
+ case find_service(SvcName, DataLinkMod, St) of
+ not_found -> %% The given service does not exist, create it.
?debug("sched:find_or_create_service(): Creating new ~p", [ SvcName]),
update_service(SvcName, DataLinkMod, unavailable, St);
- [ SvcRec ] ->
+ SvcRec ->
%% Update the network address, if it differs, and return
%% the new service / State as {ok, NSvcRec, false, NSt}
?debug("sched:find_or_create_service(): Updating existing ~p", [ SvcName]),
@@ -757,5 +908,3 @@ select_timeout(TimeOut1, TimeOut2) ->
{ false, false } -> min(TimeOut1, TimeOut2)
end.
-
-
diff --git a/rvi_backend.config b/rvi_backend.config
index 577415b..480a3b8 100644
--- a/rvi_backend.config
+++ b/rvi_backend.config
@@ -32,8 +32,8 @@
rvi,
rvi_common,
- service_edge,
service_discovery,
+ service_edge,
authorize,
schedule,
dlink_tcp,
diff --git a/rvi_sample.config b/rvi_sample.config
index 085c14e..8a678c2 100644
--- a/rvi_sample.config
+++ b/rvi_sample.config
@@ -43,8 +43,8 @@
%%
rvi,
rvi_common,
- service_edge,
service_discovery,
+ service_edge,
authorize,
schedule,
dlink_tcp,
diff --git a/test/test_plan.md b/test/test_plan.md
index e5634d8..cf62b82 100644
--- a/test/test_plan.md
+++ b/test/test_plan.md
@@ -15,7 +15,9 @@ While we are implementing an automated test suite, we will start with a simple m
## SERVICE INVOCATION
-## INVOCATION OF NON-EXISTENT SERVICE
+## INVOCATION OF NON-EXISTENT SERVICE
+Ensure that message is forwarded if target service becomes available
+before message times out.
## STRESS TEST
diff --git a/tizen.config b/tizen.config
index 36c7deb..b0ffff8 100644
--- a/tizen.config
+++ b/tizen.config
@@ -43,8 +43,8 @@
%%
rvi,
rvi_common,
- service_edge,
service_discovery,
+ service_edge,
authorize,
schedule,
dlink_tcp,
@@ -330,7 +330,7 @@
{ bert_rpc_server, [ { port, 8807 }]}
%% Setup persistent connections
- %% { persistent_connections, [ "38.129.64.13:8807" ]}
+ { persistent_connections, [ "38.129.64.13:8807" ]}
]
}
]