summaryrefslogtreecommitdiff
path: root/components/service_edge
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
committerUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
commit6cfeffca9f8e93e45dd885702a77896e2a1d0951 (patch)
tree620e2dd9006b52df7129d135fa7256d793571df1 /components/service_edge
parent7d098a34b25704dbaa8bea0217ca6b7be37a0e48 (diff)
downloadrvi_core-6cfeffca9f8e93e45dd885702a77896e2a1d0951.tar.gz
new protocol & setup scripts
Diffstat (limited to 'components/service_edge')
-rw-r--r--components/service_edge/src/service_edge_rpc.erl55
1 files changed, 39 insertions, 16 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index a7c75e0..576dc13 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -236,7 +236,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Params),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Params),
-
+ LogId = log_id_json_tail(Params ++ Parameters),
?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]),
?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]),
?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]),
@@ -244,7 +244,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
case gen_server:call(
?SERVER,
{rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}) of
+ [ SvcName, Timeout, Parameters | LogId ]}) of
[not_found] ->
{ok, [{status, rvi_common:json(not_found)}]};
[Res, TID] ->
@@ -318,9 +318,9 @@ handle_rpc(<<"message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
+ LogId = log_id_json_tail(Args ++ Parameters),
[ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}),
-
+ [ SvcName, Timeout, Parameters | LogId]}),
{ok, [ { status, rvi_common:json_rpc_status(Res) },
{ transaction_id, TID },
{ method, <<"message">>}
@@ -396,12 +396,12 @@ handle_notification(Other, _Args) ->
%% the only calls invoked by other components, and not the locally
%% connected services that uses the same HTTP port to transmit their
%% register_service, and message calls.
-handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
+handle_call({ rvi, register_local_service, [SvcName, URL | T] }, _From, St) ->
?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]),
?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
FullSvcName = rvi_common:local_service_to_string(SvcName),
- CS = start_log(<<"svc">>, "reg local service: ~s", [FullSvcName], St#st.cs),
+ CS = start_log(T, "reg local service: ~s", [FullSvcName], St#st.cs),
?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]),
ets:insert(?SERVICE_TABLE, #service_entry {
@@ -416,7 +416,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
%% Return ok.
{ reply, [ ok, FullSvcName ], St };
-handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
+handle_call({ rvi, unregister_local_service, [SvcName | T] }, _From, St) ->
?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [SvcName]),
@@ -424,7 +424,7 @@ handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
%% Register with service discovery, will trigger callback to service_available()
%% that forwards the registration to other connected services.
- CS = start_log(<<"svc">>, "unreg local service: ~s", [SvcName], St#st.cs),
+ CS = start_log(T, "unreg local service: ~s", [SvcName], St#st.cs),
service_discovery_rpc:unregister_services(CS, [SvcName], local),
%% Return ok.
@@ -444,11 +444,11 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
handle_call({ rvi, handle_local_message,
- [SvcName, TimeoutArg, Parameters] }, _From, St) ->
+ [SvcName, TimeoutArg, Parameters | Tail] }, _From, St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
- CS = start_log(<<"recv">>, "local_message: ~s", [SvcName], St#st.cs),
+ CS = start_log(Tail, "local_message: ~s", [SvcName], St#st.cs),
%%
%% Authorize local message and retrieve a certificate / signature
%% that will be accepted by the receiving node that will deliver
@@ -468,7 +468,7 @@ handle_call({ rvi, handle_local_message,
%% If the timeout is more than 24 hrs old when parsed as unix time,
%% then we are looking at a relative msec timeout. Convert accordingly
%%
- { Mega, Sec, _Micro } = now(),
+ { Mega, Sec, _Micro } = os:timestamp(),
Now = Mega * 1000000 + Sec,
Timeout =
@@ -746,11 +746,34 @@ announce_service_availability(Available, SvcName) ->
end
end, BlockURLs, ?SERVICE_TABLE).
+start_log(Info, Fmt, Args, CS) ->
+ ?debug("start_log(~p,~p,~p,~p)", [Info,Fmt,Args,CS]),
+ case rvi_common:get_json_element([<<"rvi_log_id">>], Info) of
+ {ok, ID} ->
+ start_log_(ID, Fmt, Args, CS);
+ {error, _} ->
+ start_log(Fmt, Args, CS)
+ end.
+
+start_log(Fmt, Args, CS) ->
+ start_log_(rvi_log:new_id(pfx()), Fmt, Args, CS).
+
+start_log_(ID, Fmt, Args, CS) ->
+ CS1 = rvi_common:set_value(rvi_log_id, ID, CS),
+ log(Fmt, Args, CS1),
+ CS1.
-start_log(Pfx, Fmt, Args, CS) ->
- LogTID = rvi_log:new_id(Pfx),
- rvi_log:log(LogTID, <<"svc_edge">>, rvi_log:format(Fmt, Args)),
- rvi_common:set_value(rvi_log_id, LogTID, CS).
+pfx() ->
+ <<"svc_edge">>.
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"svc_edge">>, CS).
+ rvi_log:flog(Fmt, Args, pfx(), CS).
+
+log_id_json_tail(Args) ->
+ ?debug("Args = ~p", [Args]),
+ case rvi_common:get_json_element([<<"rvi_log_id">>], Args) of
+ {ok, ID} ->
+ [{<<"rvi_log_id">>, ID}];
+ {error, _} ->
+ []
+ end.