diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-11-05 17:29:22 +0100 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-11-20 13:47:02 -0800 |
commit | ca1f0abd9f0f1478da8380bff28cb25aada34c1d (patch) | |
tree | 814c4a3c17b247cc300f62673771dad5a5c6ed77 /components | |
parent | 1b44c2448344a10ae63904a796b6211c40a3f212 (diff) | |
download | rvi_core-ca1f0abd9f0f1478da8380bff28cb25aada34c1d.tar.gz |
All tests (incl remote method inv) pass
- No signatures on messages (dlink_tls)
- dlink_bt wasn't actually tested (test case passed erroneously)
- added proto_msgpack component
- fixed sneaky bug in 'setup'
Diffstat (limited to 'components')
-rw-r--r-- | components/authorize/src/authorize_keys.erl | 26 | ||||
-rw-r--r-- | components/authorize/src/authorize_rpc.erl | 65 | ||||
-rw-r--r-- | components/dlink_bt/src/bt_connection_manager.erl | 56 | ||||
-rw-r--r-- | components/dlink_bt/src/bt_listener.erl | 2 | ||||
-rw-r--r-- | components/dlink_bt/src/dlink_bt.app.src | 4 | ||||
-rw-r--r-- | components/dlink_bt/src/dlink_bt_rpc.erl | 21 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_conn.erl | 9 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_rpc.erl | 3 | ||||
-rw-r--r-- | components/proto_bert/src/proto_bert_rpc.erl | 60 | ||||
-rw-r--r-- | components/proto_json/src/proto_json_rpc.erl | 45 | ||||
-rw-r--r-- | components/proto_msgpack/src/proto_msgpack.app.src | 26 | ||||
-rw-r--r-- | components/proto_msgpack/src/proto_msgpack_app.erl | 38 | ||||
-rw-r--r-- | components/proto_msgpack/src/proto_msgpack_rpc.erl | 180 | ||||
-rw-r--r-- | components/proto_msgpack/src/proto_msgpack_sup.erl | 38 | ||||
-rw-r--r-- | components/schedule/src/rvi_routing.erl | 9 | ||||
-rw-r--r-- | components/schedule/src/schedule_rpc.erl | 118 | ||||
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 35 |
17 files changed, 499 insertions, 236 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index 834935e..1d205b6 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -11,7 +11,8 @@ save_cert/4]). -export([get_certificates/0, get_certificates/1]). --export([validate_message/2]). +-export([validate_message/2, + validate_service_call/2]). -export([filter_by_service/2, find_cert_by_service/1]). -export([public_key_to_json/1, @@ -106,6 +107,9 @@ authorize_jwt() -> validate_message(JWT, Conn) -> gen_server:call(?MODULE, {validate_message, JWT, Conn}). +validate_service_call(Service, Conn) -> + gen_server:call(?MODULE, {validate_service_call, Service, Conn}). + get_certificates() -> get_certificates(local). @@ -176,6 +180,8 @@ handle_call_({save_keys, Keys, Conn}, _, S) -> {reply, ok, S}; handle_call_({validate_message, JWT, Conn}, _, S) -> {reply, validate_message_(JWT, Conn), S}; +handle_call_({validate_service_call, Svc, Conn}, _, S) -> + {reply, validate_service_call_(Svc, Conn), S}; handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) -> case process_cert_struct(Cert, JWT) of invalid -> @@ -223,6 +229,14 @@ certs_by_conn(Conn) -> ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]), [C || {C,V} <- Certs, check_validity(V, UTC)]. +cert_recs_by_conn(Conn) -> + ?debug("cert_recs_by_conn(~p)~n", [Conn]), + UTC = rvi_common:utc_timestamp(), + Certs = ets:select(?CERTS, [{ {{Conn,'_'}, '$1'}, + [], ['$1'] }]), + ?debug("rough selection: ~p~n", [[abbrev_bin(C#cert.id) || C <- Certs]]), + [C || C <- Certs, check_validity(C#cert.validity, UTC)]. + filter_by_service_(Services, Conn) -> ?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]), Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1', @@ -524,6 +538,16 @@ validate_message_1([{_,K}|T], JWT) -> validate_message_1([], _) -> error(invalid). +validate_service_call_(Svc, Conn) -> + case lists:filter(fun(C) -> can_invoke(Svc, C) end, cert_recs_by_conn(Conn)) of + [] -> + invalid; + [#cert{id = ID}|_] -> + {ok, ID} + end. + +can_invoke(Svc, #cert{invoke = In}) -> + lists:any(fun(I) -> match_svc(I, Svc) end, In). pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) -> P = integer_to_binary(Pub), diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index 54a9657..c91b216 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -112,7 +112,7 @@ authorize_local_message(CompSpec, Service, Params) -> rvi_common:request(authorize, ?MODULE, authorize_local_message, [{service, Service}, {parameters, Params}], - [status, signature], CompSpec). + [status], CompSpec). authorize_remote_message(CompSpec, Service, Params) -> ?debug("authorize_rpc:authorize_remote_msg(): service: ~p ~n", [Service]), @@ -301,17 +301,16 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _F handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) -> do_store_certs(Certs, Conn, LogId), {reply, [ok], State}; -handle_call({rvi, authorize_local_message, [Service, Params | LogId] } = R, _From, - #st{private_key = Key} = State) -> +handle_call({rvi, authorize_local_message, [Service, _Params | LogId] } = R, _From, State) -> ?debug("authorize_rpc:handle_call(~p)~n", [R]), case authorize_keys:find_cert_by_service(Service) of - {ok, {ID, Cert}} -> - Msg = Params ++ [{<<"certificate">>, Cert}], - ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", - [authorize_keys:abbrev_payload(Msg)]), - Sig = authorize_sig:encode_jwt(Msg, Key), + {ok, {ID, _Cert}} -> + %% Msg = Params ++ [{<<"certificate">>, Cert}], + %% ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", + %% [authorize_keys:abbrev_payload(Msg)]), + %% Sig = authorize_sig:encode_jwt(Msg, Key), log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]), - {reply, [ok, Sig], State}; + {reply, [ok], State}; _ -> log(LogId, "NO CERTS for ~s", [Service]), {reply, [ not_found ], State} @@ -324,29 +323,19 @@ handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]}, Timeout = proplists:get_value(timeout, Params), SvcName = proplists:get_value(service_name, Params), Parameters = proplists:get_value(parameters, Params), - Signature = proplists:get_value(signature, Params), ?debug("authorize_rpc:authorize_remote_message(): remote_ip: ~p~n", [IP]), ?debug("authorize_rpc:authorize_remote_message(): remote_port: ~p~n", [Port]), ?debug("authorize_rpc:authorize_remote_message(): timeout: ~p~n", [Timeout]), ?debug("authorize_rpc:authorize_remote_message(): service_name: ~p~n", [SvcName]), ?debug("authorize_rpc:authorize_remote_message(): parameters: ~p~n", [Parameters]), - ?debug("authorize_rpc:authorize_remote_message(): signature: ~40s~n", [Signature]), - case authorize_keys:validate_message( - iolist_to_binary(Signature), {IP, Port}) of + case authorize_keys:validate_service_call(SvcName, {IP, Port}) of invalid -> - log(LogId, "signature INVALID", []), + log(LogId, "remote msg REJECTED", []), {reply, [ not_found ], State}; - Msg -> - case check_msg([{"timeout", Timeout}, - {"service_name", SvcName}, - {"parameters", Parameters}], Msg) of - ok -> - log(LogId, "params verified", []), - {reply, [ok], State}; - {error, {mismatch, Bad}} -> - log(LogId, "params MISMATCH: ~p", [Bad]), - {reply, [not_found], State} - end + {ok, CertID} -> + ?debug("validated Cert ID=~p", [CertID]), + log(LogId, "remote msg allowed: Cert=~s", [CertID]), + {reply, [ok], State} end; handle_call({rvi, filter_by_service, [Services, Conn | _LogId]}, _From, State) -> @@ -417,8 +406,8 @@ log([ID], Fmt, Args) -> log(_, _, _) -> ok. -check_msg(Checks, Params) -> - check_msg(Checks, Params, []). +%% check_msg(Checks, Params) -> +%% check_msg(Checks, Params, []). %% {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg), %% {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg), @@ -438,14 +427,14 @@ check_msg(Checks, Params) -> %% end %% end; -check_msg([], _, []) -> - ok; -check_msg([{Key, Expect}|T], Msg, Acc) -> - case rvi_common:get_json_element([Key], Msg) of - {ok, Expect} -> - check_msg(T, Msg, Acc); - _ -> - check_msg(T, Msg, [Key|Acc]) - end; -check_msg([], _, [_|_] = Acc) -> - {error, {mismatch, lists:reverse(Acc)}}. +%% check_msg([], _, []) -> +%% ok; +%% check_msg([{Key, Expect}|T], Msg, Acc) -> +%% case rvi_common:get_json_element([Key], Msg) of +%% {ok, Expect} -> +%% check_msg(T, Msg, Acc); +%% _ -> +%% check_msg(T, Msg, [Key|Acc]) +%% end; +%% check_msg([], _, [_|_] = Acc) -> +%% {error, {mismatch, lists:reverse(Acc)}}. diff --git a/components/dlink_bt/src/bt_connection_manager.erl b/components/dlink_bt/src/bt_connection_manager.erl index de665a2..1b1e049 100644 --- a/components/dlink_bt/src/bt_connection_manager.erl +++ b/components/dlink_bt/src/bt_connection_manager.erl @@ -2,10 +2,10 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% -%% +%% %%%------------------------------------------------------------------- %%% @author magnus <magnus@t520.home> %%% @copyright (C) 2014, magnus @@ -32,7 +32,7 @@ -export([find_connection_by_pid/1]). -export([find_connection_by_address/2]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -record(st, { conn_by_pid = undefined, @@ -104,11 +104,11 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call({add_connection, BTAddr, Channel, Pid}, _From, - #st { conn_by_pid = ConPid, +handle_call({add_connection, BTAddr, Channel, Pid}, _From, + #st { conn_by_pid = ConPid, conn_by_addr = ConBTAddr} = St) -> - ?debug("~p:handle_call(add): Adding Pid: ~p, BTAddress: ~p", + ?debug("~p:handle_call(add): Adding Pid: ~p, BTAddress: ~p", [ ?MODULE, Pid, { BTAddr, Channel }]), %% Store so that we can find connection both by pid and by address NConPid = dict:store(Pid, { BTAddr, Channel }, ConPid), @@ -119,19 +119,19 @@ handle_call({add_connection, BTAddr, Channel, Pid}, _From, {reply, ok, NSt}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid, +handle_call({delete_connection_by_pid, Pid}, _From, + #st { conn_by_pid = ConPid, conn_by_addr = ConBTAddr} = St) when is_pid(Pid)-> %% Find address associated with Pid case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", + error -> + ?debug("~p:handle_call(del_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - + {ok, BTAddr } -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, BTAddress: ~p", + ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, BTAddress: ~p", [ ?MODULE, Pid, BTAddr]), NConPid = dict:erase(Pid, ConPid), @@ -145,19 +145,19 @@ handle_call({delete_connection_by_pid, Pid}, _From, %% Delete connection by address -handle_call({ delete_connection_by_address, BTAddr, Channel}, _From, - #st { conn_by_pid = ConPid, +handle_call({ delete_connection_by_address, BTAddr, Channel}, _From, + #st { conn_by_pid = ConPid, conn_by_addr = ConBTAddr} = St) -> %% Find Pid associated with BTAddress case dict:find({BTAddr, Channel}, ConBTAddr) of - error -> - ?debug("~p:handle_call(del_by_addr): not found: ~p", + error -> + ?debug("~p:handle_call(del_by_addr): not found: ~p", [ ?MODULE, {BTAddr, Channel}]), { reply, not_found, St}; - + {ok, Pid } -> - ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, BTAddress: ~p", + ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, BTAddress: ~p", [ ?MODULE, Pid, {BTAddr, Channel}]), NConPid = dict:erase(Pid, ConPid), NConBTAddr = dict:erase({ BTAddr, Channel }, ConBTAddr), @@ -168,36 +168,36 @@ handle_call({ delete_connection_by_address, BTAddr, Channel}, _From, %% Find connection by pid -handle_call({ find_connection_by_pid, Pid}, _From, +handle_call({ find_connection_by_pid, Pid}, _From, #st { conn_by_pid = ConPid} = St) when is_pid(Pid)-> %% Find address associated with Pid case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(find_by_pid): not found: ~p", + error -> + ?debug("~p:handle_call(find_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - + {ok, {BTAddr, Channel} } -> - ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", + ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", [ ?MODULE, Pid, {BTAddr, Channel}]), {reply, {ok, BTAddr, Channel}, St} end; %% Find connection by address -handle_call({find_connection_by_address, BTAddr, Channel}, _From, +handle_call({find_connection_by_address, BTAddr, Channel}, _From, #st { conn_by_addr = ConBTAddr} = St) -> %% Find address associated with Pid case dict:find({BTAddr, Channel}, ConBTAddr) of - error -> - ?debug("~p:handle_call(find_by_addr): not found: ~p", + error -> + ?debug("~p:handle_call(find_by_addr): not found: ~p", [ ?MODULE, {BTAddr, Channel}]), { reply, not_found, St}; - + {ok, Pid } -> - ?debug("~p:handle_call(find_by_addr): BTAddr: ~p ->: ~p", + ?debug("~p:handle_call(find_by_addr): BTAddr: ~p ->: ~p", [ ?MODULE, {BTAddr, Channel}, Pid]), {reply, {ok, Pid}, St} end; diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl index e6129de..3711652 100644 --- a/components/dlink_bt/src/bt_listener.erl +++ b/components/dlink_bt/src/bt_listener.erl @@ -115,4 +115,4 @@ terminate(_Reason, _State) -> listen(bt, Channel) -> rfcomm:listen(Channel); listen(tcp, Port) -> - gen_tcp:listen(Port). + exo_socket:listen(Port). diff --git a/components/dlink_bt/src/dlink_bt.app.src b/components/dlink_bt/src/dlink_bt.app.src index 7b8e006..e63d1ac 100644 --- a/components/dlink_bt/src/dlink_bt.app.src +++ b/components/dlink_bt/src/dlink_bt.app.src @@ -3,7 +3,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -23,6 +23,6 @@ {mod, { dlink_bt_app, []}}, {start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]}, {env, [ - {rvi_core_await, [{n,l,dlink_dt}]} + {rvi_core_await, [{n,l,dlink_bt}]} ]} ]}. diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index 168d13e..ab5f8ac 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -38,6 +38,7 @@ -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(DEFAULT_BT_CHANNEL, 1). +-define(DEFAULT_TCP_PORT, 8807). -define(DEFAULT_RECONNECT_INTERVAL, 1000). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). @@ -111,13 +112,13 @@ start_connection_manager() -> [], CompSpec), %% Retrieve the channel we should use - Channel = proplists:get_value(channel, BertOpts, ?DEFAULT_BT_CHANNEL), + Mode = get_mode(BertOpts), + Channel = get_channel(Mode, BertOpts), ?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]), %% Fire up listener - Mode = get_mode(BertOpts), case Mode of bt -> bt:start(), @@ -126,7 +127,7 @@ start_connection_manager() -> ok end, bt_listener:start_link(Mode), - bt_connection_manager:start_link(Mode), + bt_connection_manager:start_link(), ?info("dlink_bt:start_connection_manager(): Adding listener on bluetooth channel ~p", [Channel ]), %% Add listener channel. @@ -149,13 +150,13 @@ start_connection_manager() -> ok. -get_mode(BertOpts) -> - case proplists:get_value(test_mode, BertOpts) of - TM when TM==undefined; TM==bt -> - bt; - tcp -> - tcp - end. +get_mode(Opts) -> + proplists:get_value(test_mode, Opts, bt). + +get_channel(tcp, Opts) -> + proplists:get_value(port, Opts, ?DEFAULT_TCP_PORT); +get_channel(bt, Opts) -> + proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL). setup_persistent_connections_([ ], _CompSpec) -> diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index abeb19c..c125ca3 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -39,6 +39,7 @@ -define(SERVER, ?MODULE). -define(PACKET_MOD, dlink_data_msgpack). +-define(MAX_MSG_SIZE, infinity). -record(st, { ip = {0,0,0,0}, @@ -50,7 +51,8 @@ mod = undefined, func = undefined, cs, - role = server :: client | server + role = server :: client | server, + msg_size = ?MAX_MSG_SIZE :: infinity | pos_integer() }). %%%=================================================================== @@ -144,8 +146,7 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): Module: ~p", [Mod]), ?debug("connection:init(): Function: ~p", [Fun]), %% Grab socket control - {ok, PktMod} = rvi_common:get_module_config(dlink_tls, dlink_tls_rpc, - packet_mod, ?PACKET_MOD, CompSpec), + {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), PktSt = PktMod:init(CompSpec), {ok, #st{ ip = IP, @@ -158,6 +159,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> cs = CompSpec }}. +get_module_config(Key, Default, CS) -> + rvi_common:get_module_config(dlink_tls, dlink_tls_rpc, Key, Default, CS). %%-------------------------------------------------------------------- %% @private diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 964cc0c..e42a156 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -749,8 +749,7 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> ?debug("dlink_tls:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]), ?debug("dlink_tls:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), Proto = list_to_existing_atom(ProtocolMod), - Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, - base64:decode_to_string(Data)). + Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data). process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), diff --git a/components/proto_bert/src/proto_bert_rpc.erl b/components/proto_bert/src/proto_bert_rpc.erl index cad1708..f1658a3 100644 --- a/components/proto_bert/src/proto_bert_rpc.erl +++ b/components/proto_bert/src/proto_bert_rpc.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -20,12 +20,12 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -export([start_json_server/0]). -export([send_message/8, receive_message/3]). --record(st, { +-record(st, { %% Component specification cs = #component_spec{} }). @@ -42,26 +42,26 @@ start_json_server() -> -send_message(CompSpec, - ServiceName, - Timeout, +send_message(CompSpec, + TID, + ServiceName, + Timeout, ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters, - Signature) -> + Parameters) -> rvi_common:request(protocol, ?MODULE, send_message, - [ { service, ServiceName }, + [ { transaction_id, TID }, + { service, ServiceName }, { timeout, Timeout }, { protocol_opts, ProtoOpts }, { data_link_mod, DataLinkMod }, { data_link_opts, DataLinkOpts }, - { parameters, Parameters }, - { signature, Signature }], + { parameters, Parameters } ], [ status ], CompSpec). receive_message(CompSpec, {IP,Port}, Data) -> - rvi_common:notification(protocol, ?MODULE, receive_message, + rvi_common:notification(protocol, ?MODULE, receive_message, [ {data, Data}, {remote_ip, IP}, {remote_port, Port} ], @@ -71,23 +71,25 @@ receive_message(CompSpec, {IP,Port}, Data) -> %% CAlled by local exo http server handle_rpc("send_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), + {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args), {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args), {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args), {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), - {ok, Signature} = rvi_common:get_json_element(["signature"], Args), - [ ok ] = gen_server:call(?SERVER, { rvi, send_message, - [ServiceName, + [ ok ] = gen_server:call(?SERVER, { rvi, send_message, + [TID, + ServiceName, Timeout, ProtoOpts, DataLinkMod, DataLinkOpts, Parameters, - Signature]}), + LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; - + handle_rpc(Other, _Args) -> @@ -107,23 +109,22 @@ handle_notification(Other, _Args) -> ok. -handle_call({rvi, send_message, - [ServiceName, +handle_call({rvi, send_message, + [_TID, + ServiceName, Timeout, ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters, - Signature]}, _From, St) -> + Parameters | _LogId]}, _From, St) -> ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]), ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]), ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]), %% ?debug(" protocol:send(): parameters: ~p~n", [Parameters]), - ?debug(" protocol:send(): signature: ~p~n", [Signature]), - - Data = term_to_binary({ ServiceName, Timeout, Parameters, Signature }), + + Data = term_to_binary({ ServiceName, Timeout, Parameters }), Res = DataLinkMod:send_data(St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data), @@ -149,21 +150,18 @@ handle_cast_({rvi, receive_message, [Data,IP,Port]}, St) when is_list(Data)-> handle_cast({ rvi, receive_message, [list_to_binary(Data),IP,Port] }, St); handle_cast_({rvi, receive_message, [Data, IP, Port]}, St) -> - {ServiceName, - Timeout, - Parameters, - Signature} = binary_to_term(Data), + {ServiceName, + Timeout, + Parameters} = binary_to_term(Data), ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), %% ?debug(" protocol:rcv(): parameters: ~p~n", [Parameters]), - ?debug(" protocol:rcv(): signature: ~p~n", [Signature]), ?debug(" protocol:rcv(): remote IP/port: ~p~n", [{IP, Port}]), service_edge_rpc:handle_remote_message(St#st.cs, {IP, Port}, ServiceName, Timeout, - Parameters, - Signature), + Parameters), {noreply, St}; handle_cast_(Other, St) -> diff --git a/components/proto_json/src/proto_json_rpc.erl b/components/proto_json/src/proto_json_rpc.erl index 9445965..9f7ccc0 100644 --- a/components/proto_json/src/proto_json_rpc.erl +++ b/components/proto_json/src/proto_json_rpc.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -export([start_json_server/0]). --export([send_message/9, +-export([send_message/8, receive_message/3]). -record(st, { @@ -50,8 +50,7 @@ send_message(CompSpec, ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters, - Signature) -> + Parameters) -> rvi_common:request(protocol, ?MODULE, send_message, [{ transaction_id, TID }, { service, ServiceName }, @@ -59,9 +58,8 @@ send_message(CompSpec, { protocol_opts, ProtoOpts }, { data_link_mod, DataLinkMod }, { data_link_opts, DataLinkOpts }, - { parameters, Parameters }, - { signature, Signature }], - [ status ], CompSpec). + { parameters, Parameters }], + [ status ], CompSpec). receive_message(CompSpec, {IP, Port}, Data) -> rvi_common:notification(protocol, ?MODULE, receive_message, @@ -74,6 +72,7 @@ receive_message(CompSpec, {IP, Port}, Data) -> %% CAlled by local exo http server handle_rpc("send_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args), {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), @@ -81,7 +80,6 @@ handle_rpc("send_message", Args) -> {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args), {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), - {ok, Signature} = rvi_common:get_json_element(["signature"], Args), [ ok ] = gen_server:call(?SERVER, { rvi, send_message, [TID, ServiceName, @@ -90,7 +88,7 @@ handle_rpc("send_message", Args) -> DataLinkMod, DataLinkOpts, Parameters, - Signature]}), + LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; @@ -101,10 +99,14 @@ handle_rpc(Other, _Args) -> handle_notification("receive_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Data} = rvi_common:get_json_element(["data"], Args), {ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args), {ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args), - gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}), + gen_server:cast(?SERVER, { rvi, receive_message, [Data, + RemoteIP, + RemotePort, + LogId]}), ok; handle_notification(Other, _Args) -> @@ -119,8 +121,7 @@ handle_call({rvi, send_message, ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters, - Signature]}, _From, St) -> + Parameters | _LogId]}, _From, St) -> ?debug(" protocol:send(): transaction id: ~p~n", [TID]), ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), @@ -128,13 +129,11 @@ handle_call({rvi, send_message, ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]), ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]), ?debug(" protocol:send(): parameters: ~p~n", [Parameters]), - ?debug(" protocol:send(): signature: ~p~n", [Signature]), Data = jsx:encode([ - { "tid", TID }, - { "service", ServiceName }, - { "timeout", Timeout }, - { "parameters", Parameters }, - { "signature", Signature } + { <<"tid">>, TID }, + { <<"service">>, ServiceName }, + { <<"timeout">>, Timeout }, + { <<"parameters">>, Parameters } ]), case use_frag(Parameters, DataLinkOpts) of @@ -154,30 +153,28 @@ handle_call(Other, _From, St) -> { reply, [ invalid_command ], St}. %% Convert list-based data to binary. -handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) -> +handle_cast({rvi, receive_message, [Payload, IP, Port | _LogId]} = Msg, St) -> ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), - Elems = jsx:decode(Payload), + Elems = jsx:decode(iolist_to_binary(Payload)), case Elems of [{<<"frg">>, _}|_] -> St1 = handle_frag(Elems, IP, Port, St), {noreply, St1}; _ -> - [ ServiceName, Timeout, Parameters, Signature ] = - opts([<<"service">>, <<"timeout">>, <<"parameters">>, <<"signature">>], + [ ServiceName, Timeout, Parameters ] = + opts([<<"service">>, <<"timeout">>, <<"parameters">>], Elems, undefined), ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), - ?debug(" protocol:rcv(): signature: ~p~n", [Signature]), ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]), service_edge_rpc:handle_remote_message(St#st.cs, {IP, Port}, ServiceName, Timeout, - Parameters, - Signature), + Parameters), {noreply, St} end; diff --git a/components/proto_msgpack/src/proto_msgpack.app.src b/components/proto_msgpack/src/proto_msgpack.app.src new file mode 100644 index 0000000..1cdfe27 --- /dev/null +++ b/components/proto_msgpack/src/proto_msgpack.app.src @@ -0,0 +1,26 @@ +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +%% -*- erlang -*- +{application, proto_msgpack, + [ + {description, ""}, + {vsn, "0.1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + rvi_common + ]}, + {mod, { proto_msgpack_app, []}}, + {start_phases, [{json_rpc, []}, {announce, []}]}, + {env, [ + {rvi_core_await, [{n, l, proto_msgpack}]} + ]} + ]}. diff --git a/components/proto_msgpack/src/proto_msgpack_app.erl b/components/proto_msgpack/src/proto_msgpack_app.erl new file mode 100644 index 0000000..d6d8ea8 --- /dev/null +++ b/components/proto_msgpack/src/proto_msgpack_app.erl @@ -0,0 +1,38 @@ +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +-module(proto_msgpack_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, + start_phase/3, + stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + proto_msgpack_sup:start_link(). + +start_phase(init, _, _) -> + proto_msgpack_rpc:init_rvi_component(), + ok; + +start_phase(json_rpc, _, _) -> + proto_msgpack_rpc:start_json_server(), + ok; + +start_phase(announce, _, _) -> + rvi_common:announce({n, l, proto_msgpack}). + +stop(_State) -> + ok. diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl new file mode 100644 index 0000000..2b1f59c --- /dev/null +++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl @@ -0,0 +1,180 @@ +%% -*- erlang-indent-level:4; indent-tabs-mode: nil -*- +%% Copyright (C) 2015, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +-module(proto_msgpack_rpc). +-behaviour(gen_server). + +-export([handle_rpc/2, + handle_notification/2]). +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + + +-include_lib("lager/include/log.hrl"). +-include_lib("rvi_common/include/rvi_common.hrl"). + +-define(SERVER, ?MODULE). +-export([start_json_server/0]). +-export([send_message/8, + receive_message/3]). + +-record(st, { + %% Component specification + queue = [], + cs = #component_spec{}, + pack_opts = [{allow_atom, pack}, + {enable_str, true}, + jsx] + }). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + ?debug("init(): called."), + {ok, #st{ cs = rvi_common:get_component_specification() }}. + +start_json_server() -> + rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_msgpack_sup). + + + +send_message(CompSpec, + TID, + ServiceName, + Timeout, + ProtoOpts, + DataLinkMod, + DataLinkOpts, + Parameters) -> + rvi_common:request(protocol, ?MODULE, send_message, + [{ transaction_id, TID }, + { service, ServiceName }, + { timeout, Timeout }, + { protocol_opts, ProtoOpts }, + { data_link_mod, DataLinkMod }, + { data_link_opts, DataLinkOpts }, + { parameters, Parameters }], + [ status ], CompSpec). + +receive_message(CompSpec, {IP, Port}, Data) -> + rvi_common:notification(protocol, ?MODULE, receive_message, + [ {data, Data }, + {remote_ip, IP}, + {remote_port, Port} ], + CompSpec). + +%% JSON-RPC entry point + +%% CAlled by local exo http server +handle_rpc("send_message", Args) -> + {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args), + {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args), + {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), + {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args), + {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args), + {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args), + {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), + [ ok ] = gen_server:call(?SERVER, { rvi, send_message, + [TID, + ServiceName, + Timeout, + ProtoOpts, + DataLinkMod, + DataLinkOpts, + Parameters]}), + {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; + + +handle_rpc(Other, _Args) -> + ?warning("proto_msgpack_rpc:handle_rpc(~p): Unknown~n", [ Other ]), + { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. + + +handle_notification("receive_message", Args) -> + {ok, Data} = rvi_common:get_json_element(["data"], Args), + {ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args), + {ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args), + gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}), + ok; + +handle_notification(Other, _Args) -> + ?debug("handle_notification(Other=~p): unknown", [ Other ]), + ok. + + +handle_call({rvi, send_message, + [TID, + ServiceName, + Timeout, + ProtoOpts, + DataLinkMod, + DataLinkOpts, + Parameters]}, _From, St) -> + ?debug(" protocol:send(): transaction id: ~p~n", [TID]), + ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), + ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), + ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]), + ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]), + ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]), + ?debug(" protocol:send(): parameters: ~p~n", [Parameters]), + Data = msgpack:pack([ { <<"tid">>, TID }, + { <<"service">>, ServiceName }, + { <<"timeout">>, Timeout }, + { <<"parameters">>, Parameters } ], St#st.pack_opts), + Res = DataLinkMod:send_data( + St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data), + {reply, Res, St}; + +handle_call(Other, _From, St) -> + ?warning("proto_msgpack_rpc:handle_call(~p): unknown", [ Other ]), + { reply, [ invalid_command ], St}. + + +%% Convert list-based data to binary. +handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) -> + ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), + {ok, Elems} = msgpack:unpack(Payload, St#st.pack_opts), + + [ ServiceName, Timeout, Parameters ] = + opts([<<"service">>, <<"timeout">>, <<"parameters">>], + Elems, undefined), + + ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), + ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), + ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]), + + service_edge_rpc:handle_remote_message(St#st.cs, + {IP, Port}, + ServiceName, + Timeout, + Parameters), + {noreply, St}; + +handle_cast(Other, St) -> + ?warning("proto_msgpack_rpc:handle_cast(~p): unknown", [ Other ]), + {noreply, St}. + +handle_info(_Info, St) -> + {noreply, St}. + +terminate(_Reason, _St) -> + ok. +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +opt(K, L, Def) -> + case lists:keyfind(K, 1, L) of + {_, V} -> V; + false -> Def + end. + +opts(Keys, Elems, Def) -> + [ opt(K, Elems, Def) || K <- Keys]. diff --git a/components/proto_msgpack/src/proto_msgpack_sup.erl b/components/proto_msgpack/src/proto_msgpack_sup.erl new file mode 100644 index 0000000..c96c272 --- /dev/null +++ b/components/proto_msgpack/src/proto_msgpack_sup.erl @@ -0,0 +1,38 @@ +%% +%% Copyright (C) 2015, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +-module(proto_msgpack_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, + [ + ?CHILD(proto_msgpack_rpc, worker) + ]} }. diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl index 67b68eb..9d7917e 100644 --- a/components/schedule/src/rvi_routing.erl +++ b/components/schedule/src/rvi_routing.erl @@ -235,7 +235,14 @@ normalize_routes_([ {{ Pr, PrOp}, DL } | Rem ], Acc) -> normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]); normalize_routes_([ {Pr, DL} | Rem ], Acc) -> - normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]). + normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]); +normalize_routes_([H|T], Acc) -> + ?error("Unrecognized routing rule: ~p", [H]), + normalize_routes_(T, Acc); +normalize_routes_(Other, Acc) -> + ?error("Unrecognized routing entry (expected list): ~p", [Other]), + lists:reverse(Acc). + find_protocols_(_DataLink, [], Acc ) -> diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index b66bdd1..0faddba 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -13,7 +13,7 @@ -include_lib("rvi_common/include/rvi_common.hrl"). %% API -export([start_link/0]). --export([schedule_message/5]). +-export([schedule_message/4]). %% Invoked by service discovery %% FIXME: Should be rvi_service_discovery behavior @@ -63,8 +63,8 @@ protocol, %% Protocol to use. { Module Opts } routes, %% Routes retrieved for this timeout_tref, %% Reference to erlang timer associated with this message. - parameters, - signature + log_id, + parameters }). @@ -123,15 +123,13 @@ start_json_server() -> schedule_message(CompSpec, SvcName, Timeout, - Parameters, - Signature) -> + Parameters) -> rvi_common:request(schedule, ?MODULE, schedule_message, [{ service, SvcName }, { timeout, Timeout }, - { parameters, Parameters }, - { signature, Signature }], + { parameters, Parameters }], [status, transaction_id], CompSpec). @@ -158,18 +156,17 @@ handle_rpc(<<"schedule_message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), - {ok, Signature} = rvi_common:get_json_element(["signature"], Args), + LogId = rvi_common:get_json_log_id(Args), ?debug("schedule_rpc:schedule_request(): service: ~p", [ SvcName]), ?debug("schedule_rpc:schedule_request(): timeout: ~p", [ Timeout]), %% ?debug("schedule_rpc:schedule_request(): parameters: ~p", [Parameters]), - ?debug("schedule_rpc:schedule_request(): signature: ~p", [Signature]), [ok, TransID] = gen_server:call(?SERVER, { rvi, schedule_message, [ SvcName, Timeout, - {struct, Parameters}, - Signature]}), + Parameters, + LogId ]}), {ok, [ { status, rvi_common:json_rpc_status(ok)}, { transaction_id, TransID } ] }; @@ -184,19 +181,23 @@ handle_rpc(Other, _Args) -> handle_notification("service_available", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), + LogId = rvi_common:get_json_log_id(Args), gen_server:cast(?SERVER, { rvi, service_available, [ SvcName, - list_to_existing_atom(DataLinkModule) ]}), + list_to_existing_atom(DataLinkModule), + LogId ]}), ok; handle_notification("service_unavailable", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_mod"], Args), + LogId = rvi_common:get_json_log_id(Args), gen_server:cast(?SERVER, { rvi, service_unavailable, [ SvcName, - list_to_atom(DataLinkModule) ]}), + list_to_atom(DataLinkModule), + LogId ]}), ok; @@ -207,25 +208,24 @@ handle_notification(Other, _Args) -> handle_call( { rvi, schedule_message, [SvcName, Timeout, - Parameters, - Signature] }, _From, St) -> + Parameters | LogId] }, _From, St) -> ?debug("sched:sched_msg(): service: ~p", [SvcName]), ?debug("sched:sched_msg(): timeout: ~p", [Timeout]), ?debug("sched:sched_msg(): parameters: ~p", [Parameters]), - ?debug("sched:sched_msg(): signature: ~p", [Signature]), %%?debug("sched:sched_msg(): St: ~p", [St]), %% Create a transaction ID { TransID, NSt1} = create_transaction_id(St), - + log(LogId, "queue: tid=~w", [TransID]), %% Queue the message - {_, NSt2 }= queue_message(SvcName, - TransID, + Msg = #message{transaction_id = TransID, + service = SvcName, + timeout = Timeout, + parameters = Parameters, + log_id = LogId}, + {_, NSt2 }= queue_message(Msg, rvi_routing:get_service_routes(SvcName), %% Can be [] (no route) - Timeout, - Parameters, - Signature, NSt1), { reply, [ok, TransID], NSt2 }; @@ -334,12 +334,8 @@ handle_info({ rvi_message_timeout, SvcName, DLMod,TransID}, %% Try to requeue message { _Res, NSt } = - queue_message(SvcName, - Msg#message.transaction_id, + queue_message(Msg, Msg#message.routes, - Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature, St), {noreply, NSt} end; @@ -418,13 +414,7 @@ store_message(SvcRec, DataLinkMod, Message, RelativeTimeout) -> %% %% Stash the message in the unknown datalinvariant of the service %% and opportunistically send it if the service -queue_message(SvcName, - TransID, - [ ], - Timeout, - Parameters, - Signature, - St) -> +queue_message(#message{service = SvcName, timeout = Timeout} = Msg, [], St) -> TOut = calc_relative_tout(Timeout), ?debug("sched:q(~s): No more routes. Will orphan for ~p seconds.", @@ -434,33 +424,21 @@ queue_message(SvcName, store_message(SvcRec, orphaned, - #message { - transaction_id = TransID, - service = SvcName, - timeout = Timeout, + Msg#message { data_link = undefined, protocol = undefined, routes = [], - timeout_tref = 0, - parameters = Parameters, - signature = Signature + timeout_tref = 0 }, TOut), {ok, St}; - %% Try to queue message -queue_message(SvcName, - TransID, +queue_message(#message{service = SvcName, timeout = Timeout} = Msg, [ { { ProtoMod, ProtoOpt }, { DLMod, DLOpt } } | RemainingRoutes ], - Timeout, - Parameters, - Signature, St) -> ?debug("sched:q(~p:~s): timeout: ~p", [DLMod, SvcName, Timeout]), - %%?debug("sched:q(~p:~s): parameters: ~p", [DLMod, SvcName, Parameters]), - %%?debug("sched:q(~p:~s): signature: ~p", [DLMod, SvcName, Signature]), SvcRec = find_or_create_service(SvcName, DLMod, St), @@ -470,16 +448,11 @@ queue_message(SvcName, %% Once up, the data link will invoke service_availble() %% to indicate that the service is available for the given DL. %% - Msg = #message { - transaction_id = TransID, - service = SvcName, - timeout = Timeout, + Msg1 = Msg#message { data_link = { DLMod, DLOpt }, protocol = { ProtoMod, ProtoOpt }, routes = RemainingRoutes, - timeout_tref = 0, - parameters = Parameters, - signature = Signature + timeout_tref = 0 }, case DLMod:setup_data_link(St#st.cs, SvcName, DLOpt) of @@ -489,7 +462,7 @@ queue_message(SvcName, ?debug("sched:q(~p:~s): ~p seconds to compe up.", [ DLMod, SvcName, TOut / 1000.0]), - store_message(SvcRec, DLMod, Msg, TOut), + store_message(SvcRec, DLMod, Msg1, TOut), {ok, St}; [ already_connected, _] -> @@ -499,7 +472,7 @@ queue_message(SvcName, %% Will re-queue message if cannot send. { _, NSt } = - send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg, St), + send_message(DLMod, DLOpt, ProtoMod, ProtoOpt, Msg1, St), { ok, NSt }; @@ -508,13 +481,7 @@ queue_message(SvcName, %% [ Err, _Reason] -> ?debug("sched:q(~p:~s): failed to setup: ~p", [ DLMod, SvcName, Err]), - queue_message(SvcName, - TransID, - RemainingRoutes, - Timeout, - Parameters, - Signature, - St) + queue_message(Msg1, RemainingRoutes, St) end. @@ -528,15 +495,13 @@ send_message(local, _, _, _, Msg, St) -> service_edge_rpc:handle_remote_message(St#st.cs, Msg#message.service, Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature), + Msg#message.parameters), {ok, St}; %% Forward message to protocol. send_message(DataLinkMod, DataLinkOpts, ProtoMod, ProtoOpts, Msg, St) -> - ?debug("sched:send_msg(): ~p:~p:~p", [ProtoMod, DataLinkMod, Msg#message.service]), @@ -549,8 +514,7 @@ send_message(DataLinkMod, DataLinkOpts, ProtoOpts, DataLinkMod, DataLinkOpts, - Msg#message.parameters, - Msg#message.signature) of + Msg#message.parameters) of %% Success [ok] -> @@ -563,15 +527,10 @@ send_message(DataLinkMod, DataLinkOpts, [ProtoMod, DataLinkMod, Msg#message.service, Err]), %% Requeue this message with the next route - queue_message(Msg#message.service, - Msg#message.transaction_id, - Msg#message.routes, - Msg#message.timeout, - Msg#message.parameters, - Msg#message.signature, - St) + queue_message(Msg, Msg#message.routes, St) end. + %% The service is not available send_queued_messages(#service { key = { SvcName, _ }, @@ -869,3 +828,8 @@ select_timeout(TimeOut1, TimeOut2) -> { false, false } -> min(TimeOut1, TimeOut2) end. + +log([ID], Fmt, Args) -> + rvi_log:log(ID, <<"schedule">>, rvi_log:format(Fmt, Args)); +log(_, _, _) -> + ok. diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index b3ed684..d652725 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -24,7 +24,7 @@ terminate/2, code_change/3]). --export([handle_remote_message/6, +-export([handle_remote_message/5, handle_local_timeout/3]). -export([start_json_server/0, @@ -191,7 +191,7 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) -> [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). -handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) -> +handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params) -> {IP, Port} = Conn, rvi_common:notification(service_edge, ?MODULE, handle_remote_message, @@ -199,8 +199,7 @@ handle_remote_message(CompSpec, Conn, SvcName, Timeout, Params, Signature) -> { port, Port }, { service, SvcName }, { timeout, Timeout }, - { parameters, Params }, - { signature, Signature }], CompSpec). + { parameters, Params }], CompSpec). %% Invoked by schedule_rpc. @@ -359,15 +358,13 @@ handle_notification(<<"handle_remote_message">>, Args) -> { ok, SvcName } = rvi_common:get_json_element(["service"], Args), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Args), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Args), - { ok, Signature } = rvi_common:get_json_element(["signature"], Args), gen_server:cast(?SERVER, { rvi, handle_remote_message, [ IP, Port, SvcName, Timeout, - Parameters, - Signature + Parameters ]}), ok; @@ -458,7 +455,7 @@ handle_call({ rvi, handle_local_message, %% the messaage to its locally connected service_name service. %% ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), - [ok, Signature ] = + [ok] = authorize_rpc:authorize_local_message( CS, SvcName, [{service_name, SvcName}, {timeout, TimeoutArg}, @@ -522,8 +519,7 @@ handle_call({ rvi, handle_local_message, [ _, TID ] = schedule_rpc:schedule_message(CS, SvcName, Timeout, - Parameters, - Signature), + Parameters), { reply, [ok, TID ], St} end; @@ -555,8 +551,7 @@ handle_cast({rvi, handle_remote_message, Port, SvcName, Timeout, - Parameters, - Signature + Parameters ] }, St) -> ?debug("service_edge:remote_msg(): remote_ip: ~p", [IP]), @@ -564,7 +559,6 @@ handle_cast({rvi, handle_remote_message, ?debug("service_edge:remote_msg(): service_name: ~p", [SvcName]), ?debug("service_edge:remote_msg(): timeout: ~p", [Timeout]), ?debug("service_edge:remote_msg(): parameters: ~p", [Parameters]), - ?debug("service_edge:remote_msg(): signature: ~p", [Signature]), %% Check if this is a local message. case ets:lookup(?SERVICE_TABLE, SvcName) of @@ -577,16 +571,15 @@ handle_cast({rvi, handle_remote_message, {remote_port, Port}, {service_name, SvcName}, {timeout, Timeout}, - {parameters, Parameters1}, - {signature, Signature}]) of + {parameters, Parameters1}]) of [ ok ] -> forward_message_to_local_service(URL, SvcName, Parameters, St#st.cs), { noreply, St}; - [ _ ] -> - ?warning("service_entry:remote_msg(): Failed to authenticate ~p", - [SvcName]), + [ _Other ] -> + ?warning("service_entry:remote_msg(): Failed to authenticate ~p (~p)", + [SvcName, _Other]), { noreply, St} end; [] -> @@ -700,6 +693,7 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> %% Deliver the message to the local service, which can %% be either a wse websocket, or a regular HTTP JSON-RPC call spawn(fun() -> + try log_outcome( rvi_common:get_request_result( dispatch_to_local_service(URL, @@ -707,7 +701,12 @@ forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> [{<<"service_name">>, LocalSvcName }, {<<"parameters">>, Parameters }])), SvcName, CompSpec) + catch + Tag:Err -> + ?debug("Caught ~p:~p", [Tag,Err]) + end end), + timer:sleep(500), [ ok, -1 ]. log_outcome({Status, _}, _SvcName, CS) -> |