diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-10-06 12:56:11 +0200 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-11-20 13:43:07 -0800 |
commit | 34aa86b5a2e97650fe6299ccf794d5eb5d052d91 (patch) | |
tree | edfb4bb844c3b90565e7a0bb00f678703d084188 /components/service_edge | |
parent | e6299ff287e767dae71fb47009f9bf4620cc3d78 (diff) | |
download | rvi_core-34aa86b5a2e97650fe6299ccf794d5eb5d052d91.tar.gz |
w.i.p. transition to jsx json codec
Diffstat (limited to 'components/service_edge')
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 372 |
1 files changed, 176 insertions, 196 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index e48c2b7..2cf9c3a 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -% Mozilla Public License, version 2.0. The full text of the +% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -17,9 +17,9 @@ -export([start_link/0]). --export([init/1, - handle_call/3, - handle_cast/2, +-export([init/1, + handle_call/3, + handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -27,7 +27,7 @@ -export([handle_remote_message/6, handle_local_timeout/3]). --export([start_json_server/0, +-export([start_json_server/0, start_websocket/0]). %% Invoked by service discovery @@ -42,10 +42,10 @@ -include_lib("rvi_common/include/rvi_common.hrl"). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). --record(st, { +-record(st, { %% Component specification cs = #component_spec{} }). @@ -67,14 +67,14 @@ start_link() -> init([]) -> CompSpec = rvi_common:get_component_specification(), - URL = rvi_common:get_module_json_rpc_url(service_edge, - ?MODULE, + URL = rvi_common:get_module_json_rpc_url(service_edge, + ?MODULE, CompSpec), ?notice("---- Service Edge URL: ~p", [ URL ]), ?notice("---- Node Service Prefix: ~s", [ rvi_common:local_service_prefix()]), - ets:new(?SERVICE_TABLE, [ set, public, named_table, + ets:new(?SERVICE_TABLE, [ set, public, named_table, { keypos, #service_entry.service }]), service_discovery_rpc:subscribe(CompSpec, ?MODULE), @@ -85,41 +85,41 @@ init([]) -> start_json_server() -> ?debug("service_edge_rpc:start_json_server()"), - case rvi_common:start_json_rpc_server(service_edge, - ?MODULE, + case rvi_common:start_json_rpc_server(service_edge, + ?MODULE, service_edge_sup) of ok -> ok; - Err -> - ?warning("service_edge_rpc:start_json_server(): Failed to start: ~p", + Err -> + ?warning("service_edge_rpc:start_json_server(): Failed to start: ~p", [Err]), Err end. - + start_websocket() -> %% %% Fire up the websocket subsystem, if configured %% - case rvi_common:get_module_config(service_edge, + case rvi_common:get_module_config(service_edge, service_edge_rpc, - websocket, + websocket, not_found, rvi_common:get_component_specification()) of - {ok, not_found} -> + {ok, not_found} -> ?notice("service_edge:init(): No websocket config specified. Will use JSON-RPC/HTTP only."), ok; { ok, WSOpts } -> case proplists:get_value(port, WSOpts, undefined ) of - undefined -> + undefined -> ok; - + Port -> %% FIXME: MONITOR AND RESTART - wse_server:start(Port, - ?MODULE, handle_websocket, undefined, + wse_server:start(Port, + ?MODULE, handle_websocket, undefined, [{type, text} | proplists:delete(port, WSOpts)]), ok end @@ -131,25 +131,25 @@ start_websocket() -> %% Invoked by service_discovery to announce service availability %% Must be handled either as a JSON-RPC call or a gen_server call. service_available(CompSpec, SvcName, DataLinkModule) -> - rvi_common:notification(service_edge, ?MODULE, - service_available, - [{ service, SvcName }, + rvi_common:notification(service_edge, ?MODULE, + service_available, + [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). service_unavailable(CompSpec, SvcName, DataLinkModule) -> - rvi_common:notification(service_edge, ?MODULE, - service_unavailable, - [{ service, SvcName }, + rvi_common:notification(service_edge, ?MODULE, + service_unavailable, + [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) -> {IP, Port} = Conn, - rvi_common:notification(service_edge, ?MODULE, - handle_remote_message, + rvi_common:notification(service_edge, ?MODULE, + handle_remote_message, [{ ip, IP }, { port, Port }, - { service, SvcName }, + { service, SvcName }, { timeout, Timeout }, { parameters, Params }, { signature, Signature }], CompSpec). @@ -159,9 +159,9 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) -> %% A message originated from a locally connected service %% has timed out handle_local_timeout(CompSpec, SvcName, TransID) -> - rvi_common:notification(service_edge, ?SERVER, handle_local_timeout, - [ { service, SvcName}, - { transaction_id, TransID} ], + rvi_common:notification(service_edge, ?SERVER, handle_local_timeout, + [ { service, SvcName}, + { transaction_id, TransID} ], CompSpec). @@ -173,10 +173,10 @@ handle_websocket(WSock, Mesg, Arg) -> ?debug("service_edge_rpc:handle_websocket(~p/~p) method: ~p", [ WSock, ID,Method ]), - case handle_ws_json_rpc(WSock, Method, {array,Params}, Arg) of + case handle_ws_json_rpc(WSock, Method, Params, Arg) of ok -> ok; - {ok, Reply} -> - EncReply = binary_to_list(iolist_to_binary(exo_json:encode({struct, [ { id, ID} |Reply]}))), + {ok, Reply} -> + EncReply = rvi_common:term_to_json([{id, ID} |Reply]), ?debug("service_edge_rpc:handle_websocket(~p/~p) reply: ~s", [ WSock, ID, EncReply]), wse_server:send(WSock, list_to_binary(EncReply)) end, @@ -194,26 +194,31 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) -> ?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]), ?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]), - [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, - [ SvcName, Timeout, Parameters]}), - - ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]), - { ok, [ { status, rvi_common:json_rpc_status(Res) }, - { transaction_id, TID}, - { method, "message"} ] }; + case gen_server:call( + ?SERVER, + {rvi, handle_local_message, + [ SvcName, Timeout, Parameters]}) of + [not_found] -> + {ok, [{status, rvi_common:json(not_found)}]}; + [Res, TID] -> + ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]), + { ok, [ { status, rvi_common:json_rpc_status(Res) }, + { transaction_id, TID}, + { method, <<"message">>}] } + end; handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), ?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]), - [ok, FullSvcName ] = gen_server:call(?SERVER, - { rvi, - register_local_service, - [ SvcName, + [ok, FullSvcName ] = gen_server:call(?SERVER, + { rvi, + register_local_service, + [ SvcName, "ws:" ++ pid_to_list(WSock)]}), - - { ok, [ { status, rvi_common:json_rpc_status(ok)}, + + { ok, [ { status, rvi_common:json_rpc_status(ok)}, { service, FullSvcName }, - { method, "register_service"}]}; + { method, <<"register_service">>}]}; handle_ws_json_rpc(WSock, "unregister_service", Params, _Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), @@ -226,53 +231,52 @@ handle_ws_json_rpc(_Ws , "get_available_services", _Params, _Arg ) -> [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), { ok, [ { status, rvi_common:json_rpc_status(ok)}, { services, Services}, - { method, "get_available_services"}] }. + { method, <<"get_available_services">>}] }. %% Invoked by locally connected services. %% Will always be routed as JSON-RPC since that, and websocket, %% are the only access paths in. %% -handle_rpc("register_service", Args) -> - {ok, SvcName} = rvi_common:get_json_element(["service"], Args), - {ok, URL} = rvi_common:get_json_element(["network_address"], Args), - [ok, FullSvcName ] = gen_server:call(?SERVER, - { rvi, register_local_service, +handle_rpc(<<"register_service">>, Args) -> + {ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args), + {ok, URL} = rvi_common:get_json_element([<<"network_address">>], Args), + [ok, FullSvcName ] = gen_server:call(?SERVER, + { rvi, register_local_service, [ SvcName, URL]}), - {ok, [ {status, rvi_common:json_rpc_status(ok) }, + {ok, [ {status, rvi_common:json_rpc_status(ok) }, { service, FullSvcName }, - { method, "register_service"} + { method, <<"register_service">>} ]}; -handle_rpc("unregister_service", Args) -> +handle_rpc(<<"unregister_service">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName]}), {ok, [ { status, rvi_common:json_rpc_status(ok) }, - { method, "unregister_service"} + { method, <<"unregister_service">>} ]}; -handle_rpc("get_available_services", _Args) -> +handle_rpc(<<"get_available_services">>, _Args) -> [ Status, Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), ?debug("get_available_services(): ~p ~p", [ Status, Services ]), {ok, [ { status, rvi_common:json_rpc_status(ok)}, - { services, {array, Services}}, - { method, "get_available_services"} + { services, Services}, + { method, <<"get_available_services">>} ]}; - -handle_rpc("message", Args) -> +handle_rpc(<<"message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), - [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, + [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, [ SvcName, Timeout, Parameters]}), {ok, [ { status, rvi_common:json_rpc_status(Res) }, { transaction_id, TID }, - { method, "message"} + { method, <<"message">>} ]}; handle_rpc(Other, _Args) -> @@ -280,13 +284,13 @@ handle_rpc(Other, _Args) -> {ok,[ { status, rvi_common:json_rpc_status(invalid_command)} ]}. -handle_notification("service_available", Args) -> - {ok, SvcName} = rvi_common:get_json_element(["service"], Args), - {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), +handle_notification(<<"service_available">>, Args) -> + {ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args), + {ok, DataLinkModule} = rvi_common:get_json_element([<<"data_link_module">>], Args), ?debug("service_edge:service_available(): service: ~p", [ SvcName]), ?debug("service_edge:service_available(): data_link: ~p", [ DataLinkModule]), - gen_server:cast(?SERVER, { rvi, service_available, + gen_server:cast(?SERVER, { rvi, service_available, [ SvcName, DataLinkModule ]}), ok; @@ -296,24 +300,24 @@ handle_notification("service_unavailable", Args) -> ?debug("service_edge:service_unavailable(): service: ~p", [ SvcName]), ?debug("service_edge:service_unavailable(): data_link: ~p", [ DataLinkModule]), - gen_server:cast(?SERVER, { rvi, service_unavailable, + gen_server:cast(?SERVER, { rvi, service_unavailable, [ SvcName, DataLinkModule ]}), ok; -handle_notification("handle_remote_message", Args) -> +handle_notification(<<"handle_remote_message">>, Args) -> { ok, IP } = rvi_common:get_json_element(["ip"], Args), { ok, Port } = rvi_common:get_json_element(["port"], Args), { ok, SvcName } = rvi_common:get_json_element(["service"], Args), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Args), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Args), { ok, Signature } = rvi_common:get_json_element(["signature"], Args), - gen_server:cast(?SERVER, { rvi, handle_remote_message, - [ + gen_server:cast(?SERVER, { rvi, handle_remote_message, + [ IP, Port, - SvcName, - Timeout, + SvcName, + Timeout, Parameters, Signature ]}), @@ -325,10 +329,10 @@ handle_notification("handle_remote_message", Args) -> %% JSON-RPC entry point %% Called by local exo http server -handle_notification("handle_local_timeout", Args) -> +handle_notification(<<"handle_local_timeout">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, TransactionID} = rvi_common:get_json_element(["transaction_id"], Args), - gen_server:cast(?SERVER, { rvi, handle_local_timeout, + gen_server:cast(?SERVER, { rvi, handle_local_timeout, [ SvcName, TransactionID]}), ok; @@ -361,7 +365,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) -> %% Register with service discovery, will trigger callback to service_available() %% that forwards the registration to other connected services. service_discovery_rpc:register_services(St#st.cs, [FullSvcName], local), - + %% Return ok. { reply, [ ok, FullSvcName ], St }; @@ -392,11 +396,23 @@ handle_call({rvi, get_available_services, []}, _From, St) -> %% [{struct,[{"a","b"}]}] %% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] -handle_call({ rvi, handle_local_message, +handle_call({ rvi, handle_local_message, [SvcName, TimeoutArg, Parameters] }, _From, St) -> ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), + %% + %% Authorize local message and retrieve a certificate / signature + %% that will be accepted by the receiving node that will deliver + %% the messaage to its locally connected service_name service. + %% + [ok, Signature ] = + authorize_rpc:authorize_local_message( + St#st.cs, SvcName, [{service_name, SvcName}, + {timeout, TimeoutArg}, + %% {parameters, Parameters}, + {parameters, Parameters} + ]), %% %% Slick but ugly. @@ -406,10 +422,10 @@ handle_call({ rvi, handle_local_message, { Mega, Sec, _Micro } = now(), Now = Mega * 1000000 + Sec, - Timeout = + Timeout = case TimeoutArg - Now < -86400 of true -> %% Relative timeout arg. Convert to unix time msec - ?debug("service_edge_rpc:local_msg(): Timeout ~p is relative.", + ?debug("service_edge_rpc:local_msg(): Timeout ~p is relative.", [TimeoutArg]), (Now * 1000) + TimeoutArg; @@ -423,7 +439,7 @@ handle_call({ rvi, handle_local_message, %% that will be accepted by the receiving node that will deliver %% the messaage to its locally connected service_name service. %% - [ok, Signature ] = + [ok, Signature ] = authorize_rpc:authorize_local_message( St#st.cs, SvcName, [{service_name, SvcName}, {timeout, Timeout}, @@ -432,24 +448,26 @@ handle_call({ rvi, handle_local_message, ]), %% - %% Check if this is a local service by trying to resolve its service name. + %% Check if this is a local service by trying to resolve its service name. %% If successful, just forward it to its service_name. - %% - case ets:lookup(?SERVICE_TABLE, SvcName) of + %% + LookupRes = ets:lookup(?SERVICE_TABLE, SvcName), + ?debug("Service LookupRes = ~p", [LookupRes]), + case LookupRes of [ #service_entry { url = URL } ] -> %% SvcName is local. Forward message ?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."), - Res = forward_message_to_local_service(URL, - SvcName, - {struct, Parameters}, + Res = forward_message_to_local_service(URL, + SvcName, + Parameters, St#st.cs), { reply, Res , St}; _ -> %% SvcName is remote %% Ask Schedule the request to resolve the network address ?debug("service_edge_rpc:local_msg(): Service is remote. Scheduling."), - [ _, TID ] = schedule_rpc:schedule_message(St#st.cs, - SvcName, - Timeout, + [ _, TID ] = schedule_rpc:schedule_message(St#st.cs, + SvcName, + Timeout, Parameters, Signature), { reply, [ok, TID ], St} @@ -461,21 +479,21 @@ handle_call(Other, _From, St) -> { reply, [ invalid_command ], St}. - + handle_cast({rvi, service_available, [SvcName, _DataLinkModule] }, St) -> ?debug("service_edge_rpc: Service available: ~p:", [ SvcName]), announce_service_availability(available, SvcName), { noreply, St }; - + handle_cast({rvi, service_unavailable, [SvcName, _DataLinkModule] }, St) -> ?debug("service_edge_rpc: Service unavailable: ~p:", [ SvcName]), announce_service_availability(unavailable, SvcName), { noreply, St }; - -handle_cast({rvi, handle_remote_message, + +handle_cast({rvi, handle_remote_message, [ IP, Port, @@ -495,19 +513,18 @@ handle_cast({rvi, handle_remote_message, %% Check if this is a local message. case ets:lookup(?SERVICE_TABLE, SvcName) of [ #service_entry { url = URL }] -> %% This is a local message - {struct, Parameters1} = Parameters, + Parameters1 = Parameters, case authorize_rpc:authorize_remote_message( - St#st.cs, - SvcName, + St#st.cs, + SvcName, [{remote_ip, IP}, {remote_port, Port}, {service_name, SvcName}, {timeout, Timeout}, - %% {parameters, [ {struct, Parameters}]}, {parameters, Parameters1}, {signature, Signature}]) of - [ ok ] -> - forward_message_to_local_service(URL, SvcName, + [ ok ] -> + forward_message_to_local_service(URL, SvcName, Parameters, St#st.cs), { noreply, St}; @@ -520,15 +537,15 @@ handle_cast({rvi, handle_remote_message, ?notice("service_entry:remote_msg(): Service Disappeared ~p", [SvcName]), { noreply, St} - + end; handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) -> %% FIXME: Should be forwarded to service. - ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ", + ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ", [SvcName, TransactionID]), - + { noreply, St}; handle_cast(Other, St) -> @@ -544,85 +561,50 @@ code_change(_OldVsn, St, _Extra) -> {ok, St}. - - -%% -%% Depending on the format of NetworkAddress -%% Dispatch to websocket or JSON-RPC server -%% FIXME: Should be a pluggable setup where -%% different dispatchers are triggered depending -%% on prefix in NetworkAddress -%% - -flatten_ws_args([{ struct, List} | T], Acc ) when is_list(List) -> - flatten_ws_args( List ++ T, Acc); - - -flatten_ws_args([{ Key, Val}| T], Acc ) -> - NKey = case is_atom(Key) of - true -> atom_to_list(Key); - false -> Key - end, - - NVal = flatten_ws_args(Val), - - flatten_ws_args(T, [ NKey, NVal] ++ Acc); - -flatten_ws_args([], Acc) -> - Acc; - - -flatten_ws_args(Other, []) -> - Other; - -flatten_ws_args(Other, Acc) -> - [ Other | Acc ]. - - -flatten_ws_args(Args) -> - flatten_ws_args(Args, []). - - json_rpc_notification(Method, Parameters) -> - iolist_to_binary( - exo_json:encode( - {struct, - [ { "json-rpc", "2.0"}, - { "method", Method }, - { "params", {struct, Parameters}} - ]})). - -dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, - {struct, [{ services, { array, Services}}]} ) -> - ?info("service_edge:dispatch_to_local_service(service_available, websock, ~p): ~p", + jsx:encode( + [{<<"json-rpc">>, <<"2.0">>}, + {<<"method">>, Method}, + {<<"params">>, Parameters} + ]). + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available, + [{services, Services}] ) -> + ?info("service_edge:dispatch_to_local_service(service_available, websock, ~p): ~p", [ WSPidStr, Services]), - wse_server:send(list_to_pid(WSPidStr), - json_rpc_notification("services_available", - [{"services", {array, Services}}])), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification(<<"services_available">>, + [{<<"services">>, Services}])), %% No reply ok; -dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, - {struct, [{ services, { array, Services}}]} ) -> - ?info("service_edge:dispatch_to_local_service(service_unavailable, websock, ~p): ~p", +dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable, + [{services, Services}] ) -> + ?info("service_edge:dispatch_to_local_service(service_unavailable, websock, ~p): ~p", [ WSPidStr, Services]), - wse_server:send(list_to_pid(WSPidStr), - json_rpc_notification("services_unavailable", - [{"services", {array, Services}}])), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification(<<"services_unavailable">>, + [{<<"services">>, Services}])), ok; -dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, - {struct, [{ service_name, SvcName }, - { parameters, Parameters } - ]} ) -> - - ?info("service_edge:dispatch_to_local_service(message, websock): ~p", - [Parameters]), - wse_server:send(list_to_pid(WSPidStr), - json_rpc_notification("message", - [{ "service_name", SvcName}, - {parameters, Parameters}])), +%% dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, +%% [{service_name, SvcName}, {parameters, [Args]}]) -> +%% ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]), +%% wse_server:send(list_to_pid(WSPidStr), +%% json_rpc_notification("message", +%% [{ "service_name", SvcName}, {parameters, Args}])), +%% %% No response expected. +%% ?debug("service_edge:dispatch_to_local_service(message, websock): Done"), +%% ok; + +dispatch_to_local_service([ $w, $s, $: | WSPidStr], message, + [{ service_name, SvcName}, { parameters,[Args]}]) -> + ?info("service_edge:dispatch_to_local_service(message/alt, websock): ~p", [Args]), + wse_server:send(list_to_pid(WSPidStr), + json_rpc_notification(<<"message">>, + [{<<"service_name">>, SvcName}, + {<<"parameters">>, Args}])), %% No response expected. ?debug("service_edge:dispatch_to_local_service(message, websock): Done"), ok; @@ -633,7 +615,7 @@ dispatch_to_local_service([ $w, $s, $: | _WSPidStr], message, Other) -> %% Dispatch to regular JSON-RPC over HTTP. dispatch_to_local_service(URL, Command, Args) -> - CmdStr = atom_to_list(Command), + CmdStr = atom_to_binary(Command, latin1), ?debug("dispatch_to_local_service(): Command: ~p",[ Command]), ?debug("dispatch_to_local_service(): Args: ~p",[ Args]), ?debug("dispatch_to_local_service(): URL: ~p",[ URL]), @@ -655,20 +637,21 @@ forward_message_to_local_service(URL,SvcName, Parameters, _CompSpec) -> %% a service_name that is identical to the service name %% it registered with. %% - LocalSvcName = string:substr(SvcName, - length(rvi_common:local_service_prefix())), - + Pfx = rvi_common:local_service_prefix(), + Sz = byte_size(Pfx)-1, + <<_:Sz/binary, LocalSvcName/binary>> = SvcName, + ?debug("Service name: ~p (Pfx = ~p)", [LocalSvcName, Pfx]), %% Deliver the message to the local service, which can %% be either a wse websocket, or a regular HTTP JSON-RPC call spawn(fun() -> rvi_common:get_request_result( - dispatch_to_local_service(URL, - message, - {struct, [ { service_name, LocalSvcName }, - { parameters, Parameters }]})) + dispatch_to_local_service(URL, + message, + [{<<"service_name">>, LocalSvcName }, + {<<"parameters">>, Parameters }])) end), [ ok, -1 ]. - + announce_service_availability(Available, SvcName) -> Cmd = case Available of @@ -681,14 +664,14 @@ announce_service_availability(Available, SvcName) -> %% available to the URL tha originated the newly registered service. %% %% We also want to make sure that we don't send the notification - %% to a local service more than once. + %% to a local service more than once. %% We will build up a list of blocked URLs not to resend to %% as we go along BlockURLs = case ets:lookup(?SERVICE_TABLE, SvcName) of [ #service_entry { url = URL } ] -> [URL]; [] -> [] end, - ets:foldl(fun(Term, _Acc) -> + ets:foldl(fun(Term, _Acc) -> ?debug("~p: ~p~n", [ ?SERVICE_TABLE, Term]), ok end, ok, ?SERVICE_TABLE), @@ -701,19 +684,16 @@ announce_service_availability(Available, SvcName) -> %% If the URL is not on the blackout %% list, send a notification ?debug(" URL: ~p - Acc : ~p ", [ URL, Acc]), - case lists:member(URL, Acc) of + case lists:member(URL, Acc) of false -> ?debug("DISPATCH: ~p: ~p", [ URL, Cmd]), - dispatch_to_local_service(URL, Cmd, - {struct, [ { services, - { array, [SvcName]} - } - ]}), + dispatch_to_local_service(URL, Cmd, [{<<"services">>, + [SvcName]}]), %% Add the current URL to the blackout list - [URL | Acc]; + [URL | Acc]; %% URL is on blackout list - true -> + true -> Acc end end, BlockURLs, ?SERVICE_TABLE). |