summaryrefslogtreecommitdiff
path: root/components/service_edge
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-10-06 12:56:11 +0200
committerUlf Wiger <ulf@feuerlabs.com>2015-11-20 13:43:07 -0800
commit34aa86b5a2e97650fe6299ccf794d5eb5d052d91 (patch)
treeedfb4bb844c3b90565e7a0bb00f678703d084188 /components/service_edge
parente6299ff287e767dae71fb47009f9bf4620cc3d78 (diff)
downloadrvi_core-34aa86b5a2e97650fe6299ccf794d5eb5d052d91.tar.gz
w.i.p. transition to jsx json codec
Diffstat (limited to 'components/service_edge')
-rw-r--r--components/service_edge/src/service_edge_rpc.erl372
1 files changed, 176 insertions, 196 deletions
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index e48c2b7..2cf9c3a 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-% Mozilla Public License, version 2.0. The full text of the
+% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -17,9 +17,9 @@
-export([start_link/0]).
--export([init/1,
- handle_call/3,
- handle_cast/2,
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
@@ -27,7 +27,7 @@
-export([handle_remote_message/6,
handle_local_timeout/3]).
--export([start_json_server/0,
+-export([start_json_server/0,
start_websocket/0]).
%% Invoked by service discovery
@@ -42,10 +42,10 @@
-include_lib("rvi_common/include/rvi_common.hrl").
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
--record(st, {
+-record(st, {
%% Component specification
cs = #component_spec{}
}).
@@ -67,14 +67,14 @@ start_link() ->
init([]) ->
CompSpec = rvi_common:get_component_specification(),
- URL = rvi_common:get_module_json_rpc_url(service_edge,
- ?MODULE,
+ URL = rvi_common:get_module_json_rpc_url(service_edge,
+ ?MODULE,
CompSpec),
?notice("---- Service Edge URL: ~p", [ URL ]),
?notice("---- Node Service Prefix: ~s", [ rvi_common:local_service_prefix()]),
- ets:new(?SERVICE_TABLE, [ set, public, named_table,
+ ets:new(?SERVICE_TABLE, [ set, public, named_table,
{ keypos, #service_entry.service }]),
service_discovery_rpc:subscribe(CompSpec, ?MODULE),
@@ -85,41 +85,41 @@ init([]) ->
start_json_server() ->
?debug("service_edge_rpc:start_json_server()"),
- case rvi_common:start_json_rpc_server(service_edge,
- ?MODULE,
+ case rvi_common:start_json_rpc_server(service_edge,
+ ?MODULE,
service_edge_sup) of
ok ->
ok;
- Err ->
- ?warning("service_edge_rpc:start_json_server(): Failed to start: ~p",
+ Err ->
+ ?warning("service_edge_rpc:start_json_server(): Failed to start: ~p",
[Err]),
Err
end.
-
+
start_websocket() ->
%%
%% Fire up the websocket subsystem, if configured
%%
- case rvi_common:get_module_config(service_edge,
+ case rvi_common:get_module_config(service_edge,
service_edge_rpc,
- websocket,
+ websocket,
not_found,
rvi_common:get_component_specification()) of
- {ok, not_found} ->
+ {ok, not_found} ->
?notice("service_edge:init(): No websocket config specified. Will use JSON-RPC/HTTP only."),
ok;
{ ok, WSOpts } ->
case proplists:get_value(port, WSOpts, undefined ) of
- undefined ->
+ undefined ->
ok;
-
+
Port ->
%% FIXME: MONITOR AND RESTART
- wse_server:start(Port,
- ?MODULE, handle_websocket, undefined,
+ wse_server:start(Port,
+ ?MODULE, handle_websocket, undefined,
[{type, text} | proplists:delete(port, WSOpts)]),
ok
end
@@ -131,25 +131,25 @@ start_websocket() ->
%% Invoked by service_discovery to announce service availability
%% Must be handled either as a JSON-RPC call or a gen_server call.
service_available(CompSpec, SvcName, DataLinkModule) ->
- rvi_common:notification(service_edge, ?MODULE,
- service_available,
- [{ service, SvcName },
+ rvi_common:notification(service_edge, ?MODULE,
+ service_available,
+ [{ service, SvcName },
{ data_link_module, DataLinkModule }], CompSpec).
service_unavailable(CompSpec, SvcName, DataLinkModule) ->
- rvi_common:notification(service_edge, ?MODULE,
- service_unavailable,
- [{ service, SvcName },
+ rvi_common:notification(service_edge, ?MODULE,
+ service_unavailable,
+ [{ service, SvcName },
{ data_link_module, DataLinkModule }], CompSpec).
handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) ->
{IP, Port} = Conn,
- rvi_common:notification(service_edge, ?MODULE,
- handle_remote_message,
+ rvi_common:notification(service_edge, ?MODULE,
+ handle_remote_message,
[{ ip, IP },
{ port, Port },
- { service, SvcName },
+ { service, SvcName },
{ timeout, Timeout },
{ parameters, Params },
{ signature, Signature }], CompSpec).
@@ -159,9 +159,9 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) ->
%% A message originated from a locally connected service
%% has timed out
handle_local_timeout(CompSpec, SvcName, TransID) ->
- rvi_common:notification(service_edge, ?SERVER, handle_local_timeout,
- [ { service, SvcName},
- { transaction_id, TransID} ],
+ rvi_common:notification(service_edge, ?SERVER, handle_local_timeout,
+ [ { service, SvcName},
+ { transaction_id, TransID} ],
CompSpec).
@@ -173,10 +173,10 @@ handle_websocket(WSock, Mesg, Arg) ->
?debug("service_edge_rpc:handle_websocket(~p/~p) method: ~p", [ WSock, ID,Method ]),
- case handle_ws_json_rpc(WSock, Method, {array,Params}, Arg) of
+ case handle_ws_json_rpc(WSock, Method, Params, Arg) of
ok -> ok;
- {ok, Reply} ->
- EncReply = binary_to_list(iolist_to_binary(exo_json:encode({struct, [ { id, ID} |Reply]}))),
+ {ok, Reply} ->
+ EncReply = rvi_common:term_to_json([{id, ID} |Reply]),
?debug("service_edge_rpc:handle_websocket(~p/~p) reply: ~s", [ WSock, ID, EncReply]),
wse_server:send(WSock, list_to_binary(EncReply))
end,
@@ -194,26 +194,31 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]),
?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]),
- [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}),
-
- ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]),
- { ok, [ { status, rvi_common:json_rpc_status(Res) },
- { transaction_id, TID},
- { method, "message"} ] };
+ case gen_server:call(
+ ?SERVER,
+ {rvi, handle_local_message,
+ [ SvcName, Timeout, Parameters]}) of
+ [not_found] ->
+ {ok, [{status, rvi_common:json(not_found)}]};
+ [Res, TID] ->
+ ?debug("service_edge_rpc:wse_message(~p) Res: ~p", [ WSock, Res ]),
+ { ok, [ { status, rvi_common:json_rpc_status(Res) },
+ { transaction_id, TID},
+ { method, <<"message">>}] }
+ end;
handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]),
- [ok, FullSvcName ] = gen_server:call(?SERVER,
- { rvi,
- register_local_service,
- [ SvcName,
+ [ok, FullSvcName ] = gen_server:call(?SERVER,
+ { rvi,
+ register_local_service,
+ [ SvcName,
"ws:" ++ pid_to_list(WSock)]}),
-
- { ok, [ { status, rvi_common:json_rpc_status(ok)},
+
+ { ok, [ { status, rvi_common:json_rpc_status(ok)},
{ service, FullSvcName },
- { method, "register_service"}]};
+ { method, <<"register_service">>}]};
handle_ws_json_rpc(WSock, "unregister_service", Params, _Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
@@ -226,53 +231,52 @@ handle_ws_json_rpc(_Ws , "get_available_services", _Params, _Arg ) ->
[ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
{ ok, [ { status, rvi_common:json_rpc_status(ok)},
{ services, Services},
- { method, "get_available_services"}] }.
+ { method, <<"get_available_services">>}] }.
%% Invoked by locally connected services.
%% Will always be routed as JSON-RPC since that, and websocket,
%% are the only access paths in.
%%
-handle_rpc("register_service", Args) ->
- {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
- {ok, URL} = rvi_common:get_json_element(["network_address"], Args),
- [ok, FullSvcName ] = gen_server:call(?SERVER,
- { rvi, register_local_service,
+handle_rpc(<<"register_service">>, Args) ->
+ {ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args),
+ {ok, URL} = rvi_common:get_json_element([<<"network_address">>], Args),
+ [ok, FullSvcName ] = gen_server:call(?SERVER,
+ { rvi, register_local_service,
[ SvcName, URL]}),
- {ok, [ {status, rvi_common:json_rpc_status(ok) },
+ {ok, [ {status, rvi_common:json_rpc_status(ok) },
{ service, FullSvcName },
- { method, "register_service"}
+ { method, <<"register_service">>}
]};
-handle_rpc("unregister_service", Args) ->
+handle_rpc(<<"unregister_service">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName]}),
{ok, [ { status, rvi_common:json_rpc_status(ok) },
- { method, "unregister_service"}
+ { method, <<"unregister_service">>}
]};
-handle_rpc("get_available_services", _Args) ->
+handle_rpc(<<"get_available_services">>, _Args) ->
[ Status, Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
?debug("get_available_services(): ~p ~p", [ Status, Services ]),
{ok, [ { status, rvi_common:json_rpc_status(ok)},
- { services, {array, Services}},
- { method, "get_available_services"}
+ { services, Services},
+ { method, <<"get_available_services">>}
]};
-
-handle_rpc("message", Args) ->
+handle_rpc(<<"message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
- [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
+ [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
[ SvcName, Timeout, Parameters]}),
{ok, [ { status, rvi_common:json_rpc_status(Res) },
{ transaction_id, TID },
- { method, "message"}
+ { method, <<"message">>}
]};
handle_rpc(Other, _Args) ->
@@ -280,13 +284,13 @@ handle_rpc(Other, _Args) ->
{ok,[ { status, rvi_common:json_rpc_status(invalid_command)} ]}.
-handle_notification("service_available", Args) ->
- {ok, SvcName} = rvi_common:get_json_element(["service"], Args),
- {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
+handle_notification(<<"service_available">>, Args) ->
+ {ok, SvcName} = rvi_common:get_json_element([<<"service">>], Args),
+ {ok, DataLinkModule} = rvi_common:get_json_element([<<"data_link_module">>], Args),
?debug("service_edge:service_available(): service: ~p", [ SvcName]),
?debug("service_edge:service_available(): data_link: ~p", [ DataLinkModule]),
- gen_server:cast(?SERVER, { rvi, service_available,
+ gen_server:cast(?SERVER, { rvi, service_available,
[ SvcName, DataLinkModule ]}),
ok;
@@ -296,24 +300,24 @@ handle_notification("service_unavailable", Args) ->
?debug("service_edge:service_unavailable(): service: ~p", [ SvcName]),
?debug("service_edge:service_unavailable(): data_link: ~p", [ DataLinkModule]),
- gen_server:cast(?SERVER, { rvi, service_unavailable,
+ gen_server:cast(?SERVER, { rvi, service_unavailable,
[ SvcName, DataLinkModule ]}),
ok;
-handle_notification("handle_remote_message", Args) ->
+handle_notification(<<"handle_remote_message">>, Args) ->
{ ok, IP } = rvi_common:get_json_element(["ip"], Args),
{ ok, Port } = rvi_common:get_json_element(["port"], Args),
{ ok, SvcName } = rvi_common:get_json_element(["service"], Args),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Args),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Args),
{ ok, Signature } = rvi_common:get_json_element(["signature"], Args),
- gen_server:cast(?SERVER, { rvi, handle_remote_message,
- [
+ gen_server:cast(?SERVER, { rvi, handle_remote_message,
+ [
IP,
Port,
- SvcName,
- Timeout,
+ SvcName,
+ Timeout,
Parameters,
Signature
]}),
@@ -325,10 +329,10 @@ handle_notification("handle_remote_message", Args) ->
%% JSON-RPC entry point
%% Called by local exo http server
-handle_notification("handle_local_timeout", Args) ->
+handle_notification(<<"handle_local_timeout">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, TransactionID} = rvi_common:get_json_element(["transaction_id"], Args),
- gen_server:cast(?SERVER, { rvi, handle_local_timeout,
+ gen_server:cast(?SERVER, { rvi, handle_local_timeout,
[ SvcName, TransactionID]}),
ok;
@@ -361,7 +365,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
%% Register with service discovery, will trigger callback to service_available()
%% that forwards the registration to other connected services.
service_discovery_rpc:register_services(St#st.cs, [FullSvcName], local),
-
+
%% Return ok.
{ reply, [ ok, FullSvcName ], St };
@@ -392,11 +396,23 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
%% [{struct,[{"a","b"}]}]
%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
-handle_call({ rvi, handle_local_message,
+handle_call({ rvi, handle_local_message,
[SvcName, TimeoutArg, Parameters] }, _From, St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
+ %%
+ %% Authorize local message and retrieve a certificate / signature
+ %% that will be accepted by the receiving node that will deliver
+ %% the messaage to its locally connected service_name service.
+ %%
+ [ok, Signature ] =
+ authorize_rpc:authorize_local_message(
+ St#st.cs, SvcName, [{service_name, SvcName},
+ {timeout, TimeoutArg},
+ %% {parameters, Parameters},
+ {parameters, Parameters}
+ ]),
%%
%% Slick but ugly.
@@ -406,10 +422,10 @@ handle_call({ rvi, handle_local_message,
{ Mega, Sec, _Micro } = now(),
Now = Mega * 1000000 + Sec,
- Timeout =
+ Timeout =
case TimeoutArg - Now < -86400 of
true -> %% Relative timeout arg. Convert to unix time msec
- ?debug("service_edge_rpc:local_msg(): Timeout ~p is relative.",
+ ?debug("service_edge_rpc:local_msg(): Timeout ~p is relative.",
[TimeoutArg]),
(Now * 1000) + TimeoutArg;
@@ -423,7 +439,7 @@ handle_call({ rvi, handle_local_message,
%% that will be accepted by the receiving node that will deliver
%% the messaage to its locally connected service_name service.
%%
- [ok, Signature ] =
+ [ok, Signature ] =
authorize_rpc:authorize_local_message(
St#st.cs, SvcName, [{service_name, SvcName},
{timeout, Timeout},
@@ -432,24 +448,26 @@ handle_call({ rvi, handle_local_message,
]),
%%
- %% Check if this is a local service by trying to resolve its service name.
+ %% Check if this is a local service by trying to resolve its service name.
%% If successful, just forward it to its service_name.
- %%
- case ets:lookup(?SERVICE_TABLE, SvcName) of
+ %%
+ LookupRes = ets:lookup(?SERVICE_TABLE, SvcName),
+ ?debug("Service LookupRes = ~p", [LookupRes]),
+ case LookupRes of
[ #service_entry { url = URL } ] -> %% SvcName is local. Forward message
?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."),
- Res = forward_message_to_local_service(URL,
- SvcName,
- {struct, Parameters},
+ Res = forward_message_to_local_service(URL,
+ SvcName,
+ Parameters,
St#st.cs),
{ reply, Res , St};
_ -> %% SvcName is remote
%% Ask Schedule the request to resolve the network address
?debug("service_edge_rpc:local_msg(): Service is remote. Scheduling."),
- [ _, TID ] = schedule_rpc:schedule_message(St#st.cs,
- SvcName,
- Timeout,
+ [ _, TID ] = schedule_rpc:schedule_message(St#st.cs,
+ SvcName,
+ Timeout,
Parameters,
Signature),
{ reply, [ok, TID ], St}
@@ -461,21 +479,21 @@ handle_call(Other, _From, St) ->
{ reply, [ invalid_command ], St}.
-
+
handle_cast({rvi, service_available, [SvcName, _DataLinkModule] }, St) ->
?debug("service_edge_rpc: Service available: ~p:", [ SvcName]),
announce_service_availability(available, SvcName),
{ noreply, St };
-
+
handle_cast({rvi, service_unavailable, [SvcName, _DataLinkModule] }, St) ->
?debug("service_edge_rpc: Service unavailable: ~p:", [ SvcName]),
announce_service_availability(unavailable, SvcName),
{ noreply, St };
-
-handle_cast({rvi, handle_remote_message,
+
+handle_cast({rvi, handle_remote_message,
[
IP,
Port,
@@ -495,19 +513,18 @@ handle_cast({rvi, handle_remote_message,
%% Check if this is a local message.
case ets:lookup(?SERVICE_TABLE, SvcName) of
[ #service_entry { url = URL }] -> %% This is a local message
- {struct, Parameters1} = Parameters,
+ Parameters1 = Parameters,
case authorize_rpc:authorize_remote_message(
- St#st.cs,
- SvcName,
+ St#st.cs,
+ SvcName,
[{remote_ip, IP},
{remote_port, Port},
{service_name, SvcName},
{timeout, Timeout},
- %% {parameters, [ {struct, Parameters}]},
{parameters, Parameters1},
{signature, Signature}]) of
- [ ok ] ->
- forward_message_to_local_service(URL, SvcName,
+ [ ok ] ->
+ forward_message_to_local_service(URL, SvcName,
Parameters, St#st.cs),
{ noreply, St};
@@ -520,15 +537,15 @@ handle_cast({rvi, handle_remote_message,
?notice("service_entry:remote_msg(): Service Disappeared ~p",
[SvcName]),
{ noreply, St}
-
+
end;
handle_cast({ rvi, handle_local_timeout, [SvcName, TransactionID] }, St) ->
%% FIXME: Should be forwarded to service.
- ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ",
+ ?info("service_edge_rpc:handle_local_timeout(): service: ~p trans_id: ~p ",
[SvcName, TransactionID]),
-
+
{ noreply, St};
handle_cast(Other, St) ->
@@ -544,85 +561,50 @@ code_change(_OldVsn, St, _Extra) ->
{ok, St}.
-
-
-%%
-%% Depending on the format of NetworkAddress
-%% Dispatch to websocket or JSON-RPC server
-%% FIXME: Should be a pluggable setup where
-%% different dispatchers are triggered depending
-%% on prefix in NetworkAddress
-%%
-
-flatten_ws_args([{ struct, List} | T], Acc ) when is_list(List) ->
- flatten_ws_args( List ++ T, Acc);
-
-
-flatten_ws_args([{ Key, Val}| T], Acc ) ->
- NKey = case is_atom(Key) of
- true -> atom_to_list(Key);
- false -> Key
- end,
-
- NVal = flatten_ws_args(Val),
-
- flatten_ws_args(T, [ NKey, NVal] ++ Acc);
-
-flatten_ws_args([], Acc) ->
- Acc;
-
-
-flatten_ws_args(Other, []) ->
- Other;
-
-flatten_ws_args(Other, Acc) ->
- [ Other | Acc ].
-
-
-flatten_ws_args(Args) ->
- flatten_ws_args(Args, []).
-
-
json_rpc_notification(Method, Parameters) ->
- iolist_to_binary(
- exo_json:encode(
- {struct,
- [ { "json-rpc", "2.0"},
- { "method", Method },
- { "params", {struct, Parameters}}
- ]})).
-
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
- {struct, [{ services, { array, Services}}]} ) ->
- ?info("service_edge:dispatch_to_local_service(service_available, websock, ~p): ~p",
+ jsx:encode(
+ [{<<"json-rpc">>, <<"2.0">>},
+ {<<"method">>, Method},
+ {<<"params">>, Parameters}
+ ]).
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_available,
+ [{services, Services}] ) ->
+ ?info("service_edge:dispatch_to_local_service(service_available, websock, ~p): ~p",
[ WSPidStr, Services]),
- wse_server:send(list_to_pid(WSPidStr),
- json_rpc_notification("services_available",
- [{"services", {array, Services}}])),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification(<<"services_available">>,
+ [{<<"services">>, Services}])),
%% No reply
ok;
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
- {struct, [{ services, { array, Services}}]} ) ->
- ?info("service_edge:dispatch_to_local_service(service_unavailable, websock, ~p): ~p",
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], services_unavailable,
+ [{services, Services}] ) ->
+ ?info("service_edge:dispatch_to_local_service(service_unavailable, websock, ~p): ~p",
[ WSPidStr, Services]),
- wse_server:send(list_to_pid(WSPidStr),
- json_rpc_notification("services_unavailable",
- [{"services", {array, Services}}])),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification(<<"services_unavailable">>,
+ [{<<"services">>, Services}])),
ok;
-dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
- {struct, [{ service_name, SvcName },
- { parameters, Parameters }
- ]} ) ->
-
- ?info("service_edge:dispatch_to_local_service(message, websock): ~p",
- [Parameters]),
- wse_server:send(list_to_pid(WSPidStr),
- json_rpc_notification("message",
- [{ "service_name", SvcName},
- {parameters, Parameters}])),
+%% dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
+%% [{service_name, SvcName}, {parameters, [Args]}]) ->
+%% ?info("service_edge:dispatch_to_local_service(message, websock): ~p", [Args]),
+%% wse_server:send(list_to_pid(WSPidStr),
+%% json_rpc_notification("message",
+%% [{ "service_name", SvcName}, {parameters, Args}])),
+%% %% No response expected.
+%% ?debug("service_edge:dispatch_to_local_service(message, websock): Done"),
+%% ok;
+
+dispatch_to_local_service([ $w, $s, $: | WSPidStr], message,
+ [{ service_name, SvcName}, { parameters,[Args]}]) ->
+ ?info("service_edge:dispatch_to_local_service(message/alt, websock): ~p", [Args]),
+ wse_server:send(list_to_pid(WSPidStr),
+ json_rpc_notification(<<"message">>,
+ [{<<"service_name">>, SvcName},
+ {<<"parameters">>, Args}])),
%% No response expected.
?debug("service_edge:dispatch_to_local_service(message, websock): Done"),
ok;
@@ -633,7 +615,7 @@ dispatch_to_local_service([ $w, $s, $: | _WSPidStr], message, Other) ->
%% Dispatch to regular JSON-RPC over HTTP.
dispatch_to_local_service(URL, Command, Args) ->
- CmdStr = atom_to_list(Command),
+ CmdStr = atom_to_binary(Command, latin1),
?debug("dispatch_to_local_service(): Command: ~p",[ Command]),
?debug("dispatch_to_local_service(): Args: ~p",[ Args]),
?debug("dispatch_to_local_service(): URL: ~p",[ URL]),
@@ -655,20 +637,21 @@ forward_message_to_local_service(URL,SvcName, Parameters, _CompSpec) ->
%% a service_name that is identical to the service name
%% it registered with.
%%
- LocalSvcName = string:substr(SvcName,
- length(rvi_common:local_service_prefix())),
-
+ Pfx = rvi_common:local_service_prefix(),
+ Sz = byte_size(Pfx)-1,
+ <<_:Sz/binary, LocalSvcName/binary>> = SvcName,
+ ?debug("Service name: ~p (Pfx = ~p)", [LocalSvcName, Pfx]),
%% Deliver the message to the local service, which can
%% be either a wse websocket, or a regular HTTP JSON-RPC call
spawn(fun() ->
rvi_common:get_request_result(
- dispatch_to_local_service(URL,
- message,
- {struct, [ { service_name, LocalSvcName },
- { parameters, Parameters }]}))
+ dispatch_to_local_service(URL,
+ message,
+ [{<<"service_name">>, LocalSvcName },
+ {<<"parameters">>, Parameters }]))
end),
[ ok, -1 ].
-
+
announce_service_availability(Available, SvcName) ->
Cmd = case Available of
@@ -681,14 +664,14 @@ announce_service_availability(Available, SvcName) ->
%% available to the URL tha originated the newly registered service.
%%
%% We also want to make sure that we don't send the notification
- %% to a local service more than once.
+ %% to a local service more than once.
%% We will build up a list of blocked URLs not to resend to
%% as we go along
BlockURLs = case ets:lookup(?SERVICE_TABLE, SvcName) of
[ #service_entry { url = URL } ] -> [URL];
[] -> []
end,
- ets:foldl(fun(Term, _Acc) ->
+ ets:foldl(fun(Term, _Acc) ->
?debug("~p: ~p~n", [ ?SERVICE_TABLE, Term]),
ok
end, ok, ?SERVICE_TABLE),
@@ -701,19 +684,16 @@ announce_service_availability(Available, SvcName) ->
%% If the URL is not on the blackout
%% list, send a notification
?debug(" URL: ~p - Acc : ~p ", [ URL, Acc]),
- case lists:member(URL, Acc) of
+ case lists:member(URL, Acc) of
false ->
?debug("DISPATCH: ~p: ~p", [ URL, Cmd]),
- dispatch_to_local_service(URL, Cmd,
- {struct, [ { services,
- { array, [SvcName]}
- }
- ]}),
+ dispatch_to_local_service(URL, Cmd, [{<<"services">>,
+ [SvcName]}]),
%% Add the current URL to the blackout list
- [URL | Acc];
+ [URL | Acc];
%% URL is on blackout list
- true ->
+ true ->
Acc
end
end, BlockURLs, ?SERVICE_TABLE).