summaryrefslogtreecommitdiff
path: root/components/schedule
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-11-05 17:29:22 +0100
committerUlf Wiger <ulf@feuerlabs.com>2015-11-20 13:47:02 -0800
commitca1f0abd9f0f1478da8380bff28cb25aada34c1d (patch)
tree814c4a3c17b247cc300f62673771dad5a5c6ed77 /components/schedule
parent1b44c2448344a10ae63904a796b6211c40a3f212 (diff)
downloadrvi_core-ca1f0abd9f0f1478da8380bff28cb25aada34c1d.tar.gz
All tests (incl remote method inv) pass
- No signatures on messages (dlink_tls) - dlink_bt wasn't actually tested (test case passed erroneously) - added proto_msgpack component - fixed sneaky bug in 'setup'
Diffstat (limited to 'components/schedule')
-rw-r--r--components/schedule/src/rvi_routing.erl9
-rw-r--r--components/schedule/src/schedule_rpc.erl118
2 files changed, 49 insertions, 78 deletions
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
index 67b68eb..9d7917e 100644
--- a/components/schedule/src/rvi_routing.erl
+++ b/components/schedule/src/rvi_routing.erl
@@ -235,7 +235,14 @@ normalize_routes_([ {{ Pr, PrOp}, DL } | Rem ], Acc) ->
normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
normalize_routes_([ {Pr, DL} | Rem ], Acc) ->
- normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]).
+ normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]);
+normalize_routes_([H|T], Acc) ->
+ ?error("Unrecognized routing rule: ~p", [H]),
+ normalize_routes_(T, Acc);
+normalize_routes_(Other, Acc) ->
+ ?error("Unrecognized routing entry (expected list): ~p", [Other]),
+ lists:reverse(Acc).
+
find_protocols_(_DataLink, [], Acc ) ->
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index b66bdd1..0faddba 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -13,7 +13,7 @@
-include_lib("rvi_common/include/rvi_common.hrl").
%% API
-export([start_link/0]).
--export([schedule_message/5]).
+-export([schedule_message/4]).
%% Invoked by service discovery
%% FIXME: Should be rvi_service_discovery behavior
@@ -63,8 +63,8 @@
protocol, %% Protocol to use. { Module Opts }
routes, %% Routes retrieved for this
timeout_tref, %% Reference to erlang timer associated with this message.
- parameters,
- signature
+ log_id,
+ parameters
}).
@@ -123,15 +123,13 @@ start_json_server() ->
schedule_message(CompSpec,
SvcName,
Timeout,
- Parameters,
- Signature) ->
+ Parameters) ->
rvi_common:request(schedule, ?MODULE,
schedule_message,
[{ service, SvcName },
{ timeout, Timeout },
- { parameters, Parameters },
- { signature, Signature }],
+ { parameters, Parameters }],
[status, transaction_id], CompSpec).
@@ -158,18 +156,17 @@ handle_rpc(<<"schedule_message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
- {ok, Signature} = rvi_common:get_json_element(["signature"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
?debug("schedule_rpc:schedule_request(): service: ~p", [ SvcName]),
?debug("schedule_rpc:schedule_request(): timeout: ~p", [ Timeout]),
%% ?debug("schedule_rpc:schedule_request(): parameters: ~p", [Parameters]),
- ?debug("schedule_rpc:schedule_request(): signature: ~p", [Signature]),
[ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message,
[ SvcName,
Timeout,
- {struct, Parameters},
- Signature]}),
+ Parameters,
+ LogId ]}),
{ok, [ { status, rvi_common:json_rpc_status(ok)},
{ transaction_id, TransID } ] };
@@ -184,19 +181,23 @@ handle_rpc(Other, _Args) ->
handle_notification("service_available", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
gen_server:cast(?SERVER, { rvi, service_available,
[ SvcName,
- list_to_existing_atom(DataLinkModule) ]}),
+ list_to_existing_atom(DataLinkModule),
+ LogId ]}),
ok;
handle_notification("service_unavailable", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
gen_server:cast(?SERVER, { rvi, service_unavailable,
[ SvcName,
- list_to_atom(DataLinkModule) ]}),
+ list_to_atom(DataLinkModule),
+ LogId ]}),
ok;
@@ -207,25 +208,24 @@ handle_notification(Other, _Args) ->
handle_call( { rvi, schedule_message,
[SvcName,
Timeout,
- Parameters,
- Signature] }, _From, St) ->
+ Parameters | LogId] }, _From, St) ->
?debug("sched:sched_msg(): service: ~p", [SvcName]),
?debug("sched:sched_msg(): timeout: ~p", [Timeout]),
?debug("sched:sched_msg(): parameters: ~p", [Parameters]),
- ?debug("sched:sched_msg(): signature: ~p", [Signature]),
%%?debug("sched:sched_msg(): St: ~p", [St]),
%% Create a transaction ID
{ TransID, NSt1} = create_transaction_id(St),
-
+ log(LogId, "queue: tid=~w", [TransID]),
%% Queue the message
- {_, NSt2 }= queue_message(SvcName,
- TransID,
+ Msg = #message{transaction_id = TransID,
+ service = SvcName,
+ timeout = Timeout,
+ parameters = Parameters,
+ log_id = LogId},
+ {_, NSt2 }= queue_message(Msg,
rvi_routing:get_service_routes(SvcName), %% Can be [] (no route)
- Timeout,
- Parameters,
- Signature,
NSt1),
{ reply, [ok, TransID], NSt2 };
@@ -334,12 +334,8 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
%% Try to requeue message
{ _Res, NSt } =
- queue_message(SvcName,
- Msg#message.transaction_id,
+ queue_message(Msg,
Msg#message.routes,
- Msg#message.timeout,
- Msg#message.parameters,
- Msg#message.signature,
St),
{noreply, NSt}
end;
@@ -418,13 +414,7 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) ->
%%
%% Stash the message in the unknown datalinvariant of the service
%% and opportunistically send it if the service
-queue_message(SvcName,
- TransID,
- [ ],
- Timeout,
- Parameters,
- Signature,
- St) ->
+queue_message(#message{service = SvcName, timeout = Timeout} = Msg, [], St) ->
TOut = calc_relative_tout(Timeout),
?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.",
@@ -434,33 +424,21 @@ queue_message(SvcName,
store_message(SvcRec,
orphaned,
- #message {
- transaction_id = TransID,
- service = SvcName,
- timeout = Timeout,
+ Msg#message {
data_link = undefined,
protocol = undefined,
routes = [],
- timeout_tref = 0,
- parameters = Parameters,
- signature = Signature
+ timeout_tref = 0
},
TOut),
{ok, St};
-
%% Try to queue message
-queue_message(SvcName,
- TransID,
+queue_message(#message{service = SvcName, timeout = Timeout} = Msg,
[ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ],
- Timeout,
- Parameters,
- Signature,
St) ->
?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]),
- %%?debug("sched:q(~p:~s): parameters: ~p", [DLMod, SvcName, Parameters]),
- %%?debug("sched:q(~p:~s): signature: ~p", [DLMod, SvcName, Signature]),
SvcRec = find_or_create_service(SvcName, DLMod, St),
@@ -470,16 +448,11 @@ queue_message(SvcName,
%% Once up, the data link will invoke service_availble()
%% to indicate that the service is available for the given DL.
%%
- Msg = #message {
- transaction_id = TransID,
- service = SvcName,
- timeout = Timeout,
+ Msg1 = Msg#message {
data_link = { DLMod, DLOpt },
protocol = { ProtoMod, ProtoOpt },
routes = RemainingRoutes,
- timeout_tref = 0,
- parameters = Parameters,
- signature = Signature
+ timeout_tref = 0
},
case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of
@@ -489,7 +462,7 @@ queue_message(SvcName,
?debug("sched:q(~p:~s): ~p seconds to compe up.",
[ DLMod, SvcName, TOut / 1000.0]),
- store_message(SvcRec, DLMod, Msg, TOut),
+ store_message(SvcRec, DLMod, Msg1, TOut),
{ok, St};
[ already_connected, _] ->
@@ -499,7 +472,7 @@ queue_message(SvcName,
%% Will re-queue message if cannot send.
{ _, NSt } =
- send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg, St),
+ send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg1, St),
{ ok, NSt };
@@ -508,13 +481,7 @@ queue_message(SvcName,
%%
[ Err, _Reason] ->
?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]),
- queue_message(SvcName,
- TransID,
- RemainingRoutes,
- Timeout,
- Parameters,
- Signature,
- St)
+ queue_message(Msg1, RemainingRoutes, St)
end.
@@ -528,15 +495,13 @@ send_message(local, _, _, _, Msg, St) ->
service_edge_rpc:handle_remote_message(St#st.cs,
Msg#message.service,
Msg#message.timeout,
- Msg#message.parameters,
- Msg#message.signature),
+ Msg#message.parameters),
{ok, St};
%% Forward message to protocol.
send_message(DataLinkMod, DataLinkOpts,
ProtoMod, ProtoOpts,
Msg, St) ->
-
?debug("sched:send_msg(): ~p:~p:~p",
[ProtoMod, DataLinkMod, Msg#message.service]),
@@ -549,8 +514,7 @@ send_message(DataLinkMod, DataLinkOpts,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Msg#message.parameters,
- Msg#message.signature) of
+ Msg#message.parameters) of
%% Success
[ok] ->
@@ -563,15 +527,10 @@ send_message(DataLinkMod, DataLinkOpts,
[ProtoMod, DataLinkMod, Msg#message.service, Err]),
%% Requeue this message with the next route
- queue_message(Msg#message.service,
- Msg#message.transaction_id,
- Msg#message.routes,
- Msg#message.timeout,
- Msg#message.parameters,
- Msg#message.signature,
- St)
+ queue_message(Msg, Msg#message.routes, St)
end.
+
%% The service is not available
send_queued_messages(#service {
key = { SvcName, _ },
@@ -869,3 +828,8 @@ select_timeout(TimeOut1, TimeOut2) ->
{ false, false } -> min(TimeOut1, TimeOut2)
end.
+
+log([ID], Fmt, Args) ->
+ rvi_log:log(ID, <<"schedule">>, rvi_log:format(Fmt, Args));
+log(_, _, _) ->
+ ok.