summaryrefslogtreecommitdiff
path: root/components
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-11-05 17:29:22 +0100
committerUlf Wiger <ulf@feuerlabs.com>2015-11-20 13:47:02 -0800
commitca1f0abd9f0f1478da8380bff28cb25aada34c1d (patch)
tree814c4a3c17b247cc300f62673771dad5a5c6ed77 /components
parent1b44c2448344a10ae63904a796b6211c40a3f212 (diff)
downloadrvi_core-ca1f0abd9f0f1478da8380bff28cb25aada34c1d.tar.gz
All tests (incl remote method inv) pass
- No signatures on messages (dlink_tls) - dlink_bt wasn't actually tested (test case passed erroneously) - added proto_msgpack component - fixed sneaky bug in 'setup'
Diffstat (limited to 'components')
-rw-r--r--components/authorize/src/authorize_keys.erl26
-rw-r--r--components/authorize/src/authorize_rpc.erl65
-rw-r--r--components/dlink_bt/src/bt_connection_manager.erl56
-rw-r--r--components/dlink_bt/src/bt_listener.erl2
-rw-r--r--components/dlink_bt/src/dlink_bt.app.src4
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl21
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl9
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl3
-rw-r--r--components/proto_bert/src/proto_bert_rpc.erl60
-rw-r--r--components/proto_json/src/proto_json_rpc.erl45
-rw-r--r--components/proto_msgpack/src/proto_msgpack.app.src26
-rw-r--r--components/proto_msgpack/src/proto_msgpack_app.erl38
-rw-r--r--components/proto_msgpack/src/proto_msgpack_rpc.erl180
-rw-r--r--components/proto_msgpack/src/proto_msgpack_sup.erl38
-rw-r--r--components/schedule/src/rvi_routing.erl9
-rw-r--r--components/schedule/src/schedule_rpc.erl118
-rw-r--r--components/service_edge/src/service_edge_rpc.erl35
17 files changed, 499 insertions, 236 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index 834935e..1d205b6 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -11,7 +11,8 @@
save_cert/4]).
-export([get_certificates/0,
get_certificates/1]).
--export([validate_message/2]).
+-export([validate_message/2,
+ validate_service_call/2]).
-export([filter_by_service/2,
find_cert_by_service/1]).
-export([public_key_to_json/1,
@@ -106,6 +107,9 @@ authorize_jwt() ->
validate_message(JWT, Conn) ->
gen_server:call(?MODULE, {validate_message, JWT, Conn}).
+validate_service_call(Service, Conn) ->
+ gen_server:call(?MODULE, {validate_service_call, Service, Conn}).
+
get_certificates() ->
get_certificates(local).
@@ -176,6 +180,8 @@ handle_call_({save_keys, Keys, Conn}, _, S) ->
{reply, ok, S};
handle_call_({validate_message, JWT, Conn}, _, S) ->
{reply, validate_message_(JWT, Conn), S};
+handle_call_({validate_service_call, Svc, Conn}, _, S) ->
+ {reply, validate_service_call_(Svc, Conn), S};
handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) ->
case process_cert_struct(Cert, JWT) of
invalid ->
@@ -223,6 +229,14 @@ certs_by_conn(Conn) ->
?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]),
[C || {C,V} <- Certs, check_validity(V, UTC)].
+cert_recs_by_conn(Conn) ->
+ ?debug("cert_recs_by_conn(~p)~n", [Conn]),
+ UTC = rvi_common:utc_timestamp(),
+ Certs = ets:select(?CERTS, [{ {{Conn,'_'}, '$1'},
+ [], ['$1'] }]),
+ ?debug("rough selection: ~p~n", [[abbrev_bin(C#cert.id) || C <- Certs]]),
+ [C || C <- Certs, check_validity(C#cert.validity, UTC)].
+
filter_by_service_(Services, Conn) ->
?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]),
Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1',
@@ -524,6 +538,16 @@ validate_message_1([{_,K}|T], JWT) ->
validate_message_1([], _) ->
error(invalid).
+validate_service_call_(Svc, Conn) ->
+ case lists:filter(fun(C) -> can_invoke(Svc, C) end, cert_recs_by_conn(Conn)) of
+ [] ->
+ invalid;
+ [#cert{id = ID}|_] ->
+ {ok, ID}
+ end.
+
+can_invoke(Svc, #cert{invoke = In}) ->
+ lists:any(fun(I) -> match_svc(I, Svc) end, In).
pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) ->
P = integer_to_binary(Pub),
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl
index 54a9657..c91b216 100644
--- a/components/authorize/src/authorize_rpc.erl
+++ b/components/authorize/src/authorize_rpc.erl
@@ -112,7 +112,7 @@ authorize_local_message(CompSpec, Service, Params) ->
rvi_common:request(authorize, ?MODULE, authorize_local_message,
[{service, Service},
{parameters, Params}],
- [status, signature], CompSpec).
+ [status], CompSpec).
authorize_remote_message(CompSpec, Service, Params) ->
?debug("authorize_rpc:authorize_remote_msg(): service: ~p ~n", [Service]),
@@ -301,17 +301,16 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _F
handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) ->
do_store_certs(Certs, Conn, LogId),
{reply, [ok], State};
-handle_call({rvi, authorize_local_message, [Service, Params | LogId] } = R, _From,
- #st{private_key = Key} = State) ->
+handle_call({rvi, authorize_local_message, [Service, _Params | LogId] } = R, _From, State) ->
?debug("authorize_rpc:handle_call(~p)~n", [R]),
case authorize_keys:find_cert_by_service(Service) of
- {ok, {ID, Cert}} ->
- Msg = Params ++ [{<<"certificate">>, Cert}],
- ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n",
- [authorize_keys:abbrev_payload(Msg)]),
- Sig = authorize_sig:encode_jwt(Msg, Key),
+ {ok, {ID, _Cert}} ->
+ %% Msg = Params ++ [{<<"certificate">>, Cert}],
+ %% ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n",
+ %% [authorize_keys:abbrev_payload(Msg)]),
+ %% Sig = authorize_sig:encode_jwt(Msg, Key),
log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]),
- {reply, [ok, Sig], State};
+ {reply, [ok], State};
_ ->
log(LogId, "NO CERTS for ~s", [Service]),
{reply, [ not_found ], State}
@@ -324,29 +323,19 @@ handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]},
Timeout = proplists:get_value(timeout, Params),
SvcName = proplists:get_value(service_name, Params),
Parameters = proplists:get_value(parameters, Params),
- Signature = proplists:get_value(signature, Params),
?debug("authorize_rpc:authorize_remote_message(): remote_ip: ~p~n", [IP]),
?debug("authorize_rpc:authorize_remote_message(): remote_port: ~p~n", [Port]),
?debug("authorize_rpc:authorize_remote_message(): timeout: ~p~n", [Timeout]),
?debug("authorize_rpc:authorize_remote_message(): service_name: ~p~n", [SvcName]),
?debug("authorize_rpc:authorize_remote_message(): parameters: ~p~n", [Parameters]),
- ?debug("authorize_rpc:authorize_remote_message(): signature: ~40s~n", [Signature]),
- case authorize_keys:validate_message(
- iolist_to_binary(Signature), {IP, Port}) of
+ case authorize_keys:validate_service_call(SvcName, {IP, Port}) of
invalid ->
- log(LogId, "signature INVALID", []),
+ log(LogId, "remote msg REJECTED", []),
{reply, [ not_found ], State};
- Msg ->
- case check_msg([{"timeout", Timeout},
- {"service_name", SvcName},
- {"parameters", Parameters}], Msg) of
- ok ->
- log(LogId, "params verified", []),
- {reply, [ok], State};
- {error, {mismatch, Bad}} ->
- log(LogId, "params MISMATCH: ~p", [Bad]),
- {reply, [not_found], State}
- end
+ {ok, CertID} ->
+ ?debug("validated Cert ID=~p", [CertID]),
+ log(LogId, "remote msg allowed: Cert=~s", [CertID]),
+ {reply, [ok], State}
end;
handle_call({rvi, filter_by_service, [Services, Conn | _LogId]}, _From, State) ->
@@ -417,8 +406,8 @@ log([ID], Fmt, Args) ->
log(_, _, _) ->
ok.
-check_msg(Checks, Params) ->
- check_msg(Checks, Params, []).
+%% check_msg(Checks, Params) ->
+%% check_msg(Checks, Params, []).
%% {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg),
%% {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg),
@@ -438,14 +427,14 @@ check_msg(Checks, Params) ->
%% end
%% end;
-check_msg([], _, []) ->
- ok;
-check_msg([{Key, Expect}|T], Msg, Acc) ->
- case rvi_common:get_json_element([Key], Msg) of
- {ok, Expect} ->
- check_msg(T, Msg, Acc);
- _ ->
- check_msg(T, Msg, [Key|Acc])
- end;
-check_msg([], _, [_|_] = Acc) ->
- {error, {mismatch, lists:reverse(Acc)}}.
+%% check_msg([], _, []) ->
+%% ok;
+%% check_msg([{Key, Expect}|T], Msg, Acc) ->
+%% case rvi_common:get_json_element([Key], Msg) of
+%% {ok, Expect} ->
+%% check_msg(T, Msg, Acc);
+%% _ ->
+%% check_msg(T, Msg, [Key|Acc])
+%% end;
+%% check_msg([], _, [_|_] = Acc) ->
+%% {error, {mismatch, lists:reverse(Acc)}}.
diff --git a/components/dlink_bt/src/bt_connection_manager.erl b/components/dlink_bt/src/bt_connection_manager.erl
index de665a2..1b1e049 100644
--- a/components/dlink_bt/src/bt_connection_manager.erl
+++ b/components/dlink_bt/src/bt_connection_manager.erl
@@ -2,10 +2,10 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
-%%
+%%
%%%-------------------------------------------------------------------
%%% @author magnus <magnus@t520.home>
%%% @copyright (C) 2014, magnus
@@ -32,7 +32,7 @@
-export([find_connection_by_pid/1]).
-export([find_connection_by_address/2]).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(st, {
conn_by_pid = undefined,
@@ -104,11 +104,11 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_call({add_connection, BTAddr, Channel, Pid}, _From,
- #st { conn_by_pid = ConPid,
+handle_call({add_connection, BTAddr, Channel, Pid}, _From,
+ #st { conn_by_pid = ConPid,
conn_by_addr = ConBTAddr} = St) ->
- ?debug("~p:handle_call(add): Adding Pid: ~p, BTAddress: ~p",
+ ?debug("~p:handle_call(add): Adding Pid: ~p, BTAddress: ~p",
[ ?MODULE, Pid, { BTAddr, Channel }]),
%% Store so that we can find connection both by pid and by address
NConPid = dict:store(Pid, { BTAddr, Channel }, ConPid),
@@ -119,19 +119,19 @@ handle_call({add_connection, BTAddr, Channel, Pid}, _From,
{reply, ok, NSt};
%% Delete connection by pid
-handle_call({delete_connection_by_pid, Pid}, _From,
- #st { conn_by_pid = ConPid,
+handle_call({delete_connection_by_pid, Pid}, _From,
+ #st { conn_by_pid = ConPid,
conn_by_addr = ConBTAddr} = St) when is_pid(Pid)->
%% Find address associated with Pid
case dict:find(Pid, ConPid) of
- error ->
- ?debug("~p:handle_call(del_by_pid): not found: ~p",
+ error ->
+ ?debug("~p:handle_call(del_by_pid): not found: ~p",
[ ?MODULE, Pid]),
{ reply, not_found, St};
-
+
{ok, BTAddr } ->
- ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, BTAddress: ~p",
+ ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, BTAddress: ~p",
[ ?MODULE, Pid, BTAddr]),
NConPid = dict:erase(Pid, ConPid),
@@ -145,19 +145,19 @@ handle_call({delete_connection_by_pid, Pid}, _From,
%% Delete connection by address
-handle_call({ delete_connection_by_address, BTAddr, Channel}, _From,
- #st { conn_by_pid = ConPid,
+handle_call({ delete_connection_by_address, BTAddr, Channel}, _From,
+ #st { conn_by_pid = ConPid,
conn_by_addr = ConBTAddr} = St) ->
%% Find Pid associated with BTAddress
case dict:find({BTAddr, Channel}, ConBTAddr) of
- error ->
- ?debug("~p:handle_call(del_by_addr): not found: ~p",
+ error ->
+ ?debug("~p:handle_call(del_by_addr): not found: ~p",
[ ?MODULE, {BTAddr, Channel}]),
{ reply, not_found, St};
-
+
{ok, Pid } ->
- ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, BTAddress: ~p",
+ ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, BTAddress: ~p",
[ ?MODULE, Pid, {BTAddr, Channel}]),
NConPid = dict:erase(Pid, ConPid),
NConBTAddr = dict:erase({ BTAddr, Channel }, ConBTAddr),
@@ -168,36 +168,36 @@ handle_call({ delete_connection_by_address, BTAddr, Channel}, _From,
%% Find connection by pid
-handle_call({ find_connection_by_pid, Pid}, _From,
+handle_call({ find_connection_by_pid, Pid}, _From,
#st { conn_by_pid = ConPid} = St) when is_pid(Pid)->
%% Find address associated with Pid
case dict:find(Pid, ConPid) of
- error ->
- ?debug("~p:handle_call(find_by_pid): not found: ~p",
+ error ->
+ ?debug("~p:handle_call(find_by_pid): not found: ~p",
[ ?MODULE, Pid]),
{ reply, not_found, St};
-
+
{ok, {BTAddr, Channel} } ->
- ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p",
+ ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p",
[ ?MODULE, Pid, {BTAddr, Channel}]),
{reply, {ok, BTAddr, Channel}, St}
end;
%% Find connection by address
-handle_call({find_connection_by_address, BTAddr, Channel}, _From,
+handle_call({find_connection_by_address, BTAddr, Channel}, _From,
#st { conn_by_addr = ConBTAddr} = St) ->
%% Find address associated with Pid
case dict:find({BTAddr, Channel}, ConBTAddr) of
- error ->
- ?debug("~p:handle_call(find_by_addr): not found: ~p",
+ error ->
+ ?debug("~p:handle_call(find_by_addr): not found: ~p",
[ ?MODULE, {BTAddr, Channel}]),
{ reply, not_found, St};
-
+
{ok, Pid } ->
- ?debug("~p:handle_call(find_by_addr): BTAddr: ~p ->: ~p",
+ ?debug("~p:handle_call(find_by_addr): BTAddr: ~p ->: ~p",
[ ?MODULE, {BTAddr, Channel}, Pid]),
{reply, {ok, Pid}, St}
end;
diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl
index e6129de..3711652 100644
--- a/components/dlink_bt/src/bt_listener.erl
+++ b/components/dlink_bt/src/bt_listener.erl
@@ -115,4 +115,4 @@ terminate(_Reason, _State) ->
listen(bt, Channel) ->
rfcomm:listen(Channel);
listen(tcp, Port) ->
- gen_tcp:listen(Port).
+ exo_socket:listen(Port).
diff --git a/components/dlink_bt/src/dlink_bt.app.src b/components/dlink_bt/src/dlink_bt.app.src
index 7b8e006..e63d1ac 100644
--- a/components/dlink_bt/src/dlink_bt.app.src
+++ b/components/dlink_bt/src/dlink_bt.app.src
@@ -3,7 +3,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -23,6 +23,6 @@
{mod, { dlink_bt_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
{env, [
- {rvi_core_await, [{n,l,dlink_dt}]}
+ {rvi_core_await, [{n,l,dlink_bt}]}
]}
]}.
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index 168d13e..ab5f8ac 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -38,6 +38,7 @@
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BT_CHANNEL, 1).
+-define(DEFAULT_TCP_PORT, 8807).
-define(DEFAULT_RECONNECT_INTERVAL, 1000).
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
@@ -111,13 +112,13 @@ start_connection_manager() ->
[],
CompSpec),
%% Retrieve the channel we should use
- Channel = proplists:get_value(channel, BertOpts, ?DEFAULT_BT_CHANNEL),
+ Mode = get_mode(BertOpts),
+ Channel = get_channel(Mode, BertOpts),
?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]),
%% Fire up listener
- Mode = get_mode(BertOpts),
case Mode of
bt ->
bt:start(),
@@ -126,7 +127,7 @@ start_connection_manager() ->
ok
end,
bt_listener:start_link(Mode),
- bt_connection_manager:start_link(Mode),
+ bt_connection_manager:start_link(),
?info("dlink_bt:start_connection_manager(): Adding listener on bluetooth channel ~p", [Channel ]),
%% Add listener channel.
@@ -149,13 +150,13 @@ start_connection_manager() ->
ok.
-get_mode(BertOpts) ->
- case proplists:get_value(test_mode, BertOpts) of
- TM when TM==undefined; TM==bt ->
- bt;
- tcp ->
- tcp
- end.
+get_mode(Opts) ->
+ proplists:get_value(test_mode, Opts, bt).
+
+get_channel(tcp, Opts) ->
+ proplists:get_value(port, Opts, ?DEFAULT_TCP_PORT);
+get_channel(bt, Opts) ->
+ proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL).
setup_persistent_connections_([ ], _CompSpec) ->
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index abeb19c..c125ca3 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -39,6 +39,7 @@
-define(SERVER, ?MODULE).
-define(PACKET_MOD, dlink_data_msgpack).
+-define(MAX_MSG_SIZE, infinity).
-record(st, {
ip = {0,0,0,0},
@@ -50,7 +51,8 @@
mod = undefined,
func = undefined,
cs,
- role = server :: client | server
+ role = server :: client | server,
+ msg_size = ?MAX_MSG_SIZE :: infinity | pos_integer()
}).
%%%===================================================================
@@ -144,8 +146,7 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
%% Grab socket control
- {ok, PktMod} = rvi_common:get_module_config(dlink_tls, dlink_tls_rpc,
- packet_mod, ?PACKET_MOD, CompSpec),
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
PktSt = PktMod:init(CompSpec),
{ok, #st{
ip = IP,
@@ -158,6 +159,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
cs = CompSpec
}}.
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tls, dlink_tls_rpc, Key, Default, CS).
%%--------------------------------------------------------------------
%% @private
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index 964cc0c..e42a156 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -749,8 +749,7 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
?debug("dlink_tls:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]),
?debug("dlink_tls:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
Proto = list_to_existing_atom(ProtocolMod),
- Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
- base64:decode_to_string(Data)).
+ Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data).
process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
diff --git a/components/proto_bert/src/proto_bert_rpc.erl b/components/proto_bert/src/proto_bert_rpc.erl
index cad1708..f1658a3 100644
--- a/components/proto_bert/src/proto_bert_rpc.erl
+++ b/components/proto_bert/src/proto_bert_rpc.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -20,12 +20,12 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-export([start_json_server/0]).
-export([send_message/8,
receive_message/3]).
--record(st, {
+-record(st, {
%% Component specification
cs = #component_spec{}
}).
@@ -42,26 +42,26 @@ start_json_server() ->
-send_message(CompSpec,
- ServiceName,
- Timeout,
+send_message(CompSpec,
+ TID,
+ ServiceName,
+ Timeout,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Parameters,
- Signature) ->
+ Parameters) ->
rvi_common:request(protocol, ?MODULE, send_message,
- [ { service, ServiceName },
+ [ { transaction_id, TID },
+ { service, ServiceName },
{ timeout, Timeout },
{ protocol_opts, ProtoOpts },
{ data_link_mod, DataLinkMod },
{ data_link_opts, DataLinkOpts },
- { parameters, Parameters },
- { signature, Signature }],
+ { parameters, Parameters } ],
[ status ], CompSpec).
receive_message(CompSpec, {IP,Port}, Data) ->
- rvi_common:notification(protocol, ?MODULE, receive_message,
+ rvi_common:notification(protocol, ?MODULE, receive_message,
[ {data, Data},
{remote_ip, IP},
{remote_port, Port} ],
@@ -71,23 +71,25 @@ receive_message(CompSpec, {IP,Port}, Data) ->
%% CAlled by local exo http server
handle_rpc("send_message", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
+ {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args),
{ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args),
{ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args),
{ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
- {ok, Signature} = rvi_common:get_json_element(["signature"], Args),
- [ ok ] = gen_server:call(?SERVER, { rvi, send_message,
- [ServiceName,
+ [ ok ] = gen_server:call(?SERVER, { rvi, send_message,
+ [TID,
+ ServiceName,
Timeout,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
Parameters,
- Signature]}),
+ LogId]}),
{ok, [ {status, rvi_common:json_rpc_status(ok)} ]};
-
+
handle_rpc(Other, _Args) ->
@@ -107,23 +109,22 @@ handle_notification(Other, _Args) ->
ok.
-handle_call({rvi, send_message,
- [ServiceName,
+handle_call({rvi, send_message,
+ [_TID,
+ ServiceName,
Timeout,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Parameters,
- Signature]}, _From, St) ->
+ Parameters | _LogId]}, _From, St) ->
?debug(" protocol:send(): service name: ~p~n", [ServiceName]),
?debug(" protocol:send(): timeout: ~p~n", [Timeout]),
?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]),
?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]),
?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]),
%% ?debug(" protocol:send(): parameters: ~p~n", [Parameters]),
- ?debug(" protocol:send(): signature: ~p~n", [Signature]),
-
- Data = term_to_binary({ ServiceName, Timeout, Parameters, Signature }),
+
+ Data = term_to_binary({ ServiceName, Timeout, Parameters }),
Res = DataLinkMod:send_data(St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data),
@@ -149,21 +150,18 @@ handle_cast_({rvi, receive_message, [Data,IP,Port]}, St) when is_list(Data)->
handle_cast({ rvi, receive_message, [list_to_binary(Data),IP,Port] }, St);
handle_cast_({rvi, receive_message, [Data, IP, Port]}, St) ->
- {ServiceName,
- Timeout,
- Parameters,
- Signature} = binary_to_term(Data),
+ {ServiceName,
+ Timeout,
+ Parameters} = binary_to_term(Data),
?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
%% ?debug(" protocol:rcv(): parameters: ~p~n", [Parameters]),
- ?debug(" protocol:rcv(): signature: ~p~n", [Signature]),
?debug(" protocol:rcv(): remote IP/port: ~p~n", [{IP, Port}]),
service_edge_rpc:handle_remote_message(St#st.cs,
{IP, Port},
ServiceName,
Timeout,
- Parameters,
- Signature),
+ Parameters),
{noreply, St};
handle_cast_(Other, St) ->
diff --git a/components/proto_json/src/proto_json_rpc.erl b/components/proto_json/src/proto_json_rpc.erl
index 9445965..9f7ccc0 100644
--- a/components/proto_json/src/proto_json_rpc.erl
+++ b/components/proto_json/src/proto_json_rpc.erl
@@ -22,7 +22,7 @@
-define(SERVER, ?MODULE).
-export([start_json_server/0]).
--export([send_message/9,
+-export([send_message/8,
receive_message/3]).
-record(st, {
@@ -50,8 +50,7 @@ send_message(CompSpec,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Parameters,
- Signature) ->
+ Parameters) ->
rvi_common:request(protocol, ?MODULE, send_message,
[{ transaction_id, TID },
{ service, ServiceName },
@@ -59,9 +58,8 @@ send_message(CompSpec,
{ protocol_opts, ProtoOpts },
{ data_link_mod, DataLinkMod },
{ data_link_opts, DataLinkOpts },
- { parameters, Parameters },
- { signature, Signature }],
- [ status ], CompSpec).
+ { parameters, Parameters }],
+ [ status ], CompSpec).
receive_message(CompSpec, {IP, Port}, Data) ->
rvi_common:notification(protocol, ?MODULE, receive_message,
@@ -74,6 +72,7 @@ receive_message(CompSpec, {IP, Port}, Data) ->
%% CAlled by local exo http server
handle_rpc("send_message", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, TID} = rvi_common:get_json_element(["transaction_id"], Args),
{ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
@@ -81,7 +80,6 @@ handle_rpc("send_message", Args) ->
{ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args),
{ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
- {ok, Signature} = rvi_common:get_json_element(["signature"], Args),
[ ok ] = gen_server:call(?SERVER, { rvi, send_message,
[TID,
ServiceName,
@@ -90,7 +88,7 @@ handle_rpc("send_message", Args) ->
DataLinkMod,
DataLinkOpts,
Parameters,
- Signature]}),
+ LogId]}),
{ok, [ {status, rvi_common:json_rpc_status(ok)} ]};
@@ -101,10 +99,14 @@ handle_rpc(Other, _Args) ->
handle_notification("receive_message", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Data} = rvi_common:get_json_element(["data"], Args),
{ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args),
{ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args),
- gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}),
+ gen_server:cast(?SERVER, { rvi, receive_message, [Data,
+ RemoteIP,
+ RemotePort,
+ LogId]}),
ok;
handle_notification(Other, _Args) ->
@@ -119,8 +121,7 @@ handle_call({rvi, send_message,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Parameters,
- Signature]}, _From, St) ->
+ Parameters | _LogId]}, _From, St) ->
?debug(" protocol:send(): transaction id: ~p~n", [TID]),
?debug(" protocol:send(): service name: ~p~n", [ServiceName]),
?debug(" protocol:send(): timeout: ~p~n", [Timeout]),
@@ -128,13 +129,11 @@ handle_call({rvi, send_message,
?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]),
?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]),
?debug(" protocol:send(): parameters: ~p~n", [Parameters]),
- ?debug(" protocol:send(): signature: ~p~n", [Signature]),
Data = jsx:encode([
- { "tid", TID },
- { "service", ServiceName },
- { "timeout", Timeout },
- { "parameters", Parameters },
- { "signature", Signature }
+ { <<"tid">>, TID },
+ { <<"service">>, ServiceName },
+ { <<"timeout">>, Timeout },
+ { <<"parameters">>, Parameters }
]),
case use_frag(Parameters, DataLinkOpts) of
@@ -154,30 +153,28 @@ handle_call(Other, _From, St) ->
{ reply, [ invalid_command ], St}.
%% Convert list-based data to binary.
-handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) ->
+handle_cast({rvi, receive_message, [Payload, IP, Port | _LogId]} = Msg, St) ->
?debug("~p:handle_cast(~p)", [?MODULE, Msg]),
- Elems = jsx:decode(Payload),
+ Elems = jsx:decode(iolist_to_binary(Payload)),
case Elems of
[{<<"frg">>, _}|_] ->
St1 = handle_frag(Elems, IP, Port, St),
{noreply, St1};
_ ->
- [ ServiceName, Timeout, Parameters, Signature ] =
- opts([<<"service">>, <<"timeout">>, <<"parameters">>, <<"signature">>],
+ [ ServiceName, Timeout, Parameters ] =
+ opts([<<"service">>, <<"timeout">>, <<"parameters">>],
Elems, undefined),
?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
- ?debug(" protocol:rcv(): signature: ~p~n", [Signature]),
?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]),
service_edge_rpc:handle_remote_message(St#st.cs,
{IP, Port},
ServiceName,
Timeout,
- Parameters,
- Signature),
+ Parameters),
{noreply, St}
end;
diff --git a/components/proto_msgpack/src/proto_msgpack.app.src b/components/proto_msgpack/src/proto_msgpack.app.src
new file mode 100644
index 0000000..1cdfe27
--- /dev/null
+++ b/components/proto_msgpack/src/proto_msgpack.app.src
@@ -0,0 +1,26 @@
+%%
+%% Copyright (C) 2014, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+
+
+%% -*- erlang -*-
+{application, proto_msgpack,
+ [
+ {description, ""},
+ {vsn, "0.1"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib,
+ rvi_common
+ ]},
+ {mod, { proto_msgpack_app, []}},
+ {start_phases, [{json_rpc, []}, {announce, []}]},
+ {env, [
+ {rvi_core_await, [{n, l, proto_msgpack}]}
+ ]}
+ ]}.
diff --git a/components/proto_msgpack/src/proto_msgpack_app.erl b/components/proto_msgpack/src/proto_msgpack_app.erl
new file mode 100644
index 0000000..d6d8ea8
--- /dev/null
+++ b/components/proto_msgpack/src/proto_msgpack_app.erl
@@ -0,0 +1,38 @@
+%%
+%% Copyright (C) 2014, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+
+
+-module(proto_msgpack_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2,
+ start_phase/3,
+ stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+ proto_msgpack_sup:start_link().
+
+start_phase(init, _, _) ->
+ proto_msgpack_rpc:init_rvi_component(),
+ ok;
+
+start_phase(json_rpc, _, _) ->
+ proto_msgpack_rpc:start_json_server(),
+ ok;
+
+start_phase(announce, _, _) ->
+ rvi_common:announce({n, l, proto_msgpack}).
+
+stop(_State) ->
+ ok.
diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl
new file mode 100644
index 0000000..2b1f59c
--- /dev/null
+++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl
@@ -0,0 +1,180 @@
+%% -*- erlang-indent-level:4; indent-tabs-mode: nil -*-
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+
+
+-module(proto_msgpack_rpc).
+-behaviour(gen_server).
+
+-export([handle_rpc/2,
+ handle_notification/2]).
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+
+-include_lib("lager/include/log.hrl").
+-include_lib("rvi_common/include/rvi_common.hrl").
+
+-define(SERVER, ?MODULE).
+-export([start_json_server/0]).
+-export([send_message/8,
+ receive_message/3]).
+
+-record(st, {
+ %% Component specification
+ queue = [],
+ cs = #component_spec{},
+ pack_opts = [{allow_atom, pack},
+ {enable_str, true},
+ jsx]
+ }).
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ ?debug("init(): called."),
+ {ok, #st{ cs = rvi_common:get_component_specification() }}.
+
+start_json_server() ->
+ rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_msgpack_sup).
+
+
+
+send_message(CompSpec,
+ TID,
+ ServiceName,
+ Timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Parameters) ->
+ rvi_common:request(protocol, ?MODULE, send_message,
+ [{ transaction_id, TID },
+ { service, ServiceName },
+ { timeout, Timeout },
+ { protocol_opts, ProtoOpts },
+ { data_link_mod, DataLinkMod },
+ { data_link_opts, DataLinkOpts },
+ { parameters, Parameters }],
+ [ status ], CompSpec).
+
+receive_message(CompSpec, {IP, Port}, Data) ->
+ rvi_common:notification(protocol, ?MODULE, receive_message,
+ [ {data, Data },
+ {remote_ip, IP},
+ {remote_port, Port} ],
+ CompSpec).
+
+%% JSON-RPC entry point
+
+%% CAlled by local exo http server
+handle_rpc("send_message", Args) ->
+ {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args),
+ {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args),
+ {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
+ {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args),
+ {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args),
+ {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args),
+ {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
+ [ ok ] = gen_server:call(?SERVER, { rvi, send_message,
+ [TID,
+ ServiceName,
+ Timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Parameters]}),
+ {ok, [ {status, rvi_common:json_rpc_status(ok)} ]};
+
+
+handle_rpc(Other, _Args) ->
+ ?warning("proto_msgpack_rpc:handle_rpc(~p): Unknown~n", [ Other ]),
+ { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
+
+
+handle_notification("receive_message", Args) ->
+ {ok, Data} = rvi_common:get_json_element(["data"], Args),
+ {ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args),
+ {ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args),
+ gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}),
+ ok;
+
+handle_notification(Other, _Args) ->
+ ?debug("handle_notification(Other=~p): unknown", [ Other ]),
+ ok.
+
+
+handle_call({rvi, send_message,
+ [TID,
+ ServiceName,
+ Timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Parameters]}, _From, St) ->
+ ?debug(" protocol:send(): transaction id: ~p~n", [TID]),
+ ?debug(" protocol:send(): service name: ~p~n", [ServiceName]),
+ ?debug(" protocol:send(): timeout: ~p~n", [Timeout]),
+ ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]),
+ ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]),
+ ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]),
+ ?debug(" protocol:send(): parameters: ~p~n", [Parameters]),
+ Data = msgpack:pack([ { <<"tid">>, TID },
+ { <<"service">>, ServiceName },
+ { <<"timeout">>, Timeout },
+ { <<"parameters">>, Parameters } ], St#st.pack_opts),
+ Res = DataLinkMod:send_data(
+ St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data),
+ {reply, Res, St};
+
+handle_call(Other, _From, St) ->
+ ?warning("proto_msgpack_rpc:handle_call(~p): unknown", [ Other ]),
+ { reply, [ invalid_command ], St}.
+
+
+%% Convert list-based data to binary.
+handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) ->
+ ?debug("~p:handle_cast(~p)", [?MODULE, Msg]),
+ {ok, Elems} = msgpack:unpack(Payload, St#st.pack_opts),
+
+ [ ServiceName, Timeout, Parameters ] =
+ opts([<<"service">>, <<"timeout">>, <<"parameters">>],
+ Elems, undefined),
+
+ ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
+ ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
+ ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]),
+
+ service_edge_rpc:handle_remote_message(St#st.cs,
+ {IP, Port},
+ ServiceName,
+ Timeout,
+ Parameters),
+ {noreply, St};
+
+handle_cast(Other, St) ->
+ ?warning("proto_msgpack_rpc:handle_cast(~p): unknown", [ Other ]),
+ {noreply, St}.
+
+handle_info(_Info, St) ->
+ {noreply, St}.
+
+terminate(_Reason, _St) ->
+ ok.
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+opt(K, L, Def) ->
+ case lists:keyfind(K, 1, L) of
+ {_, V} -> V;
+ false -> Def
+ end.
+
+opts(Keys, Elems, Def) ->
+ [ opt(K, Elems, Def) || K <- Keys].
diff --git a/components/proto_msgpack/src/proto_msgpack_sup.erl b/components/proto_msgpack/src/proto_msgpack_sup.erl
new file mode 100644
index 0000000..c96c272
--- /dev/null
+++ b/components/proto_msgpack/src/proto_msgpack_sup.erl
@@ -0,0 +1,38 @@
+%%
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+
+
+-module(proto_msgpack_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ {ok, { {one_for_one, 5, 10},
+ [
+ ?CHILD(proto_msgpack_rpc, worker)
+ ]} }.
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
index 67b68eb..9d7917e 100644
--- a/components/schedule/src/rvi_routing.erl
+++ b/components/schedule/src/rvi_routing.erl
@@ -235,7 +235,14 @@ normalize_routes_([ {{ Pr, PrOp}, DL } | Rem ], Acc) ->
normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
normalize_routes_([ {Pr, DL} | Rem ], Acc) ->
- normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]).
+ normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]);
+normalize_routes_([H|T], Acc) ->
+ ?error("Unrecognized routing rule: ~p", [H]),
+ normalize_routes_(T, Acc);
+normalize_routes_(Other, Acc) ->
+ ?error("Unrecognized routing entry (expected list): ~p", [Other]),
+ lists:reverse(Acc).
+
find_protocols_(_DataLink, [], Acc ) ->
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index b66bdd1..0faddba 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -13,7 +13,7 @@
-include_lib("rvi_common/include/rvi_common.hrl").
%% API
-export([start_link/0]).
--export([schedule_message/5]).
+-export([schedule_message/4]).
%% Invoked by service discovery
%% FIXME: Should be rvi_service_discovery behavior
@@ -63,8 +63,8 @@
protocol, %% Protocol to use. { Module Opts }
routes, %% Routes retrieved for this
timeout_tref, %% Reference to erlang timer associated with this message.
- parameters,
- signature
+ log_id,
+ parameters
}).
@@ -123,15 +123,13 @@ start_json_server() ->
schedule_message(CompSpec,
SvcName,
Timeout,
- Parameters,
- Signature) ->
+ Parameters) ->
rvi_common:request(schedule, ?MODULE,
schedule_message,
[{ service, SvcName },
{ timeout, Timeout },
- { parameters, Parameters },
- { signature, Signature }],
+ { parameters, Parameters }],
[status, transaction_id], CompSpec).
@@ -158,18 +156,17 @@ handle_rpc(<<"schedule_message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
- {ok, Signature} = rvi_common:get_json_element(["signature"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
?debug("schedule_rpc:schedule_request(): service: ~p", [ SvcName]),
?debug("schedule_rpc:schedule_request(): timeout: ~p", [ Timeout]),
%% ?debug("schedule_rpc:schedule_request(): parameters: ~p", [Parameters]),
- ?debug("schedule_rpc:schedule_request(): signature: ~p", [Signature]),
[ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message,
[ SvcName,
Timeout,
- {struct, Parameters},
- Signature]}),
+ Parameters,
+ LogId ]}),
{ok, [ { status, rvi_common:json_rpc_status(ok)},
{ transaction_id, TransID } ] };
@@ -184,19 +181,23 @@ handle_rpc(Other, _Args) ->
handle_notification("service_available", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
gen_server:cast(?SERVER, { rvi, service_available,
[ SvcName,
- list_to_existing_atom(DataLinkModule) ]}),
+ list_to_existing_atom(DataLinkModule),
+ LogId ]}),
ok;
handle_notification("service_unavailable", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
gen_server:cast(?SERVER, { rvi, service_unavailable,
[ SvcName,
- list_to_atom(DataLinkModule) ]}),
+ list_to_atom(DataLinkModule),
+ LogId ]}),
ok;
@@ -207,25 +208,24 @@ handle_notification(Other, _Args) ->
handle_call( { rvi, schedule_message,
[SvcName,
Timeout,
- Parameters,
- Signature] }, _From, St) ->
+ Parameters | LogId] }, _From, St) ->
?debug("sched:sched_msg(): service: ~p", [SvcName]),
?debug("sched:sched_msg(): timeout: ~p", [Timeout]),
?debug("sched:sched_msg(): parameters: ~p", [Parameters]),
- ?debug("sched:sched_msg(): signature: ~p", [Signature]),
%%?debug("sched:sched_msg(): St: ~p", [St]),
%% Create a transaction ID
{ TransID, NSt1} = create_transaction_id(St),
-
+ log(LogId, "queue: tid=~w", [TransID]),
%% Queue the message
- {_, NSt2 }= queue_message(SvcName,
- TransID,
+ Msg = #message{transaction_id = TransID,
+ service = SvcName,
+ timeout = Timeout,
+ parameters = Parameters,
+ log_id = LogId},
+ {_, NSt2 }= queue_message(Msg,
rvi_routing:get_service_routes(SvcName), %% Can be [] (no route)
- Timeout,
- Parameters,
- Signature,
NSt1),
{ reply, [ok, TransID], NSt2 };
@@ -334,12 +334,8 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID},
%% Try to requeue message
{ _Res, NSt } =
- queue_message(SvcName,
- Msg#message.transaction_id,
+ queue_message(Msg,
Msg#message.routes,
- Msg#message.timeout,
- Msg#message.parameters,
- Msg#message.signature,
St),
{noreply, NSt}
end;
@@ -418,13 +414,7 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) ->
%%
%% Stash the message in the unknown datalinvariant of the service
%% and opportunistically send it if the service
-queue_message(SvcName,
- TransID,
- [ ],
- Timeout,
- Parameters,
- Signature,
- St) ->
+queue_message(#message{service = SvcName, timeout = Timeout} = Msg, [], St) ->
TOut = calc_relative_tout(Timeout),
?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.",
@@ -434,33 +424,21 @@ queue_message(SvcName,
store_message(SvcRec,
orphaned,
- #message {
- transaction_id = TransID,
- service = SvcName,
- timeout = Timeout,
+ Msg#message {
data_link = undefined,
protocol = undefined,
routes = [],
- timeout_tref = 0,
- parameters = Parameters,
- signature = Signature
+ timeout_tref = 0
},
TOut),
{ok, St};
-
%% Try to queue message
-queue_message(SvcName,
- TransID,
+queue_message(#message{service = SvcName, timeout = Timeout} = Msg,
[ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ],
- Timeout,
- Parameters,
- Signature,
St) ->
?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]),
- %%?debug("sched:q(~p:~s): parameters: ~p", [DLMod, SvcName, Parameters]),
- %%?debug("sched:q(~p:~s): signature: ~p", [DLMod, SvcName, Signature]),
SvcRec = find_or_create_service(SvcName, DLMod, St),
@@ -470,16 +448,11 @@ queue_message(SvcName,
%% Once up, the data link will invoke service_availble()
%% to indicate that the service is available for the given DL.
%%
- Msg = #message {
- transaction_id = TransID,
- service = SvcName,
- timeout = Timeout,
+ Msg1 = Msg#message {
data_link = { DLMod, DLOpt },
protocol = { ProtoMod, ProtoOpt },
routes = RemainingRoutes,
- timeout_tref = 0,
- parameters = Parameters,
- signature = Signature
+ timeout_tref = 0
},
case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of
@@ -489,7 +462,7 @@ queue_message(SvcName,
?debug("sched:q(~p:~s): ~p seconds to compe up.",
[ DLMod, SvcName, TOut / 1000.0]),
- store_message(SvcRec, DLMod, Msg, TOut),
+ store_message(SvcRec, DLMod, Msg1, TOut),
{ok, St};
[ already_connected, _] ->
@@ -499,7 +472,7 @@ queue_message(SvcName,
%% Will re-queue message if cannot send.
{ _, NSt } =
- send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg, St),
+ send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg1, St),
{ ok, NSt };
@@ -508,13 +481,7 @@ queue_message(SvcName,
%%
[ Err, _Reason] ->
?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]),
- queue_message(SvcName,
- TransID,
- RemainingRoutes,
- Timeout,
- Parameters,
- Signature,
- St)
+ queue_message(Msg1, RemainingRoutes, St)
end.
@@ -528,15 +495,13 @@ send_message(local, _, _, _, Msg, St) ->
service_edge_rpc:handle_remote_message(St#st.cs,
Msg#message.service,
Msg#message.timeout,
- Msg#message.parameters,
- Msg#message.signature),
+ Msg#message.parameters),
{ok, St};
%% Forward message to protocol.
send_message(DataLinkMod, DataLinkOpts,
ProtoMod, ProtoOpts,
Msg, St) ->
-
?debug("sched:send_msg(): ~p:~p:~p",
[ProtoMod, DataLinkMod, Msg#message.service]),
@@ -549,8 +514,7 @@ send_message(DataLinkMod, DataLinkOpts,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Msg#message.parameters,
- Msg#message.signature) of
+ Msg#message.parameters) of
%% Success
[ok] ->
@@ -563,15 +527,10 @@ send_message(DataLinkMod, DataLinkOpts,
[ProtoMod, DataLinkMod, Msg#message.service, Err]),
%% Requeue this message with the next route
- queue_message(Msg#message.service,
- Msg#message.transaction_id,
- Msg#message.routes,
- Msg#message.timeout,
- Msg#message.parameters,
- Msg#message.signature,
- St)
+ queue_message(Msg, Msg#message.routes, St)
end.
+
%% The service is not available
send_queued_messages(#service {
key = { SvcName, _ },
@@ -869,3 +828,8 @@ select_timeout(TimeOut1, TimeOut2) ->
{ false, false } -> min(TimeOut1, TimeOut2)
end.
+
+log([ID], Fmt, Args) ->
+ rvi_log:log(ID, <<"schedule">>, rvi_log:format(Fmt, Args));
+log(_, _, _) ->
+ ok.
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index b3ed684..d652725 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -24,7 +24,7 @@
terminate/2,
code_change/3]).
--export([handle_remote_message/6,
+-export([handle_remote_message/5,
handle_local_timeout/3]).
-export([start_json_server/0,
@@ -191,7 +191,7 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) ->
[{ service, SvcName },
{ data_link_module, DataLinkModule }], CompSpec).
-handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) ->
+handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) ->
{IP, Port} = Conn,
rvi_common:notification(service_edge, ?MODULE,
handle_remote_message,
@@ -199,8 +199,7 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) ->
{ port, Port },
{ service, SvcName },
{ timeout, Timeout },
- { parameters, Params },
- { signature, Signature }], CompSpec).
+ { parameters, Params }], CompSpec).
%% Invoked by schedule_rpc.
@@ -359,15 +358,13 @@ handle_notification(<<"handle_remote_message">>, Args) ->
{ ok, SvcName } = rvi_common:get_json_element(["service"], Args),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Args),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Args),
- { ok, Signature } = rvi_common:get_json_element(["signature"], Args),
gen_server:cast(?SERVER, { rvi, handle_remote_message,
[
IP,
Port,
SvcName,
Timeout,
- Parameters,
- Signature
+ Parameters
]}),
ok;
@@ -458,7 +455,7 @@ handle_call({ rvi, handle_local_message,
%% the messaage to its locally connected service_name service.
%%
?debug("CS = ~p", [lager:pr(CS, rvi_common)]),
- [ok, Signature ] =
+ [ok] =
authorize_rpc:authorize_local_message(
CS, SvcName, [{service_name, SvcName},
{timeout, TimeoutArg},
@@ -522,8 +519,7 @@ handle_call({ rvi, handle_local_message,
[ _, TID ] = schedule_rpc:schedule_message(CS,
SvcName,
Timeout,
- Parameters,
- Signature),
+ Parameters),
{ reply, [ok, TID ], St}
end;
@@ -555,8 +551,7 @@ handle_cast({rvi, handle_remote_message,
Port,
SvcName,
Timeout,
- Parameters,
- Signature
+ Parameters
] }, St) ->
?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]),
@@ -564,7 +559,6 @@ handle_cast({rvi, handle_remote_message,
?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]),
?debug("service_edge:remote_msg(): timeout: ~p", [Timeout]),
?debug("service_edge:remote_msg(): parameters: ~p", [Parameters]),
- ?debug("service_edge:remote_msg(): signature: ~p", [Signature]),
%% Check if this is a local message.
case ets:lookup(?SERVICE_TABLE, SvcName) of
@@ -577,16 +571,15 @@ handle_cast({rvi, handle_remote_message,
{remote_port, Port},
{service_name, SvcName},
{timeout, Timeout},
- {parameters, Parameters1},
- {signature, Signature}]) of
+ {parameters, Parameters1}]) of
[ ok ] ->
forward_message_to_local_service(URL, SvcName,
Parameters, St#st.cs),
{ noreply, St};
- [ _ ] ->
- ?warning("service_entry:remote_msg(): Failed to authenticate ~p",
- [SvcName]),
+ [ _Other ] ->
+ ?warning("service_entry:remote_msg(): Failed to authenticate ~p (~p)",
+ [SvcName, _Other]),
{ noreply, St}
end;
[] ->
@@ -700,6 +693,7 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) ->
%% Deliver the message to the local service, which can
%% be either a wse websocket, or a regular HTTP JSON-RPC call
spawn(fun() ->
+ try
log_outcome(
rvi_common:get_request_result(
dispatch_to_local_service(URL,
@@ -707,7 +701,12 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) ->
[{<<"service_name">>, LocalSvcName },
{<<"parameters">>, Parameters }])),
SvcName, CompSpec)
+ catch
+ Tag:Err ->
+ ?debug("Caught ~p:~p", [Tag,Err])
+ end
end),
+ timer:sleep(500),
[ ok, -1 ].
log_outcome({Status, _}, _SvcName, CS) ->