diff options
Diffstat (limited to 'components/schedule/src/schedule_rpc.erl')
-rw-r--r-- | components/schedule/src/schedule_rpc.erl | 118 |
1 files changed, 41 insertions, 77 deletions
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index b66bdd1..0faddba 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -13,7 +13,7 @@ -include_lib("rvi_common/include/rvi_common.hrl"). %% API -export([start_link/0]). --export([schedule_message/5]). +-export([schedule_message/4]). %% Invoked by service discovery %% FIXME: Should be rvi_service_discovery behavior @@ -63,8 +63,8 @@ protocol, %% Protocol to use. { Module Opts } routes, %% Routes retrieved for this timeout_tref, %% Reference to erlang timer associated with this message. - parameters, - signature + log_id, + parameters }). @@ -123,15 +123,13 @@ start_json_server() -> schedule_message(CompSpec, SvcName, Timeout, - Parameters, - Signature) -> + Parameters) -> rvi_common:request(schedule, ?MODULE, schedule_message, [{ service, SvcName }, { timeout, Timeout }, - { parameters, Parameters }, - { signature, Signature }], + { parameters, Parameters }], [status, transaction_id], CompSpec). @@ -158,18 +156,17 @@ handle_rpc(<<"schedule_message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), - {ok, Signature} = rvi_common:get_json_element(["signature"], Args), + LogId = rvi_common:get_json_log_id(Args), ?debug("schedule_rpc:schedule_request(): service: ~p", [ SvcName]), ?debug("schedule_rpc:schedule_request(): timeout: ~p", [ Timeout]), %% ?debug("schedule_rpc:schedule_request(): parameters: ~p", [Parameters]), - ?debug("schedule_rpc:schedule_request(): signature: ~p", [Signature]), [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message, [ SvcName, Timeout, - {struct, Parameters}, - Signature]}), + Parameters, + LogId ]}), {ok, [ { status, rvi_common:json_rpc_status(ok)}, { transaction_id, TransID } ] }; @@ -184,19 +181,23 @@ handle_rpc(Other, _Args) -> handle_notification("service_available", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), + LogId = rvi_common:get_json_log_id(Args), gen_server:cast(?SERVER, { rvi, service_available, [ SvcName, - list_to_existing_atom(DataLinkModule) ]}), + list_to_existing_atom(DataLinkModule), + LogId ]}), ok; handle_notification("service_unavailable", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), + LogId = rvi_common:get_json_log_id(Args), gen_server:cast(?SERVER, { rvi, service_unavailable, [ SvcName, - list_to_atom(DataLinkModule) ]}), + list_to_atom(DataLinkModule), + LogId ]}), ok; @@ -207,25 +208,24 @@ handle_notification(Other, _Args) -> handle_call( { rvi, schedule_message, [SvcName, Timeout, - Parameters, - Signature] }, _From, St) -> + Parameters | LogId] }, _From, St) -> ?debug("sched:sched_msg(): service: ~p", [SvcName]), ?debug("sched:sched_msg(): timeout: ~p", [Timeout]), ?debug("sched:sched_msg(): parameters: ~p", [Parameters]), - ?debug("sched:sched_msg(): signature: ~p", [Signature]), %%?debug("sched:sched_msg(): St: ~p", [St]), %% Create a transaction ID { TransID, NSt1} = create_transaction_id(St), - + log(LogId, "queue: tid=~w", [TransID]), %% Queue the message - {_, NSt2 }= queue_message(SvcName, - TransID, + Msg = #message{transaction_id = TransID, + service = SvcName, + timeout = Timeout, + parameters = Parameters, + log_id = LogId}, + {_, NSt2 }= queue_message(Msg, rvi_routing:get_service_routes(SvcName), %% Can be [] (no route) - Timeout, - Parameters, - Signature, NSt1), { reply, [ok, TransID], NSt2 }; @@ -334,12 +334,8 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, %% Try to requeue message { _Res, NSt } = - queue_message(SvcName, - Msg#message.transaction_id, + queue_message(Msg, Msg#message.routes, - Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature, St), {noreply, NSt} end; @@ -418,13 +414,7 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) -> %% %% Stash the message in the unknown datalinvariant of the service %% and opportunistically send it if the service -queue_message(SvcName, - TransID, - [ ], - Timeout, - Parameters, - Signature, - St) -> +queue_message(#message{service = SvcName, timeout = Timeout} = Msg, [], St) -> TOut = calc_relative_tout(Timeout), ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.", @@ -434,33 +424,21 @@ queue_message(SvcName, store_message(SvcRec, orphaned, - #message { - transaction_id = TransID, - service = SvcName, - timeout = Timeout, + Msg#message { data_link = undefined, protocol = undefined, routes = [], - timeout_tref = 0, - parameters = Parameters, - signature = Signature + timeout_tref = 0 }, TOut), {ok, St}; - %% Try to queue message -queue_message(SvcName, - TransID, +queue_message(#message{service = SvcName, timeout = Timeout} = Msg, [ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ], - Timeout, - Parameters, - Signature, St) -> ?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]), - %%?debug("sched:q(~p:~s): parameters: ~p", [DLMod, SvcName, Parameters]), - %%?debug("sched:q(~p:~s): signature: ~p", [DLMod, SvcName, Signature]), SvcRec = find_or_create_service(SvcName, DLMod, St), @@ -470,16 +448,11 @@ queue_message(SvcName, %% Once up, the data link will invoke service_availble() %% to indicate that the service is available for the given DL. %% - Msg = #message { - transaction_id = TransID, - service = SvcName, - timeout = Timeout, + Msg1 = Msg#message { data_link = { DLMod, DLOpt }, protocol = { ProtoMod, ProtoOpt }, routes = RemainingRoutes, - timeout_tref = 0, - parameters = Parameters, - signature = Signature + timeout_tref = 0 }, case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of @@ -489,7 +462,7 @@ queue_message(SvcName, ?debug("sched:q(~p:~s): ~p seconds to compe up.", [ DLMod, SvcName, TOut / 1000.0]), - store_message(SvcRec, DLMod, Msg, TOut), + store_message(SvcRec, DLMod, Msg1, TOut), {ok, St}; [ already_connected, _] -> @@ -499,7 +472,7 @@ queue_message(SvcName, %% Will re-queue message if cannot send. { _, NSt } = - send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg, St), + send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg1, St), { ok, NSt }; @@ -508,13 +481,7 @@ queue_message(SvcName, %% [ Err, _Reason] -> ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]), - queue_message(SvcName, - TransID, - RemainingRoutes, - Timeout, - Parameters, - Signature, - St) + queue_message(Msg1, RemainingRoutes, St) end. @@ -528,15 +495,13 @@ send_message(local, _, _, _, Msg, St) -> service_edge_rpc:handle_remote_message(St#st.cs, Msg#message.service, Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature), + Msg#message.parameters), {ok, St}; %% Forward message to protocol. send_message(DataLinkMod, DataLinkOpts, ProtoMod, ProtoOpts, Msg, St) -> - ?debug("sched:send_msg(): ~p:~p:~p", [ProtoMod, DataLinkMod, Msg#message.service]), @@ -549,8 +514,7 @@ send_message(DataLinkMod, DataLinkOpts, ProtoOpts, DataLinkMod, DataLinkOpts, - Msg#message.parameters, - Msg#message.signature) of + Msg#message.parameters) of %% Success [ok] -> @@ -563,15 +527,10 @@ send_message(DataLinkMod, DataLinkOpts, [ProtoMod, DataLinkMod, Msg#message.service, Err]), %% Requeue this message with the next route - queue_message(Msg#message.service, - Msg#message.transaction_id, - Msg#message.routes, - Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature, - St) + queue_message(Msg, Msg#message.routes, St) end. + %% The service is not available send_queued_messages(#service { key = { SvcName, _ }, @@ -869,3 +828,8 @@ select_timeout(TimeOut1, TimeOut2) -> { false, false } -> min(TimeOut1, TimeOut2) end. + +log([ID], Fmt, Args) -> + rvi_log:log(ID, <<"schedule">>, rvi_log:format(Fmt, Args)); +log(_, _, _) -> + ok. |