diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-10-28 21:23:05 +0100 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-11-20 13:46:13 -0800 |
commit | 1b44c2448344a10ae63904a796b6211c40a3f212 (patch) | |
tree | ce8f7dda870a5c454ffbe9e2c0bc7035b34f0f4b /components | |
parent | 34aa86b5a2e97650fe6299ccf794d5eb5d052d91 (diff) | |
download | rvi_core-1b44c2448344a10ae63904a796b6211c40a3f212.tar.gz |
Lots of changes to make dlink_bt (simulated) and dlink_tls runtime tests pass
* Introduced high-level logging (rvi_log)
* Upgraded to new lager version, customized debug output
* Thread rvi_log IDs between nodes and components
* Introduce simplified protocol for dlink_tls
* Use msgpack encoding for dlink_tls
* dlink_bt can use TCP instead of Bluetooth for testing purposes
* Bug fixes and additions to the test suite
Diffstat (limited to 'components')
23 files changed, 1232 insertions, 654 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index daf7dc2..834935e 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -8,7 +8,7 @@ provisioning_key/0, signed_public_key/2, save_keys/2, - save_cert/3]). + save_cert/4]). -export([get_certificates/0, get_certificates/1]). -export([validate_message/2]). @@ -18,6 +18,10 @@ json_to_public_key/1]). -export([self_signed_public_key/0]). % just temporary +-export([pp_key/1, + abbrev_bin/1, + abbrev_payload/1, + abbrev_jwt/1]). -export([start_link/0, init/1, @@ -120,8 +124,8 @@ provisioning_key() -> save_keys(Keys, Conn) -> gen_server:call(?MODULE, {save_keys, Keys, Conn}). -save_cert(Cert, JWT, Conn) -> - gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn}). +save_cert(Cert, JWT, Conn, LogId) -> + gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn, LogId}). %% Gen_server functions @@ -138,7 +142,7 @@ start_link() -> init([]) -> ProvisioningKey = get_pub_key(get_env(provisioning_key)), - ?debug("ProvisioningKey = ~p~n", [ProvisioningKey]), + ?debug("ProvisioningKey = ~s~n", [pp_key(ProvisioningKey)]), CertDir = setup:verify_dir(get_env(cert_dir)), {ok, AuthJwt0} = file:read_file(get_env(authorize_jwt)), AuthJwt = strip_nl(AuthJwt0), @@ -167,17 +171,19 @@ handle_call_({get_certificates, Conn}, _, S) -> Certs = certs_by_conn(Conn), {reply, Certs, S}; handle_call_({save_keys, Keys, Conn}, _, S) -> - ?debug("save_keys: Keys=~p, Conn=~p~n", [Keys, Conn]), + ?debug("save_keys: Keys=~p, Conn=~p~n", [abbrev_k(Keys), Conn]), save_keys_(Keys, Conn), {reply, ok, S}; handle_call_({validate_message, JWT, Conn}, _, S) -> {reply, validate_message_(JWT, Conn), S}; -handle_call_({save_cert, Cert, JWT, Conn}, _, S) -> +handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) -> case process_cert_struct(Cert, JWT) of invalid -> + log(LogId, "cert INVALID Conn=~s:~w", [IP, Port]), {reply, {error, invalid}, S}; #cert{} = C -> ets:insert(?CERTS, {{Conn, C#cert.id}, C}), + log(LogId, "cert stored ~s Conn=~s:~w", [abbrev_bin(C#cert.id), IP, Port]), {reply, ok, S} end; handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> @@ -188,7 +194,7 @@ handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> handle_call_({find_cert_by_service, Service} = R, _From, State) -> ?debug("authorize_keys:handle_call(~p,...)~n", [R]), Res = find_cert_by_service_(Service), - ?debug("Res = ~p~n", [Res]), + ?debug("Res = ~p~n", [case Res of {ok,{A,B}} -> {ok,{A,abbrev_bin(B)}}; _ -> Res end]), {reply, Res, State}; handle_call_(_, _, S) -> {reply, error, S}. @@ -214,7 +220,7 @@ certs_by_conn(Conn) -> validity = '$2', _='_'}}, [], [{{'$1', '$2'}}] }]), - ?debug("rough selection: ~p~n", [Certs]), + ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]), [C || {C,V} <- Certs, check_validity(V, UTC)]. filter_by_service_(Services, Conn) -> @@ -258,8 +264,8 @@ find_cert_by_service_(Service) -> end, {0, none}, LocalCerts) of {0, none} -> {error, not_found}; - {_, Found} -> - {ok, Found#cert.jwt} + {_, #cert{id = Id, jwt = JWT}} -> + {ok, {Id, JWT}} end. match_length(Invoke, Svc) -> @@ -394,7 +400,7 @@ process_cert(F, Key, UTC, Acc) -> {ok, Bin} -> try authorize_sig:decode_jwt(strip_nl(Bin), Key) of {_, Cert} -> - ?info("Unpacked Cert ~p:~n~p~n", [F, Cert]), + ?info("Unpacked Cert ~p:~n~p~n", [F, abbrev_payload(Cert)]), case process_cert_struct(Cert, Bin, UTC) of invalid -> Acc; @@ -492,7 +498,7 @@ save_key(K, Conn) -> {ok, ID} -> {Conn, ID}; _ -> {Conn, make_ref()} end, - ?debug("Saving key ~p, PubKey = ~p~n", [KeyID, PubKey]), + ?debug("Saving key ~p, PubKey = ~p~n", [KeyID, pp_key(PubKey)]), ets:insert(?KEYS, #key{id = KeyID, key = PubKey}) end. @@ -517,3 +523,66 @@ validate_message_1([{_,K}|T], JWT) -> end; validate_message_1([], _) -> error(invalid). + + +pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) -> + P = integer_to_binary(Pub), + M = integer_to_binary(Mod), + <<"#{'RSAPrivateKey'{modulus = ", (abbrev_bin(M))/binary, + ", publicExponent = ", P/binary, ", _ = ...}">>; +pp_key(#'RSAPublicKey'{modulus = Mod, publicExponent = Pub}) -> + P = integer_to_binary(Pub), + M = integer_to_binary(Mod), + <<"#{'RSAPublicKey'{modulus = ", (abbrev_bin(M))/binary, + ", publicExponent = ", P/binary, ", _ = ...}">>. + +abbrev_bin(B) -> + abbrev_bin(B, 20, 6). + +abbrev_bin(B, Len, Part) -> + try case byte_size(B) of + Sz when Sz > Len -> + Part1 = erlang:binary_part(B, {0,Part}), + Part2 = erlang:binary_part(B, {Sz,-Part}), + <<Part1/binary, "...", Part2/binary>>; + _ -> + B + end + catch error:_ -> B end. + +abbrev_payload(Payload) -> + try lists:map(fun abbrev_pl/1, Payload) + catch error:_ -> Payload end. + +abbrev_jwt({Hdr, Body} = X) -> + try {Hdr, abbrev_payload(Body)} + catch error:_ -> X end. + +abbrev_pl({<<"keys">> = K, Ks}) -> + {K, [abbrev_k(Ky) || Ky <- Ks]}; +abbrev_pl({<<"certs">> = C, Cs}) -> + {C, [abbrev_bin(Cert) || Cert <- Cs]}; +abbrev_pl({<<"certificate">> = K, C}) -> + {K, abbrev_bin(C)}; +abbrev_pl({<<"sign">> = K, S}) when is_binary(S) -> + {K, abbrev_bin(S)}; +abbrev_pl(B) when is_binary(B) -> + abbrev_bin(B, 40, 10); +abbrev_pl(L) when is_list(L) -> + abbrev_payload(L); +abbrev_pl(X) -> + X. + +abbrev_k(K) -> + try lists:map(fun abbrev_elem/1, K) + catch error:_ -> K end. + +abbrev_elem({<<"n">>, Bin}) -> + {<<"n">>, authorize_keys:abbrev_bin(Bin)}; +abbrev_elem(X) -> + X. + +log([ID], Fmt, Args) -> + rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args)); +log(_, _, _) -> + ok. diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index 9dc82a8..54a9657 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -22,7 +22,9 @@ get_certificates/1, sign_message/2, validate_message/3, + validate_authorization/3, validate_authorization/4, + store_certs/3, authorize_local_message/3, authorize_remote_message/3]). -export([filter_by_service/3]). @@ -49,8 +51,9 @@ start_link() -> init([]) -> ?debug("authorize_rpc:init(): called."), - {Priv, Pub} = KeyPair = authorize_keys:get_key_pair(), - ?debug("KeyPair = ~p~n", [KeyPair]), + {Priv, Pub} = authorize_keys:get_key_pair(), + ?debug("KeyPair = {~s, ~s}~n", [authorize_keys:pp_key(Priv), + authorize_keys:pp_key(Pub)]), {ok, #st { cs = rvi_common:get_component_specification(), private_key = Priv, public_key = Pub} }. @@ -81,6 +84,14 @@ get_certificates(CompSpec) -> rvi_common:request(authorize, ?MODULE, get_certificates, [], [status, certs], CompSpec). +validate_authorization(CompSpec, JWT, Conn) -> + ?debug("authorize_rpc:validate_authorization():" + " Conn = ~p~n", [Conn]), + rvi_common:request(authorize, ?MODULE, validate_authorization, + [{jwt, JWT}, + {conn, Conn}], + [status], CompSpec). + validate_authorization(CompSpec, JWT, Certs, Conn) -> ?debug("authorize_rpc:validate_authorization():" " Conn = ~p~n", [Conn]), @@ -90,6 +101,12 @@ validate_authorization(CompSpec, JWT, Certs, Conn) -> {conn, Conn}], [status], CompSpec). +store_certs(CompSpec, Certs, Conn) -> + rvi_common:request(authorize, ?MODULE, store_certs, + [{certs, Certs}, + {conn, Conn}], + [status], CompSpec). + authorize_local_message(CompSpec, Service, Params) -> ?debug("authorize_rpc:authorize_local_msg(): params: ~p ~n", [Params]), rvi_common:request(authorize, ?MODULE, authorize_local_message, @@ -131,8 +148,9 @@ private_key() -> %% CAlled by local exo http server handle_rpc("sign_message", Args) -> {ok, Message} = rvi_common:get_json_element(["message"], Args), + LogId = rvi_common:get_json_log_id(Args), [ Status, JWT ] = - gen_server:call(?SERVER, { rvi, sign_message, [Message] }), + gen_server:call(?SERVER, { rvi, sign_message, [Message, LogId] }), ?debug("Message signature = ~p~n", [JWT]), {ok, [ {status, rvi_common:json_rpc_status(Status)}, {jwt, JWT} ]}; @@ -140,32 +158,47 @@ handle_rpc("validate_message", Args) -> ?debug("validate_message; Args = ~p~n", [Args]), {ok, JWT} = rvi_common:get_json_element(["jwt"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), + LogId = rvi_common:get_json_log_id(Args), [ Status, Msg ] = - gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn] }), + gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn, LogId] }), {ok, [ {status, rvi_common:json_rpc_status(Status)}, {message, Msg} ]}; -handle_rpc("get_authorize_jwt", []) -> +handle_rpc("get_authorize_jwt", Args) -> + LogId = rvi_common:get_json_log_id(Args), [ Status | Rem ] = - gen_server:call(?SERVER, { rvi, get_authorize_jwt, [] }), + gen_server:call(?SERVER, { rvi, get_authorize_jwt, [LogId] }), {ok, [ rvi_common:json_rpc_status(Status) | Rem ] }; -handle_rpc("get_certificates", []) -> +handle_rpc("get_certificates", Args) -> + LogId = rvi_common:get_json_log_id(Args), [ Status | Rem ] = - gen_server:call(?SERVER, { rvi, get_certificates, [] }), + gen_server:call(?SERVER, { rvi, get_certificates, [LogId] }), {ok, [ rvi_common:json_rpc_status(Status) | Rem ] }; handle_rpc("validate_authorization", Args) -> {ok, JWT} = rvi_common:get_json_element(["jwt"], Args), - {ok, Certs} = rvi_common:get_json_element(["certs"], Args), {ok, Conn} = rvi_common:get_json_element(["connection"], Args), + LogId = rvi_common:get_json_log_id(Args), + CmdArgs = + case rvi_common:get_json_element(["certs"], Args) of + {ok, Certs} -> [JWT, Certs, Conn, LogId]; + {error, _} -> [JWT, Conn, LogId] + end, [ Status | Rem ] = - gen_server:call(?SERVER, { rvi, validate_authorization, - [JWT, Certs, Conn] }), + gen_server:call(?SERVER, {rvi, validate_authorization, CmdArgs}), {ok, [ rvi_common:json_rpc_status(Status) | Rem] }; +handle_rpc("store_certs", Args) -> + {ok, Certs} = rvi_common:get_json_element(["certs"], Args), + {ok, Conn} = rvi_common:get_json_element(["conn"], Args), + LogId = rvi_common:get_json_log_id(Args), + [ Status | Rem ] = + gen_server:call(?SERVER, {rvi, store_certs, [Certs, Conn, LogId]}), + {ok, [ rvi_common:json_rpc_status(Status) | Rem]}; handle_rpc("authorize_local_message", Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), {ok, Params} = rvi_common:get_json_element(["parameters"], Args), + LogId = rvi_common:get_json_log_id(Args), [ Status | Rem ] = gen_server:call(?SERVER, { rvi, authorize_local_message, - [Service, Params]}), + [Service, Params, LogId]}), { ok, [ rvi_common:json_rpc_status(Status) | Rem] }; @@ -173,17 +206,19 @@ handle_rpc("authorize_local_message", Args) -> handle_rpc("authorize_remote_message", Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), {ok, Params} = rvi_common:get_json_element(["parameters"], Args), + LogId = rvi_common:get_json_log_id(Args), [ Status ] = gen_server:call(?SERVER, { rvi, authorize_remote_message, - [Service, Params]}), + [Service, Params, LogId]}), { ok, rvi_common:json_rpc_status(Status)}; handle_rpc("filter_by_service", Args) -> ?debug("authorize_rpc:handle_rpc(\"filter_by_service\", ~p)~n", [Args]), {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), + LogId = rvi_common:get_json_log_id(Args), [ Status, FilteredServices ] = gen_server:call(?SERVER, { rvi, filter_by_service, - [Services, Conn] }), + [Services, Conn, LogId] }), {ok, [{status, rvi_common:json_rpc_status(Status)}, {services, FilteredServices}]}; @@ -199,31 +234,40 @@ handle_notification(Other, _Args) -> %% %% Genserver implementation %% -handle_call({rvi, sign_message, [Msg]}, _, #st{private_key = Key} = State) -> - {reply, [ ok, authorize_sig:encode_jwt(Msg, Key) ], State}; -handle_call({rvi, validate_message, [JWT, Conn]}, _, State) -> - try {reply, [ok, authorize_keys:validate_message(JWT, Conn)], State} +handle_call({rvi, sign_message, [Msg | LogId]}, _, #st{private_key = Key} = State) -> + Sign = authorize_sig:encode_jwt(Msg, Key), + log(LogId, "signed", []), + {reply, [ ok, Sign ], State}; +handle_call({rvi, validate_message, [JWT, Conn | LogId]}, _, State) -> + try begin Res = authorize_keys:validate_message(JWT, Conn), + log(LogId, "validated", []), + {reply, [ok, Res], State} + end catch error:_Err -> - {reply, [not_found], State} + log(LogId, "validation FAILED", []), + {reply, [not_found], State} end; -handle_call({rvi, get_authorize_jwt, []}, _From, State) -> +handle_call({rvi, get_authorize_jwt, [_LogId]}, _From, State) -> {reply, [ ok, authorize_keys:authorize_jwt() ], State}; -handle_call({rvi, get_certificates, []}, _From, State) -> +handle_call({rvi, get_certificates, [_LogId]}, _From, State) -> {reply, [ ok, authorize_keys:get_certificates() ], State}; -handle_call({rvi, validate_authorization, [JWT, Certs, Conn] }, _From, State) -> +handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, State) -> %% The authorize JWT contains the public key used to sign the cert ?debug( "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n", []), try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of {_Header, Keys} -> - store_certs(Certs, Keys, JWT, Conn), + log(LogId, "auth jwt validated", []), + KeyStructs = get_json_element(["keys"], Keys, []), + authorize_keys:save_keys(KeyStructs, Conn), {reply, [ok], State}; invalid -> ?warning("Invalid auth JWT from ~p~n", [Conn]), + log(LogId, "auth jwt INVALID", []), {reply, [not_found], State} catch error:_Err -> @@ -231,20 +275,49 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn] }, _From, State) -> {reply, [not_found], State} end; -handle_call({rvi, authorize_local_message, [Service, Params] } = R, _From, +handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _From, State) -> + %% The authorize JWT contains the public key used to sign the cert + ?debug( + "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n", + []), + try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of + {_Header, Keys} -> + log(LogId, "auth jwt validated", []), + KeyStructs = get_json_element(["keys"], Keys, []), + ?debug("KeyStructs = ~p~n", [KeyStructs]), + authorize_keys:save_keys(KeyStructs, Conn), + do_store_certs(Certs, Conn, LogId), + {reply, [ok], State}; + invalid -> + ?warning("Invalid auth JWT from ~p~n", [Conn]), + log(LogId, "auth jwt INVALID", []), + {reply, [not_found], State} + catch + error:_Err -> + ?warning("Auth validation exception: ~p~n", [_Err]), + {reply, [not_found], State} + end; + +handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) -> + do_store_certs(Certs, Conn, LogId), + {reply, [ok], State}; +handle_call({rvi, authorize_local_message, [Service, Params | LogId] } = R, _From, #st{private_key = Key} = State) -> ?debug("authorize_rpc:handle_call(~p)~n", [R]), case authorize_keys:find_cert_by_service(Service) of - {ok, Cert} -> + {ok, {ID, Cert}} -> Msg = Params ++ [{<<"certificate">>, Cert}], - ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", [Msg]), + ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", + [authorize_keys:abbrev_payload(Msg)]), Sig = authorize_sig:encode_jwt(Msg, Key), + log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]), {reply, [ok, Sig], State}; _ -> + log(LogId, "NO CERTS for ~s", [Service]), {reply, [ not_found ], State} end; -handle_call({rvi, authorize_remote_message, [_Service, Params]}, +handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]}, _From, State) -> IP = proplists:get_value(remote_ip, Params), Port = proplists:get_value(remote_port, Params), @@ -261,27 +334,22 @@ handle_call({rvi, authorize_remote_message, [_Service, Params]}, case authorize_keys:validate_message( iolist_to_binary(Signature), {IP, Port}) of invalid -> + log(LogId, "signature INVALID", []), {reply, [ not_found ], State}; Msg -> - {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg), - {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg), - {ok, Params1} = rvi_common:get_json_element(["parameters"], Msg), - ?debug("authorize_rpc:authorize_remote_message(): timeout1: ~p~n", [Timeout1]), - ?debug("authorize_rpc:authorize_remote_message(): service_name1: ~p~n", [SvcName1]), - ?debug("authorize_rpc:authorize_remote_message(): parameters1: ~p~n", [Params1]), - - if Timeout =:= Timeout1, - SvcName =:= SvcName1, - Parameters =:= Params1 -> - ?debug("Remote message authorized.~n", []), - {reply, [ ok ], State}; - true -> - ?debug("Remote message NOT authorized.~n", []), - {reply, [ not_found ], State} + case check_msg([{"timeout", Timeout}, + {"service_name", SvcName}, + {"parameters", Parameters}], Msg) of + ok -> + log(LogId, "params verified", []), + {reply, [ok], State}; + {error, {mismatch, Bad}} -> + log(LogId, "params MISMATCH: ~p", [Bad]), + {reply, [not_found], State} end end; -handle_call({rvi, filter_by_service, [Services, Conn]}, _From, State) -> +handle_call({rvi, filter_by_service, [Services, Conn | _LogId]}, _From, State) -> Filtered = authorize_keys:filter_by_service(Services, Conn), {reply, [ok, Filtered], State}; @@ -313,13 +381,10 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -store_certs(Certs, Keys, JWT, Conn) -> +do_store_certs(Certs, Conn, LogId) -> ?debug("Storing ~p certs for conn ~p~n", [length(Certs), Conn]), - KeyStructs = get_json_element(["keys"], Keys, []), - authorize_keys:save_keys(KeyStructs, Conn), - ?debug("KeyStructs = ~p~n", [KeyStructs]), lists:foreach(fun(Cert) -> - store_cert(Cert, KeyStructs, JWT, Conn) + store_cert(Cert, Conn, LogId) end, Certs). get_json_element(Path, JSON, Default) -> @@ -330,11 +395,10 @@ get_json_element(Path, JSON, Default) -> Default end. -store_cert(Cert, Keys, JWT, Conn) -> +store_cert(Cert, Conn, LogId) -> case authorize_sig:decode_jwt(Cert, authorize_keys:provisioning_key()) of {_CHeader, CertStruct} -> - authorize_keys:save_keys(Keys, Conn), - case authorize_keys:save_cert(CertStruct, JWT, Conn) of + case authorize_keys:save_cert(CertStruct, Cert, Conn, LogId) of ok -> ok; {error, Reason} -> @@ -347,3 +411,41 @@ store_cert(Cert, Keys, JWT, Conn) -> ?warning("Invalid certificate from ~p~n", [Conn]), ok end. + +log([ID], Fmt, Args) -> + rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args)); +log(_, _, _) -> + ok. + +check_msg(Checks, Params) -> + check_msg(Checks, Params, []). + + %% {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg), + %% {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg), + %% {ok, Params1} = rvi_common:get_json_element(["parameters"], Msg), + %% ?debug("authorize_rpc:authorize_remote_message(): timeout1: ~p~n", [Timeout1]), + %% ?debug("authorize_rpc:authorize_remote_message(): service_name1: ~p~n", [SvcName1]), + %% ?debug("authorize_rpc:authorize_remote_message(): parameters1: ~p~n", [Params1]), + + %% if Timeout =:= Timeout1 * 1000, + %% SvcName =:= SvcName1, + %% Parameters =:= Params1 -> + %% ?debug("Remote message authorized.~n", []), + %% {reply, [ ok ], State}; + %% true -> + %% ?debug("Remote message NOT authorized.~n", []), + %% {reply, [ not_found ], State} + %% end + %% end; + +check_msg([], _, []) -> + ok; +check_msg([{Key, Expect}|T], Msg, Acc) -> + case rvi_common:get_json_element([Key], Msg) of + {ok, Expect} -> + check_msg(T, Msg, Acc); + _ -> + check_msg(T, Msg, [Key|Acc]) + end; +check_msg([], _, [_|_] = Acc) -> + {error, {mismatch, lists:reverse(Acc)}}. diff --git a/components/authorize/src/authorize_sig.erl b/components/authorize/src/authorize_sig.erl index 9868218..c69bbd0 100644 --- a/components/authorize/src/authorize_sig.erl +++ b/components/authorize/src/authorize_sig.erl @@ -12,11 +12,11 @@ decode_jwt(JWT, PubKey) when is_list(JWT)-> decode_jwt(list_to_binary(JWT), PubKey); decode_jwt(JWT, PubKey) when is_binary(JWT)-> - ?debug("authorize_sig:decode_jwt(JWT, PubKey=~p)~n", [PubKey]), + ?debug("authorize_sig:decode_jwt(JWT, PubKey=~s)~n", [authorize_keys:pp_key(PubKey)]), [H, P, S] = binary:split(JWT, <<".">>, [global]), Header = decode_json(base64url:decode(H)), Payload = decode_json(base64url:decode(P)), - ?debug("JWT Header = ~p~nPayload: ~p~n", [Header, Payload]), + ?debug("JWT Header = ~p~nPayload: ~p~n", [Header, authorize_keys:abbrev_payload(Payload)]), Signature = base64url:decode(S), SigningInput = <<H/binary, ".", P/binary>>, Res = case public_key:verify( @@ -27,15 +27,14 @@ decode_jwt(JWT, PubKey) when is_binary(JWT)-> true -> {Header, Payload} end, - ?debug("decoded JWT = ~p~n", [Res]), + ?debug("decoded JWT = ~p~n", [authorize_keys:abbrev_jwt(Res)]), Res. encode_jwt(JSON, PrivKey) -> encode_jwt(JSON, header(), PrivKey). encode_jwt(Payload0, Header0, PrivKey) -> - ?debug("encode_jwt(~p,~p,_)~n", [catch ensure_json(Payload0), - catch ensure_json(Header0)]), + ?debug("encode_jwt()", []), Header = base64url:encode(ensure_json(Header0)), Payload = base64url:encode(ensure_json(Payload0)), SigningInput = <<Header/binary, ".", Payload/binary>>, @@ -56,7 +55,7 @@ ensure_json([_|_] = JSON) -> %% Since there may be atoms {ok, Normalized} = msgpack:unpack(msgpack:pack(JSON, [jsx, {allow_atom,pack}]), [jsx]), - ?debug("Normalized = ~p~n", [Normalized]), + ?debug("Normalized = ~p~n", [authorize_keys:abbrev_payload(Normalized)]), jsx:encode(Normalized). decode_json(JSON) when is_list(JSON) -> diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl index 69576b6..9510a53 100644 --- a/components/dlink/src/dlink_data_msgpack.erl +++ b/components/dlink/src/dlink_data_msgpack.erl @@ -10,7 +10,7 @@ port_options() -> [binary, {packet, 0}]. -init(_Opts) -> +init(_CS) -> #st{}. decode(Msg0, #st{buf = Prev, opts = Opts} = St) -> diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl index a5fc3c5..85a8663 100644 --- a/components/dlink_bt/src/bt_connection.erl +++ b/components/dlink_bt/src/bt_connection.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -25,8 +25,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([connect/5]). --export([accept/5]). +-export([connect/6]). +-export([accept/6]). -export([send/2]). -export([send/3]). -export([is_connection_up/1]). @@ -35,13 +35,14 @@ -export([terminate_connection/2]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -record(st, { remote_addr = "00:00:00:00:00:00", channel = 0, rfcomm_ref = undefined, listen_ref = undefined, + mode = bt, mod = undefined, func = undefined, args = undefined @@ -52,29 +53,30 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -connect(Addr, Channel, Mod, Fun, Arg) -> - gen_server:start_link(?MODULE, - {connect, Addr, Channel, Mod, Fun, Arg }, +connect(Addr, Channel, Mode, Mod, Fun, Arg) -> + gen_server:start_link(?MODULE, + {connect, Addr, Channel, Mode, Mod, Fun, Arg }, []). -accept(Channel, ListenRef, Mod, Fun, Arg) -> - gen_server:start_link(?MODULE, {accept, - Channel, - ListenRef, +accept(Channel, ListenRef, Mode, Mod, Fun, Arg) -> + gen_server:start_link(?MODULE, {accept, + Channel, + ListenRef, + Mode, Mod, - Fun, + Fun, Arg},[]). send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). - + send(Addr, Channel, Data) -> case bt_connection_manager:find_connection_by_address(Addr, Channel) of {ok, Pid} -> gen_server:cast(Pid, {send, Data}); - _Err -> - ?info("connection:send(): Connection ~p:~p not found for data: ~p", + _Err -> + ?info("connection:send(): Connection ~p:~p not found for data: ~p", [ Addr, Channel, Data]), not_found @@ -82,7 +84,7 @@ send(Addr, Channel, Data) -> terminate_connection(Pid) when is_pid(Pid) -> gen_server:call(Pid, terminate_connection). - + terminate_connection(Addr, Channel) -> case bt_connection_manager:find_connection_by_address(Addr, Channel) of {ok, Pid} -> @@ -100,10 +102,10 @@ is_connection_up(Addr, Channel) -> {ok, Pid} -> is_connection_up(Pid); - _Err -> + _Err -> false end. - + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -122,8 +124,8 @@ is_connection_up(Addr, Channel) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({connect, BTAddr, Channel, Mod, Fun, Arg}) -> - +init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) -> + %% connect will block on rfcomm:open, so cast to self %% in order to let init return. gen_server:cast(self(), connect), @@ -131,6 +133,7 @@ init({connect, BTAddr, Channel, Mod, Fun, Arg}) -> remote_addr = BTAddr, channel = Channel, rfcomm_ref = undefined, + mode = Mode, mod = Mod, func = Fun, args = Arg @@ -138,8 +141,15 @@ init({connect, BTAddr, Channel, Mod, Fun, Arg}) -> -init({accept, Channel, ListenRef, Mod, Fun, Arg}) -> - { ok, ARef } = rfcomm:accept(ListenRef, infinity, self()), +init({accept, Channel, ListenRef, Mode, Mod, Fun, Arg}) -> + ARef = case Mode of + tcp -> + {ok, R} = exo_socket:async_accept(ListenRef), + R; + bt -> + {ok, R} = rfcomm:accept(ListenRef, infinity, self()), + R + end, ?debug("bt_connection:init(accept): self(): ~p", [self()]), ?debug("bt_connection:init(accept): Channel: ~p", [Channel]), ?debug("bt_connection:init(accept): ListenRef: ~p", [ListenRef]), @@ -152,6 +162,7 @@ init({accept, Channel, ListenRef, Mod, Fun, Arg}) -> channel = Channel, rfcomm_ref = ARef, listen_ref = ListenRef, + mode = Mode, mod = Mod, func = Fun, args = Arg @@ -173,7 +184,7 @@ init({accept, Channel, ListenRef, Mod, Fun, Arg}) -> %% @end %%-------------------------------------------------------------------- handle_call(terminate_connection, _From, St) -> - ?debug("~p:handle_call(terminate_connection): Terminating: ~p", + ?debug("~p:handle_call(terminate_connection): Terminating: ~p", [ ?MODULE, {St#st.remote_addr, St#st.channel}]), {stop, Reason, NSt} = handle_info({tcp_closed, St#st.rfcomm_ref}, St), @@ -197,13 +208,20 @@ handle_call(_Request, _From, State) -> handle_cast(connect, #st { remote_addr = BTAddr, channel = Channel, + mode = Mode, mod = Mod, func = Fun, args = Arg } = St) -> %% Looong call that blocks for ever. - case rfcomm:open(BTAddr, Channel) of + ConnRes = case Mode of + bt -> + rfcomm:open(BTAddr, Channel); + tcp -> + exo_socket:connect(BTAddr, Channel) + end, + case ConnRes of {ok, ConnRef} -> ?debug("bt_connection:init(connect): self(): ~p", [self()]), ?debug("bt_connection:init(connect): BTAddr: ~p", [BTAddr]), @@ -229,7 +247,7 @@ handle_cast(connect, #st { end; handle_cast({send, Data}, St) -> - ?debug("~p:handle_call(send): Sending: ~p", + ?debug("~p:handle_call(send): Sending: ~p", [ ?MODULE, Data]), rfcomm:send(St#st.rfcomm_ref, Data), @@ -253,21 +271,21 @@ handle_cast(_Msg, State) -> %% An accept reference we've setup now has accetpted an %% incoming connection. -handle_info({rfcomm, _ARef, { accept, BTAddr, _ } }, +handle_info({rfcomm, _ARef, { accept, BTAddr, _ } }, #st { mod = Mod, func = Fun, args = Arg, channel = Channel } = St) -> - ?info("~p:handle_info(): bt_connection from ~w:~w\n", + ?info("~p:handle_info(): bt_connection from ~w:~w\n", [?MODULE, BTAddr,Channel]), - + Mod:Fun(self(), BTAddr, Channel, accepted, Arg), - { noreply, St#st { remote_addr = BTAddr, + { noreply, St#st { remote_addr = BTAddr, channel = Channel } }; -handle_info({rfcomm, _ConnRef, {data, Data}}, +handle_info({rfcomm, _ConnRef, {data, Data}}, #st { remote_addr = BTAddr, channel = Channel, mod = Mod, @@ -275,16 +293,16 @@ handle_info({rfcomm, _ConnRef, {data, Data}}, args = Arg } = State) -> ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), ?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]), - ?info("~p:handle_info(data): ~p:~p -> ~p:~p", + ?info("~p:handle_info(data): ~p:~p -> ~p:~p", [ ?MODULE, BTAddr, Channel, Mod, Fun]), Self = self(), - spawn(fun() -> Mod:Fun(Self, BTAddr, Channel, + spawn(fun() -> Mod:Fun(Self, BTAddr, Channel, data, Data, Arg) end), {noreply, State}; -handle_info({rfcomm, ConnRef, closed}, +handle_info({rfcomm, ConnRef, closed}, #st { remote_addr = BTAddr, channel = Channel, listen_ref = ListenRef, @@ -295,20 +313,20 @@ handle_info({rfcomm, ConnRef, closed}, Mod:Fun(self(), BTAddr, Channel, closed, Arg), bt_connection_manager:delete_connection_by_pid(self()), rfcomm:close(ConnRef), - + %% Fire up a new accept process to take care of the next incomign connectionX - gen_server:start_link(?MODULE, {accept, - Channel, - ListenRef, + gen_server:start_link(?MODULE, {accept, + Channel, + ListenRef, Mod, - Fun, + Fun, Arg},[]), %% Stop this process. {stop, normal, State}; -handle_info({rfcomm, ConnRef, error}, +handle_info({rfcomm, ConnRef, error}, #st { remote_addr = BTAddr, channel = Channel, mod = Mod, @@ -321,6 +339,13 @@ handle_info({rfcomm, ConnRef, error}, bt_connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; +handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod, + func = Fun, + args = Arg} = St) -> + ?debug("~p:handle_info(~p)", [?MODULE, Msg]), + {ok, {BTAddr, Channel}} = inet:peername(Sock), + Mod:Fun(self(), BTAddr, Channel, accepted, Arg), + {noreply, St}; handle_info(_Info, State) -> ?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]), diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl index b0c08ab..e6129de 100644 --- a/components/dlink_bt/src/bt_listener.erl +++ b/components/dlink_bt/src/bt_listener.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -12,7 +12,7 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --export([start_link/0, +-export([start_link/1, add_listener/1, remove_listener/1]). @@ -25,8 +25,8 @@ }). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(Mode) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, Mode, []). add_listener(Channel) -> gen_server:call(?MODULE, {add_listener, Channel}). @@ -34,35 +34,37 @@ add_listener(Channel) -> remove_listener(Channel) -> gen_server:call(?MODULE, {remove_listener, Channel}). -init([]) -> +init(Mode) -> - {ok, #st { - listeners = [], + {ok, #st { + listeners = [], acceptors = [], - cs = rvi_common:get_component_specification() + cs = rvi_common:set_value(bt_mode, Mode, rvi_common:get_component_specification()) } }. -handle_call({add_listener, Channel}, _From, St) -> +handle_call({add_listener, Channel}, _From, #st{cs = CS} = St) -> ?info("bt_listener:add_listener(): Setting up listener on channel ~p", [ Channel]), - case rfcomm:listen(Channel) of + Mode = rvi_common:get_value(bt_mode, bt, CS), + case listen(Mode, Channel) of {ok, ListenRef} -> ?info("bt_listener:add_listener(): ListenRef: ~p", [ ListenRef]), - {ok, ConnPid} = bt_connection:accept(Channel, - ListenRef, - dlink_bt_rpc, - handle_socket, - St#st.cs), + {ok, ConnPid} = bt_connection:accept(Channel, + ListenRef, + Mode, + dlink_bt_rpc, + handle_socket, + CS), + - %%{ noreply, NSt} = handle_info({accept, ListenRef, Channel, ok}, St), - { reply, - ok, - St#st { + { reply, + ok, + St#st { acceptors = [ { Channel, ConnPid } | St#st.acceptors ], listeners = [ { ListenRef, Channel } | St#st.listeners ] } @@ -89,25 +91,28 @@ handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) -> %% future incoming connection. ?info("bt_listener:accept(): ListenRef: ~p", [ ListenRef]), ?info("bt_listener:accept(): Remote: ~p-~p", [BTAddr, Channel ]), - + %% Must fix multiple acceptors in bt_linux_drv.c - %% {ok, ConnPid} = bt_connection:accept(Channel, - %% ListenRef, - %% dlink_bt_rpc, - %% handle_socket, + %% {ok, ConnPid} = bt_connection:accept(Channel, + %% ListenRef, + %% dlink_bt_rpc, + %% handle_socket, %% []), - + %% {noreply, St#st {acceptors = [ { Channel, ConnPid } | St#st.acceptors ]}}; {noreply, St }; handle_info(_Msg, State) -> ?info("bt_listener:handle_info(): Unknown: ~p", [ _Msg]), - + {noreply, State}. terminate(_Reason, _State) -> ok. - +listen(bt, Channel) -> + rfcomm:listen(Channel); +listen(tcp, Port) -> + gen_tcp:listen(Port). diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index c36c997..168d13e 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -40,7 +40,7 @@ -define(DEFAULT_BT_CHANNEL, 1). -define(DEFAULT_RECONNECT_INTERVAL, 1000). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -define(CONNECTION_TABLE, rvi_dlink_bt_connections). -define(SERVICE_TABLE, rvi_dlink_bt_services). @@ -58,7 +58,7 @@ services = [] %% List of service names available through this connection }). --record(st, { +-record(st, { cs = #component_spec{} }). @@ -71,30 +71,30 @@ tohex(V) when V < 16 -> tohex(V) -> integer_to_list(V, 16). - + bt_address_to_string({A1, A2, A3, A4, A5, A6}) -> tohex(A1) ++ ":" ++ tohex(A2) ++ ":" ++ tohex(A3) ++ ":" ++ - tohex(A4) ++ ":" ++ + tohex(A4) ++ ":" ++ tohex(A5) ++ ":" ++ tohex(A6). - + init([]) -> ?info("dlink_bt:init(): Called"), %% Dig out the bert rpc server setup - ets:new(?SERVICE_TABLE, [ set, public, named_table, + ets:new(?SERVICE_TABLE, [ set, public, named_table, { keypos, #service_entry.service }]), - ets:new(?CONNECTION_TABLE, [ set, public, named_table, + ets:new(?CONNECTION_TABLE, [ set, public, named_table, { keypos, #connection_entry.connection }]), CS = rvi_common:get_component_specification(), service_discovery_rpc:subscribe(CS, ?MODULE), - {ok, #st { + {ok, #st { cs = CS } }. @@ -105,44 +105,59 @@ start_json_server() -> start_connection_manager() -> CompSpec = rvi_common:get_component_specification(), - {ok, BertOpts } = rvi_common:get_module_config(data_link, - ?MODULE, - server_opts, - [], + {ok, BertOpts } = rvi_common:get_module_config(data_link, + ?MODULE, + server_opts, + [], CompSpec), %% Retrieve the channel we should use Channel = proplists:get_value(channel, BertOpts, ?DEFAULT_BT_CHANNEL), - + ?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]), %% Fire up listener - - bt:start(), - bt:debug(debug), - bt_listener:start_link(), - bt_connection_manager:start_link(), + + Mode = get_mode(BertOpts), + case Mode of + bt -> + bt:start(), + bt:debug(debug); + tcp -> + ok + end, + bt_listener:start_link(Mode), + bt_connection_manager:start_link(Mode), ?info("dlink_bt:start_connection_manager(): Adding listener on bluetooth channel ~p", [Channel ]), - + %% Add listener channel. case bt_listener:add_listener(Channel) of ok -> ok; - Err -> + Err -> ?error("dlink_bt:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), ok end, - {ok, PersistentConnections } = rvi_common:get_module_config(data_link, - ?MODULE, - ?PERSISTENT_CONNECTIONS, - [], + {ok, PersistentConnections } = rvi_common:get_module_config(data_link, + ?MODULE, + ?PERSISTENT_CONNECTIONS, + [], CompSpec), setup_persistent_connections_(PersistentConnections, CompSpec), ok. +get_mode(BertOpts) -> + case proplists:get_value(test_mode, BertOpts) of + TM when TM==undefined; TM==bt -> + bt; + tcp -> + tcp + end. + + setup_persistent_connections_([ ], _CompSpec) -> ok; @@ -150,21 +165,21 @@ setup_persistent_connections_([ ], _CompSpec) -> setup_persistent_connections_([ BTAddress | T], CompSpec) -> ?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]), [ BTAddr, Channel] = string:tokens(BTAddress, "-"), - connect_and_retry_remote(BTAddr, Channel, CompSpec), + connect_and_retry_remote(BTAddr, Channel, CompSpec), setup_persistent_connections_(T, CompSpec), ok. service_available(CompSpec, SvcName, DataLinkModule) -> - rvi_common:notification(data_link, ?MODULE, - service_available, + rvi_common:notification(data_link, ?MODULE, + service_available, [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). service_unavailable(CompSpec, SvcName, DataLinkModule) -> - rvi_common:notification(data_link, ?MODULE, - service_unavailable, + rvi_common:notification(data_link, ?MODULE, + service_unavailable, [{ service, SvcName }, { data_link_module, DataLinkModule }], CompSpec). @@ -184,11 +199,11 @@ disconnect_data_link(CompSpec, NetworkAddress) -> send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) -> rvi_common:request(data_link, ?MODULE, send_data, - [ { proto_mod, ProtoMod }, - { service, Service }, + [ { proto_mod, ProtoMod }, + { service, Service }, { data, Data }, { opts, DataLinkOpts } - ], + ], [status], CompSpec). @@ -209,33 +224,33 @@ connect_remote(BTAddr, Channel, CompSpec) -> %%FIXME %% Setup a genserver around the new connection. - case bt_connection:connect(BTAddr, Channel, + case bt_connection:connect(BTAddr, Channel, ?MODULE, handle_socket, CompSpec ) of - { ok, Pid } -> - ?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p", + { ok, Pid } -> + ?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p", [BTAddr, Channel, Pid]), ok; - - {error, Err } -> + + {error, Err } -> ?info("dlink_bt:connect_remote(): Failed ~p:~p: ~p", [BTAddr, Channel, Err]), not_available end end. - + connect_and_retry_remote( BTAddr, Channel, CompSpec) -> - ?info("dlink_bt:connect_and_retry_remote(): ~p:~p", + ?info("dlink_bt:connect_and_retry_remote(): ~p:~p", [ BTAddr, Channel]), case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of ok -> ok; Err -> %% Failed to connect. Sleep and try again - ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p", + ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p", [BTAddr, Channel, Err]), - ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec", + ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec", [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]), setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec), @@ -248,14 +263,14 @@ connect_and_retry_remote( BTAddr, Channel, CompSpec) -> announce_local_service_(_CompSpec, [], _Service, _Availability) -> ok; -announce_local_service_(CompSpec, +announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> [ ok, JWT ] = authorize_rpc:sign_message( CompSpec, availability_msg(Availability, [Service])), - Res = bt_connection:send(ConnPid, + Res = bt_connection:send(ConnPid, term_to_json( - { struct, + { struct, [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, { ?DLINK_ARG_TRANSACTION_ID, 3}, @@ -263,19 +278,19 @@ announce_local_service_(CompSpec, ] })), - ?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p", + ?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), %% Move on to next connection. - announce_local_service_(CompSpec, + announce_local_service_(CompSpec, T, Service, Availability). announce_local_service_(CompSpec, Service, Availability) -> - ?debug("dlink_bt:announce_local_service(~p, ~p)", + ?debug("dlink_bt:announce_local_service(~p, ~p)", [ Service, Availability]), - announce_local_service_(CompSpec, + announce_local_service_(CompSpec, get_connections(), Service, Availability). @@ -294,7 +309,7 @@ availability_msg(Availability, Services) -> status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. - + process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) -> {ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg), @@ -312,13 +327,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) -> end. process_authorize(FromPid, - PeerBTAddr, + PeerBTAddr, PeerBTChannel, - TransactionID, - RemoteAddress, - RemoteChannel, - Protocol, - Certificates, + TransactionID, + RemoteAddress, + RemoteChannel, + Protocol, + Certificates, Signature, CompSpec) -> @@ -333,7 +348,7 @@ process_authorize(FromPid, %% with the conneciton manager, this is an incoming connection %% from the client. We should respond with our own authorize followed by %% a service announce - + Conn = {RemoteAddress, RemoteChannel}, case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of true -> @@ -343,7 +358,7 @@ process_authorize(FromPid, false end. -handle_socket(FromPid, PeerBTAddr, PeerChannel, data, +handle_socket(FromPid, PeerBTAddr, PeerChannel, data, Payload, CompSpec) -> {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)), @@ -351,34 +366,34 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data, case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> - [ TransactionID, - RemoteAddress, - RemoteChannel, + [ TransactionID, + RemoteAddress, + RemoteChannel, RVIProtocol, CertificatesTmp, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, - ?DLINK_ARG_PORT, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, - ?DLINK_ARG_SIGNATURE], + Signature ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_ADDRESS, + ?DLINK_ARG_PORT, + ?DLINK_ARG_VERSION, + ?DLINK_ARG_CERTIFICATES, + ?DLINK_ARG_SIGNATURE], Elems, undefined), - - Certificates = - case CertificatesTmp of + + Certificates = + case CertificatesTmp of { array, C} -> C; undefined -> [] end, process_authorize(FromPid, PeerBTAddr, RemoteChannel, - TransactionID, RemoteAddress, RemoteChannel, + TransactionID, RemoteAddress, RemoteChannel, RVIProtocol, Certificates, Signature, CompSpec); - - + + ?DLINK_CMD_SERVICE_ANNOUNCE -> Conn = {PeerBTAddr, PeerChannel}, - [ TransactionID, Signature ] = + [ TransactionID, Signature ] = opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE], Elems, undefined), case authorize_rpc:validate_message(CompSpec, Signature, Conn) of @@ -390,11 +405,11 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data, end; ?DLINK_CMD_RECEIVE -> - [ _TransactionID, - ProtocolMod, - Data ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_MODULE, + [ _TransactionID, + ProtocolMod, + Data ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_MODULE, ?DLINK_ARG_DATA], Elems, undefined), process_data(FromPid, PeerBTAddr, PeerChannel, @@ -404,7 +419,7 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data, ?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]), ok; - undefined -> + undefined -> ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]), ok end. @@ -428,17 +443,17 @@ handle_socket(FromPid, SetupBTAddr, SetupChannel, closed, CompSpec) -> case get_connections_by_service(SvcName) of [] -> service_discovery_rpc: - unregister_services(CompSpec, - [SvcName], + unregister_services(CompSpec, + [SvcName], ?MODULE); _ -> ok end end, LostSvcNameList), - {ok, PersistentConnections } = rvi_common:get_module_config(data_link, - ?MODULE, - persistent_connections, - [], + {ok, PersistentConnections } = rvi_common:get_module_config(data_link, + ?MODULE, + persistent_connections, + [], CompSpec), %% Check if this is a static node. If so, setup a timer for a reconnect case lists:member(NetworkAddress, PersistentConnections) of @@ -447,7 +462,7 @@ handle_socket(FromPid, SetupBTAddr, SetupChannel, closed, CompSpec) -> ?info("dlink_bt:closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]), [ BTAddr, Channel] = string:tokens(NetworkAddress, "-"), - setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec); false -> ok end, @@ -473,7 +488,7 @@ handle_notification("service_available", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - gen_server:cast(?SERVER, { rvi, service_available, + gen_server:cast(?SERVER, { rvi, service_available, [ SvcName, DataLinkModule ]}), @@ -482,7 +497,7 @@ handle_notification("service_unavailable", Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - gen_server:cast(?SERVER, { rvi, service_unavailable, + gen_server:cast(?SERVER, { rvi, service_unavailable, [ SvcName, DataLinkModule ]}), @@ -497,7 +512,7 @@ handle_rpc("setup_data_link", Args) -> { ok, Opts } = rvi_common:get_json_element(["opts"], Args), - [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link, + [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link, [ Service, Opts ] }), {ok, [ {status, rvi_common:json_rpc_status(Res)} , { timeout, Timeout }]}; @@ -512,11 +527,11 @@ handle_rpc("send_data", Args) -> { ok, Service } = rvi_common:get_json_element(["service"], Args), { ok, Data } = rvi_common:get_json_element(["data"], Args), { ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args), - [ Res ] = gen_server:call(?SERVER, - { rvi, send_data, + [ Res ] = gen_server:call(?SERVER, + { rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}), {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; - + handle_rpc(Other, _Args) -> ?info("dlink_bt:handle_rpc(~p): unknown", [ Other ]), @@ -541,7 +556,7 @@ handle_cast( {rvi, service_unavailable, [SvcName, local]}, St) -> {noreply, St}; handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) -> - %% We don't care about remote services available through + %% We don't care about remote services available through %% other data link modules {noreply, St}; @@ -561,7 +576,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> [Service]), { reply, [ok, -1 ], St }; - Addr -> + Addr -> [ Address, Channel] = string:tokens(Addr, "-"), case connect_remote(Address, list_to_integer(Channel), St#st.cs) of @@ -569,7 +584,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> { reply, [ok, 2000], St }; %% 2 second timeout already_connected -> %% We are already connected - { reply, [already_connected, -1], St }; + { reply, [already_connected, -1], St }; Err -> { reply, [Err, 0], St } @@ -595,34 +610,34 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S { reply, [ no_route ], St}; %% FIXME: What to do if we have multiple connections to the same service? - [ConnPid | _T] -> + [ConnPid | _T] -> ?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]), - Res = bt_connection:send(ConnPid, + Res = bt_connection:send(ConnPid, term_to_json( - {struct, + {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } ]})), - + { reply, [ Res ], St} end; - + handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) -> %% Create a timer to handle periodic pings. - {ok, ServerOpts } = rvi_common:get_module_config(data_link, + {ok, ServerOpts } = rvi_common:get_module_config(data_link, ?MODULE, - server_opts, [], + server_opts, [], St#st.cs), Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), - ?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec", + ?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec", [ Address, Channel, Timeout] ), - + erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Channel, Timeout }), {reply, ok, St}; @@ -641,12 +656,12 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) -> true -> ?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]), bt_connection:send(Pid, term_to_json( - { struct, - [ { ?DLINK_ARG_CMD, - ?DLINK_CMD_PING + { struct, + [ { ?DLINK_ARG_CMD, + ?DLINK_CMD_PING }]})), - erlang:send_after(Timeout, self(), + erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Channel, Timeout }); false -> @@ -671,9 +686,9 @@ code_change(_OldVsn, St, _Extra) -> send_authorize(Pid, SetupChannel, CompSpec) -> {ok,[{address, Address }]} = bt_drv:local_info([address]), - bt_connection:send(Pid, + bt_connection:send(Pid, term_to_json( - {struct, + {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, @@ -697,7 +712,7 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) %% Send our own servide announcement to the remote server %% that just authorized to us. [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), - + [ ok, FilteredServices ] = authorize_rpc:filter_by_service( CompSpec, LocalServices, Conn), @@ -707,9 +722,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) [ ok, JWT ] = authorize_rpc:sign_message( CompSpec, availability_msg(available, FilteredServices)), - bt_connection:send(FromPid, + bt_connection:send(FromPid, term_to_json( - {struct, + {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, { ?DLINK_ARG_SIGNATURE, JWT } ]})), @@ -719,8 +734,8 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) ok. setup_reconnect_timer(MSec, BTAddr, Channel, CompSpec) -> - erlang:send_after(MSec, ?MODULE, - { rvi_setup_persistent_connection, + erlang:send_after(MSec, ?MODULE, + { rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }), ok. @@ -739,20 +754,20 @@ get_connections_by_service(Service) -> Connections; [] -> [] end. - + add_services(SvcNameList, ConnPid) -> %% Create or replace existing connection table entry %% with the sum of new and old services. ?debug("dlink_bt:add_services(~p, ~p)", [ ConnPid, SvcNameList]), - ets:insert(?CONNECTION_TABLE, + ets:insert(?CONNECTION_TABLE, #connection_entry { connection = ConnPid, services = SvcNameList ++ get_services_by_connection(ConnPid) }), %% Add the connection to the service entry for each service. - [ ets:insert(?SERVICE_TABLE, + [ ets:insert(?SERVICE_TABLE, #service_entry { service = SvcName, connections = [ConnPid | get_connections_by_service(SvcName)] @@ -765,15 +780,15 @@ add_services(SvcNameList, ConnPid) -> delete_services(ConnPid, SvcNameList) -> - ets:insert(?CONNECTION_TABLE, + ets:insert(?CONNECTION_TABLE, #connection_entry { connection = ConnPid, services = get_services_by_connection(ConnPid) -- SvcNameList }), - + %% Loop through all services and update the conn table %% Update them with a new version where ConnPid has been removed - [ ets:insert(?SERVICE_TABLE, + [ ets:insert(?SERVICE_TABLE, #service_entry { service = SvcName, connections = get_connections_by_service(SvcName) -- [ConnPid] @@ -785,7 +800,7 @@ delete_connection(Conn) -> %% with the sum of new and old services. SvcNameList = get_services_by_connection(Conn), - %% Replace each existing connection entry that has + %% Replace each existing connection entry that has %% SvcName with a new one where the SvcName is removed. lists:map(fun(SvcName) -> Existing = get_connections_by_service(SvcName), @@ -795,12 +810,12 @@ delete_connection(Conn) -> connections = Existing -- [ Conn ] }) end, SvcNameList), - + %% Delete the connection ets:delete(?CONNECTION_TABLE, Conn), ok. - + get_connections('$end_of_table', Acc) -> Acc; @@ -808,7 +823,7 @@ get_connections('$end_of_table', Acc) -> get_connections(Key, Acc) -> get_connections(ets:next(?CONNECTION_TABLE, Key), [ Key | Acc ]). - + get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 068a0c5..77300d9 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -196,7 +196,7 @@ handle_cast({activate_socket, Sock}, State) -> handle_cast(_Msg, State) -> - ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]), + ?warning("handle_cast(): Unknown call: ~p", [_Msg]), {noreply, State}. %%-------------------------------------------------------------------- @@ -225,17 +225,17 @@ handle_info({tcp, Sock, Data}, func = Fun, args = Arg, pst = PST} = State) -> - ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), - ?debug("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, IP, Port]), + ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), case jsx_decode_stream(Data, PST) of { [], NPST } -> - ?debug("~p:handle_info(data incomplete)", [ ?MODULE]), + ?debug("handle_info(data incomplete)", []), inet:setopts(Sock, [{active, once}]), {noreply, State#st { pst = NPST} }; { JSONElements, NPST } -> - ?debug("~p:handle_info(data complete): Processed: ~p", [ ?MODULE, JSONElements]), + ?debug("data complete: Processed: ~p", + [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]), FromPid = self(), [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg) || SingleElem <- JSONElements], @@ -251,7 +251,7 @@ handle_info({tcp_closed, Sock}, mod = Mod, func = Fun, args = Arg } = State) -> - ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]), + ?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]), Mod:Fun(self(), IP, Port,closed, Arg), gen_tcp:close(Sock), connection_manager:delete_connection_by_pid(self()), @@ -265,14 +265,14 @@ handle_info({tcp_error, _Sock}, func = Fun, args = Arg} = State) -> - ?debug("~p:handle_info(tcp_error): Address: ~p:~p ", [ ?MODULE, IP, Port]), + ?debug("handle_info(tcp_error): Address: ~p:~p ", [IP, Port]), Mod:Fun(self(), IP, Port, error, Arg), connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; handle_info(_Info, State) -> - ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), + ?warning("handle_cast(): Unknown info: ~p", [_Info]), {noreply, State}. %%-------------------------------------------------------------------- @@ -287,7 +287,7 @@ handle_info(_Info, State) -> %% @end %%-------------------------------------------------------------------- terminate(_Reason, _State) -> - ?debug("~p:terminate(): Reason: ~p ", [ ?MODULE, _Reason]), + ?debug("terminate(): Reason: ~p ", [_Reason]), ok. %%-------------------------------------------------------------------- diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 2456f3e..b3ec80d 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -92,32 +92,14 @@ start_json_server() -> start_connection_manager() -> - CompSpec = rvi_common:get_component_specification(), - {ok, BertOpts } = rvi_common:get_module_config(data_link, - ?MODULE, - ?SERVER_OPTS, - [], - CompSpec), - IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS), - Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT), - - ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]), - %% Fire up listener + CompSpec = rvi_common:get_component_specification(), connection_manager:start_link(), + ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]), {ok,Pid} = listener:start_link(), - ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), + %% + setup_initial_listeners(Pid, CompSpec), - %% Add listener port. - case listener:add_listener(Pid, IP, Port, CompSpec) of - ok -> - ?notice("---- RVI Node External Address: ~s", - [ application:get_env(rvi_core, node_address, undefined)]); - - Err -> - ?error("dlink_tcp:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), - ok - end, ?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."), {ok, PersistentConnections } = rvi_common:get_module_config(data_link, @@ -126,14 +108,38 @@ start_connection_manager() -> [], CompSpec), - setup_persistent_connections_(PersistentConnections, CompSpec), ok. + +setup_initial_listeners(Pid, CompSpec) -> + case rvi_common:get_module_config(data_link, + ?MODULE, + ?SERVER_OPTS, + CompSpec) of + {ok, ServerOpts} -> + IP = proplists:get_value(ip, ServerOpts, ?DEFAULT_TCP_ADDRESS), + Port = proplists:get_value(port, ServerOpts, ?DEFAULT_TCP_PORT), + ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), + %% + %% Add listener port. + case listener:add_listener(Pid, IP, Port, CompSpec) of + ok -> + ?notice("---- RVI Node External Address: ~s", + [ application:get_env(rvi_core, node_address, undefined)]); + Err -> + ?warning( + "dlink_tcp:init_rvi_component(): Failed to launch listener (~p,~p): ~p", + [ IP, Port, Err ]), + ok + end; + {error, _} -> + ?info("dlink_tcp: no initial listeners", []) + end. + setup_persistent_connections_([ ], _CompSpec) -> ok; - setup_persistent_connections_([ NetworkAddress | T], CompSpec) -> ?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]), [ IP, Port] = string:tokens(NetworkAddress, ":"), @@ -188,13 +194,14 @@ connect_remote(IP, Port, CompSpec) -> ?info("connect_remote(~p, ~p)~n", [IP, Port]), case connection_manager:find_connection_by_address(IP, Port) of { ok, _Pid } -> + log("already connected", [], CompSpec), already_connected; not_found -> %% Setup a new outbound connection ?info("dlink_tcp:connect_remote(): Connecting ~p:~p", [IP, Port]), - + log("new connection", [], CompSpec), case gen_tcp:connect(IP, Port, listener:sock_opts()) of { ok, Sock } -> ?info("dlink_tcp:connect_remote(): Connected ~p:~p", @@ -217,12 +224,14 @@ connect_remote(IP, Port, CompSpec) -> { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) }, { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } + | rvi_common:log_id_json_tail(CompSpec) ])), ok; {error, Err } -> ?info("dlink_tcp:connect_remote(): Failed ~p:~p: ~p", [IP, Port, Err]), + log("connect FAILED: ~w", [Err], CompSpec), not_available end end. @@ -231,8 +240,11 @@ connect_and_retry_remote( IP, Port, CompSpec) -> ?info("dlink_tcp:connect_and_retry_remote(): ~p:~p", [ IP, Port]), - case connect_remote(IP, list_to_integer(Port), CompSpec) of - ok -> ok; + CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec), + case connect_remote(IP, list_to_integer(Port), CS) of + ok -> + log("connected", [], CS), + ok; Err -> %% Failed to connect. Sleep and try again ?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Failed: ~p", @@ -240,8 +252,8 @@ connect_and_retry_remote( IP, Port, CompSpec) -> ?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Will try again in ~p sec", [IP, Port, ?DEFAULT_RECONNECT_INTERVAL]), - - setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec), + log("start reconnect timer", [], CS), + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CS), not_available end. @@ -262,6 +274,7 @@ announce_local_service_(CompSpec, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, { ?DLINK_ARG_SIGNATURE, JWT } + | rvi_common:log_id_json_tail(CompSpec) ])), ?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p", @@ -332,14 +345,18 @@ handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> ?info("dlink_tcp:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]), ok. handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> - ?debug("dlink_tcp:data(): Elems ~p", [Elems]), + ?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]), + + CS = rvi_common:pick_up_json_log_id(Elems, CompSpec), case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> + ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), [ TransactionID, RemoteAddress, RemotePort, @@ -362,9 +379,10 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> end, process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Signature, Certificates, CompSpec); + ProtoVersion, Signature, Certificates, CS); ?DLINK_CMD_SERVICE_ANNOUNCE -> + ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), [ TransactionID, ProtoVersion, Signature ] = @@ -374,12 +392,14 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> Elems, undefined), Conn = {PeerIP, PeerPort}, + log("sa from ~s:~w", [PeerIP, PeerPort], CS), case authorize_rpc:validate_message(CompSpec, Signature, Conn) of [ok, Msg] -> ?debug("Service Announce~nMsg = ~p~n", [Msg]), process_announce(Msg, FromPid, PeerIP, PeerPort, TransactionID, ProtoVersion, CompSpec); _ -> + log("sa INVALID", [], CS), ?debug("Couldn't validate availability msg from ~p", [Conn]) end; @@ -684,10 +704,10 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tcp:authorize(): Certificates: ~p", [ Certificates ]), - ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]), + ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]), + ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), - { _NRemoteAddress, _NRemotePort} = Conn = + {NRemoteAddress, NRemotePort} = Conn = case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> @@ -697,6 +717,7 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, _ -> { RemoteAddress, RemotePort} end, + log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of true -> connection_authorized(FromPid, Conn, CompSpec); @@ -715,13 +736,15 @@ send_authorize(Pid, CompSpec) -> { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ])). + { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } + | rvi_common:log_id_json_tail(CompSpec) ])). connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered %% with the conneciton manager, this is an incoming connection %% from the client. We should respond with our own authorize followed by %% a service announce + log("authorized: ~s:~p", [RemoteIP, RemotePort], CompSpec), case connection_manager:find_connection_by_pid(FromPid) of not_found -> ?info("dlink_tcp:authorize(): New connection!"), @@ -746,11 +769,13 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> [ ok, JWT ] = authorize_rpc:sign_message( CompSpec, availability_msg(available, FilteredServices)), + log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec), connection:send(FromPid, rvi_common:term_to_json( [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } ])), + { ?DLINK_ARG_SIGNATURE, JWT } + | rvi_common:log_id_json_tail(CompSpec)])), %% Setup ping interval gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), @@ -850,3 +875,15 @@ opt(K, L, Def) -> opts(Keys, Elems, Def) -> [ opt(K, Elems, Def) || K <- Keys]. + + +log_orphan(Pfx, Fmt, Args) -> + start_log(Pfx, Fmt, Args, #component_spec{}). + +start_log(Pfx, Fmt, Args, CS) -> + LogId = rvi_log:new_id(Pfx), + rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)), + rvi_common:set_value(rvi_log_id, LogId, CS). + +log(Fmt, Args, CS) -> + rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index 432cf54..abeb19c 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -28,7 +28,7 @@ terminate/2, code_change/3]). -export([setup/6]). --export([upgrade/2]). +-export([upgrade/3]). -export([send/2]). -export([send/3]). -export([is_connection_up/1]). @@ -49,7 +49,8 @@ packet_st = [], mod = undefined, func = undefined, - args = undefined + cs, + role = server :: client | server }). %%%=================================================================== @@ -57,11 +58,10 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, Arg) -> - setup(IP, Port, Sock, Mod, Fun, Arg, []). - -setup(IP, Port, Sock, Mod, Fun, Arg, Opts) -> - Params = {IP, Port, Sock, Mod, Fun, Arg, Opts}, +setup(IP, Port, Sock, Mod, Fun, CompSpec) -> + Params = {IP, Port, Sock, Mod, Fun, CompSpec}, + ?debug("setup() IP = ~p; Port = ~p; Mod = ~p; Fun = ~p", [IP, Port, Mod, Fun]), + ?debug("CompSpec = ~p", [CompSpec]), case gen_server:start_link(?MODULE, Params ,[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), @@ -72,8 +72,8 @@ setup(IP, Port, Sock, Mod, Fun, Arg, Opts) -> Err end. -upgrade(Pid, Role) -> - gen_server:call(Pid, {upgrade, Role}). +upgrade(Pid, Role, CompSpec) when Role==client; Role==server -> + gen_server:call(Pid, {upgrade, Role, CompSpec}). send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). @@ -132,7 +132,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, Arg, Opts}) -> +init({IP, Port, Sock, Mod, Fun, CompSpec}) -> case IP of undefined -> ok; _ -> dlink_tls_connmgr:add_connection(IP, Port, self()) @@ -143,10 +143,10 @@ init({IP, Port, Sock, Mod, Fun, Arg, Opts}) -> ?debug("connection:init(): Sock: ~p", [Sock]), ?debug("connection:init(): Module: ~p", [Mod]), ?debug("connection:init(): Function: ~p", [Fun]), - ?debug("connection:init(): Arg: ~p", [Arg]), %% Grab socket control - PktMod = opt(packet_mod, Opts, ?PACKET_MOD), - PktSt = PktMod:init(Opts), + {ok, PktMod} = rvi_common:get_module_config(dlink_tls, dlink_tls_rpc, + packet_mod, ?PACKET_MOD, CompSpec), + PktSt = PktMod:init(CompSpec), {ok, #st{ ip = IP, port = Port, @@ -155,7 +155,7 @@ init({IP, Port, Sock, Mod, Fun, Arg, Opts}) -> packet_mod = PktMod, packet_st = PktSt, func = Fun, - args = Arg + cs = CompSpec }}. @@ -181,16 +181,20 @@ handle_call(terminate_connection, _From, St) -> {stop, Reason, NSt} = handle_info({tcp_closed, St#st.sock}, St), {stop, Reason, ok, NSt}; -handle_call({upgrade, Role} = Req, _From, #st{sock = S} = St) -> +handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) -> ?debug("~p:handle_call(~p)~n", [?MODULE, Req]), {ok, [{active, Last}]} = inet:getopts(S, [active]), inet:setopts(S, [{active, false}]), - case do_upgrade(S, Role) of + case do_upgrade(S, Role, CompSpec) of {ok, NewS} -> ?debug("upgrade to TLS succcessful~n", []), - ssl:setopts(NewS, [{active, Last}]), - {reply, ok, St#st{sock = NewS, mode = tls}}; + ssl:setopts(NewS, [{active, Last}]), + {ok, {IP, Port}} = ssl:peername(NewS), + NewCS = rvi_common:set_value(dlink_tls_role, client, CompSpec), + {reply, ok, St#st{sock = NewS, mode = tls, role = Role, + ip = inet_parse:ntoa(IP), port = Port, + cs = NewCS}}; Error -> ?error("Cannot upgrade to TLS: ~p~n", [Error]), {stop, Error, Error, St} @@ -251,17 +255,14 @@ handle_info({tcp, Sock, Data}, handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port, - mod = Mod, func = Fun, args = Arg, + mod = Mod, func = Fun, cs = CS, packet_mod = PMod, packet_st = PSt} = State) -> ?debug("handle_info(data): ~p", [Data]), case PMod:decode(Data, PSt) of {ok, Elements, PSt1} -> ?debug("~p:handle_info(data complete): Processed: ~p", [?MODULE, Elements]), - FromPid = self(), - spawn(fun() -> [Mod:Fun(FromPid, IP, Port, - data, Elem, Arg) || Elem <- Elements] - end), + Mod:Fun(self(), IP, Port, data, Elements, CS), ssl:setopts(Sock, [{active, once}]), {noreply, State#st{packet_st = PSt1}}; {error, Reason} -> @@ -276,7 +277,7 @@ handle_info({tcp, Sock, Data}, port = Port, mod = Mod, func = Fun, - args = Arg, + cs = CS, packet_mod = PMod, packet_st = PSt} = State) -> ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), @@ -286,10 +287,7 @@ handle_info({tcp, Sock, Data}, {ok, Elements, PSt1} -> ?debug("~p:handle_info(data complete): Processed: ~p", [?MODULE, Elements]), - FromPid = self(), - spawn(fun() -> [Mod:Fun(FromPid, IP, Port, - data, Elem, Arg) || Elem <- Elements] - end), + Mod:Fun(self(), IP, Port, data, Elements, CS), inet:setopts(Sock, [{active, once}]), {noreply, State#st{packet_st = PSt1}}; {error, Reason} -> @@ -305,9 +303,9 @@ handle_info({tcp_closed, Sock}, port = Port, mod = Mod, func = Fun, - args = Arg } = State) -> + cs = CS} = State) -> ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]), - Mod:Fun(self(), IP, Port,closed, Arg), + Mod:Fun(self(), IP, Port,closed, CS), gen_tcp:close(Sock), dlink_tls_connmgr:delete_connection_by_pid(self()), {stop, normal, State}; @@ -317,10 +315,10 @@ handle_info({tcp_error, _Sock}, port = Port, mod = Mod, func = Fun, - args = Arg} = State) -> + cs = CS} = State) -> ?debug("~p:handle_info(tcp_error): Address: ~p:~p ", [ ?MODULE, IP, Port]), - Mod:Fun(self(), IP, Port, error, Arg), + Mod:Fun(self(), IP, Port, error, CS), dlink_tls_connmgr:delete_connection_by_pid(self()), {stop, normal, State}; @@ -359,27 +357,24 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +do_upgrade(Sock, client, CompSpec) -> + ssl:connect(Sock, tls_opts(client, CompSpec)); +do_upgrade(Sock, server, CompSpec) -> + ssl:ssl_accept(Sock, tls_opts(server, CompSpec)). + %% FIXME: For now, use the example certs delivered with the OTP SSL appl. -tls_opts(Role) -> +tls_opts(Role, CompSpec) -> Dir = tls_dir(Role), - [{certfile, filename:join(Dir, "cert.pem")}, - {keyfile, filename:join(Dir, "key.pem")}]. + {ok, CertFile} = get_config(certfile, filename:join(Dir, "cert.pem"), CompSpec), + {ok, KeyFile} = get_config(keyfile, filename:join(Dir, "key.pem"), CompSpec), + [{certfile, CertFile}, + {keyfile, KeyFile}]. tls_dir(Role) when Role==client; - Role==server -> + Role==server -> filename:join([code:lib_dir(ssl), "examples", "certs", "etc", - atom_to_list(Role)]). - -do_upgrade(Sock, client) -> - ssl:connect(Sock, tls_opts(client)); -do_upgrade(Sock, server) -> - ssl:ssl_accept(Sock, tls_opts(server)). + atom_to_list(Role)]). - -opt(K, Opts, Def) -> - case lists:keyfind(K, 1, Opts) of - {_, V} -> - V; - false -> - Def - end. +get_config(Key, Default, CompSpec) -> + rvi_common:get_module_config( + dlink_tls, dlink_tls_rpc, Key, Default, CompSpec). diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl index 64b004f..25da496 100644 --- a/components/dlink_tls/src/dlink_tls_listener.erl +++ b/components/dlink_tls/src/dlink_tls_listener.erl @@ -64,18 +64,20 @@ terminate(_Reason, _State) -> ok. sock_opts() -> - [list, {active, once}, {packet, 0}]. + [{mode, binary}, {active, once}, {packet, 0}]. new_connection(IP, Port, Sock, State) -> - ?debug("~p:new_connection(): Peer IP: ~p (ignored)", [?MODULE,IP]), - ?debug("~p:new_connection(): Peer Port: ~p (ignored)", [?MODULE,Port]), - ?debug("~p:new_connection(): Sock: ~p", [?MODULE,Sock]), + ?debug("new_connection(): Peer IP: ~p (ignored)", [IP]), + ?debug("new_connection(): Peer Port: ~p (ignored)", [Port]), + ?debug("new_connection(): Sock: ~p", [Sock]), %% IP and Port are garbage. We'll grab peername when we get our %% first data. %% Provide component spec as extra arg. - {ok, _P} = dlink_tls_conn:setup( - undefined, 0, Sock, - dlink_tls_rpc, - handle_socket, [gen_nb_server:get_cb_state(State)]), + CompSpec = gen_nb_server:get_cb_state(State), + {ok, P} = dlink_tls_conn:setup( + undefined, 0, Sock, + dlink_tls_rpc, + handle_socket, [CompSpec]), + dlink_tls_conn:upgrade(P, server, CompSpec), {ok, State}. diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 21df19f..964cc0c 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -35,11 +35,11 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --include_lib("rvi_common/include/rvi_dlink.hrl"). +-include_lib("rvi_common/include/rvi_dlink_bin.hrl"). -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(SERVER_OPTS, server_opts). --define(DEFAULT_TCP_PORT, 9999). +-define(DEFAULT_TCP_PORT, 9998). -define(DEFAULT_RECONNECT_INTERVAL, 5000). -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes @@ -94,13 +94,14 @@ start_json_server() -> start_connection_manager() -> CompSpec = rvi_common:get_component_specification(), - {ok, BertOpts} = rvi_common:get_module_config(data_link, - ?MODULE, - ?SERVER_OPTS, - [], - CompSpec), - IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS), - Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT), + {ok, TlsOpts} = rvi_common:get_module_config(data_link, + ?MODULE, + ?SERVER_OPTS, + [], + CompSpec), + ?debug("TlsOpts = ~p", [TlsOpts]), + IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), + Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), ?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]), @@ -136,7 +137,8 @@ setup_persistent_connections_([ ], _CompSpec) -> setup_persistent_connections_([ NetworkAddress | T], CompSpec) -> ?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]), [ IP, Port] = string:tokens(NetworkAddress, ":"), - connect_and_retry_remote(IP, Port, CompSpec), + %% cast an immediate (re-)connect attempt to dlink_tls_rpc + setup_reconnect_timer(0, IP, Port, CompSpec), setup_persistent_connections_(T, CompSpec), ok. @@ -187,41 +189,34 @@ connect_remote(IP, Port, CompSpec) -> ?info("connect_remote(~p, ~p)~n", [IP, Port]), case dlink_tls_connmgr:find_connection_by_address(IP, Port) of { ok, _Pid } -> + log("already connected", [], CompSpec), already_connected; not_found -> %% Setup a new outbound connection - ?info("dlink_tls:connect_remote(): Connecting ~p:~p", - [IP, Port]), - - case gen_tcp:connect(IP, Port, [list, {packet, 0}]) of + {ok, Timeout} = rvi_common:get_module_config( + dlink_tls, ?MODULE, connect_timeout, 10000, CompSpec), + ?info("dlink_tls:connect_remote(): Connecting ~p:~p (TO=~p", + [IP, Port, Timeout]), + log("new connection", [], CompSpec), + case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), Timeout) of { ok, Sock } -> ?info("dlink_tls:connect_remote(): Connected ~p:~p", [IP, Port]), %% Setup a genserver around the new connection. {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock, - ?MODULE, handle_socket, [CompSpec] ), - + ?MODULE, handle_socket, CompSpec), + UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec), + ?debug("Upgrade result = ~p", [UgRes]), %% Send authorize - { LocalIP, LocalPort} = rvi_common:node_address_tuple(), - dlink_tls_conn:send( - Pid, - term_to_json( - {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, - { ?DLINK_ARG_CERTIFICATES, - {array, get_certificates(CompSpec)} }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - ]})), + send_authorize(Pid, CompSpec), ok; {error, Err } -> ?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p", [IP, Port, Err]), + log("connect FAILED: ~w", [Err], CompSpec), not_available end end. @@ -229,9 +224,11 @@ connect_remote(IP, Port, CompSpec) -> connect_and_retry_remote( IP, Port, CompSpec) -> ?info("dlink_tls:connect_and_retry_remote(): ~p:~p", [ IP, Port]), - - case connect_remote(IP, list_to_integer(Port), CompSpec) of - ok -> ok; + CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec), + case connect_remote(IP, list_to_integer(Port), CS) of + ok -> + log("connected", [], CS), + ok; Err -> %% Failed to connect. Sleep and try again ?notice("dlink_tls:connect_and_retry_remote(~p:~p): Failed: ~p", @@ -239,8 +236,8 @@ connect_and_retry_remote( IP, Port, CompSpec) -> ?notice("dlink_tls:connect_and_retry_remote(~p:~p): Will try again in ~p sec", [IP, Port, ?DEFAULT_RECONNECT_INTERVAL]), - - setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec), + log("start reconnect timer", [], CS), + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CS), not_available end. @@ -253,16 +250,12 @@ announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), + AvailabilityMsg = availability_msg(Availability, [Service]), Res = dlink_tls_conn:send( ConnPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - ]})), + rvi_common:pass_log_id( + [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE } + | AvailabilityMsg ], CompSpec)), ?debug("dlink_tls:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -282,7 +275,7 @@ announce_local_service_(CompSpec, Service, Availability) -> handle_socket(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg); -handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> +handle_socket(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tls:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -323,57 +316,57 @@ handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> end, ok; -handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> +handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) -> ?info("dlink_tls:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]), ok. -handle_socket(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> +handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> + + ?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]), + ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]), - ?debug("dlink_tls:data(): Elems ~p", [Elems]), + CS = rvi_common:pick_up_json_log_id(Elems, CompSpec), case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> - [ TransactionID, - RemoteAddress, + ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), + [ RemoteAddress, RemotePort, - ProtoVersion, - CertificatesTmp, Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, ?DLINK_ARG_SIGNATURE], Elems, undefined), - - Certificates = - case CertificatesTmp of - { array, C} -> C; - undefined -> [] - end, - process_authorize(FromPid, PeerIP, PeerPort, - TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Signature, Certificates, CompSpec); + process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, + Signature, CS); + + ?DLINK_CMD_CERT_EXCHANGE -> + ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]), + [ Certs ] = + opts([?DLINK_ARG_CERTIFICATES], Elems, undefined), + ?debug("Certs = ~p", [Certs]), + log("certs from ~s:~w", [PeerIP, PeerPort], CS), + authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}), + case rvi_common:get_value(dlink_tls_role, client, CS) of + client -> ok; + server -> + send_certs(FromPid, CompSpec) + end, + ok; ?DLINK_CMD_SERVICE_ANNOUNCE -> - [ TransactionID, - ProtoVersion, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_SIGNATURE], + ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), Conn = {PeerIP, PeerPort}, - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - process_announce(Msg, FromPid, PeerIP, PeerPort, - TransactionID, ProtoVersion, CompSpec); - _ -> - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + log("sa from ~s:~w", [PeerIP, PeerPort], CS), + process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -644,20 +637,17 @@ delete_services(ConnPid, SvcNameList) -> ok. availability_msg(Availability, Services) -> - {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) }, - { ?DLINK_ARG_SERVICES, {array, Services} }]}. + [{ ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, Services }]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, - RemotePort, ProtoVersion, Signature, Certificates, CompSpec) -> - ?info("dlink_tls:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), - ?info("dlink_tls:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), - ?info("dlink_tls:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), - ?debug("dlink_tls:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tls:authorize(): Certificates: ~p", [ Certificates ]), - ?debug("dlink_tls:authorize(): Signature: ~p", [ Signature ]), +process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, Signature, CompSpec) -> + ?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]), + ?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]), + ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), { _NRemoteAddress, _NRemotePort} = Conn = case { RemoteAddress, RemotePort } of @@ -669,7 +659,7 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, _ -> { RemoteAddress, RemotePort} end, - case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of + case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of true -> connection_authorized(FromPid, Conn, CompSpec); false -> @@ -677,37 +667,59 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, false end. +send_certs(Pid, CompSpec) -> + ?debug("send_certs (Pid = ~p)", [Pid]), + {LocalIP, LocalPort} = rvi_common:node_address_tuple(), + dlink_tls_conn:send(Pid, rvi_common:pass_log_id( + [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE}, + {?DLINK_ARG_ADDRESS, LocalIP}, + {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, + {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)). + send_authorize(Pid, CompSpec) -> + ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]), {LocalIP, LocalPort} = rvi_common:node_address_tuple(), - dlink_tls_conn:send(Pid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, - { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, - { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). + JWT = get_authorize_jwt(CompSpec), + dlink_tls_conn:send(Pid, rvi_common:pass_log_id( + [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE}, + {?DLINK_ARG_ADDRESS, LocalIP}, + {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, + {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)). + %% dlink_tls_conn:send(Pid, + %% term_to_json( + %% {struct, + %% [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + %% { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + %% { ?DLINK_ARG_ADDRESS, LocalIP }, + %% { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, + %% { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, + %% { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, + %% { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered %% with the connection manager, this is an incoming connection %% from the client. We should respond with our own authorize followed by %% a service announce + log("authorized ~s:~w", [RemoteIP, RemotePort], CompSpec), case dlink_tls_connmgr:find_connection_by_pid(FromPid) of not_found -> ?info("dlink_tls:authorize(): New connection!"), dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid), ?debug("dlink_tls:authorize(): Sending authorize."), _Res = send_authorize(FromPid, CompSpec), - ?debug("dlink_tls:upgrade connection", []), - UgRes = dlink_tls_conn:upgrade(FromPid, server), - ?debug("upgrade result: ~p", [UgRes]), + case rvi_common:get_value(dlink_tls_role, server, CompSpec) of + server -> ok; + client -> + send_certs(FromPid, CompSpec) + end, + %% ?debug("dlink_tls:upgrade connection", []), + %% UgRes = dlink_tls_conn:upgrade(FromPid, server), + %% ?debug("upgrade result: ~p", [UgRes]), ok; _ -> - UgRes = dlink_tls_conn:upgrade(FromPid, client), - ?debug("upgrade result: ~p", [UgRes]), + %% UgRes = dlink_tls_conn:upgrade(FromPid, client), + %% ?debug("upgrade result: ~p", [UgRes]), ok end, @@ -722,14 +734,12 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> ?info("dlink_tls:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteIP, RemotePort]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), + AvailabilityMsg = availability_msg(available, FilteredServices), + log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec), dlink_tls_conn:send(FromPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } ]})), + rvi_common:pass_log_id( + [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE } + | AvailabilityMsg ], CompSpec)), %% Setup ping interval gen_server:call(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), @@ -742,12 +752,8 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, base64:decode_to_string(Data)). -process_announce({struct, Elems}, FromPid, IP, Port, TID, _Vsn, CompSpec) -> - [ Avail, - {array, Svcs} ] = - opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined), +process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), - ?debug("dlink_tls:service_announce(~p): TransactionID: ~p", [Avail,TID]), ?debug("dlink_tls:service_announce(~p): Services: ~p", [Avail,Svcs]), case Avail of ?DLINK_ARG_AVAILABLE -> @@ -810,8 +816,8 @@ get_certificates(CompSpec) -> error(no_certificate_found) end. -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of +validate_auth_jwt(JWT, Conn, CompSpec) -> + case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of [ok] -> true; [not_found] -> @@ -828,4 +834,18 @@ opt(K, L, Def) -> end. opts(Keys, Elems, Def) -> - [ opt(K, Elems, Def) || K <- Keys]. + Res = [ opt(K, Elems, Def) || K <- Keys], + ?debug("opts(~p) -> ~p", [Keys, Elems]), + Res. + + +log_orphan(Pfx, Fmt, Args) -> + start_log(Pfx, Fmt, Args, #component_spec{}). + +start_log(Pfx, Fmt, Args, CS) -> + LogId = rvi_log:new_id(Pfx), + rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)), + rvi_common:set_value(rvi_log_id, LogId, CS). + +log(Fmt, Args, CS) -> + rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). diff --git a/components/rvi_common/include/rvi_common.hrl b/components/rvi_common/include/rvi_common.hrl index ca62c25..5deff46 100644 --- a/components/rvi_common/include/rvi_common.hrl +++ b/components/rvi_common/include/rvi_common.hrl @@ -2,11 +2,11 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% -%% A record defining the modules to use +%% A record defining the modules to use %% Used by rvi_common:request() to %% figure out how to route an intra-component call @@ -16,9 +16,11 @@ service_edge :: ?COMP_SPEC_TYPE, schedule :: ?COMP_SPEC_TYPE, service_discovery :: ?COMP_SPEC_TYPE, - authorize :: ?COMP_SPEC_TYPE, - data_link :: ?COMP_SPEC_TYPE, - protocol :: ?COMP_SPEC_TYPE + authorize :: ?COMP_SPEC_TYPE, + data_link :: ?COMP_SPEC_TYPE, + protocol :: ?COMP_SPEC_TYPE, + rvi_common :: ?COMP_SPEC_TYPE, + values = [] :: [{any(), any()}] }). -define(COMP_SPEC_SERVICE_EDGE_DEFAULT, [ { service_edge_rpc, gen_server, [] } ]). @@ -27,3 +29,4 @@ -define(COMP_SPEC_AUTHORIZE_DEFAULT, [ { authorize_rpc, gen_server, [] }]). -define(COMP_SPEC_DATA_LINK_DEFAULT, [ { dlink_tcp_rpc, gen_server, [] } ]). -define(COMP_SPEC_PROTOCOL_DEFAULT, [ { protocol, gen_server, [] } ]). +-define(COMP_SPEC_RVI_COMMON_DEFAULT, [ { rvi_log, gen_server, [] } ]). diff --git a/components/rvi_common/include/rvi_dlink.hrl b/components/rvi_common/include/rvi_dlink.hrl index 24e7bc0..ab1e49d 100644 --- a/components/rvi_common/include/rvi_dlink.hrl +++ b/components/rvi_common/include/rvi_dlink.hrl @@ -9,6 +9,7 @@ %% Commonly used protocol identifiers across dlink implementations %% -define(DLINK_CMD_AUTHORIZE, "au"). +-define(DLINK_CMD_CERT_EXCHANGE, "crt"). -define(DLINK_CMD_SERVICE_ANNOUNCE, "sa"). -define(DLINK_CMD_RECEIVE, "rcv"). -define(DLINK_CMD_FRAG, "frg"). diff --git a/components/rvi_common/include/rvi_dlink_bin.hrl b/components/rvi_common/include/rvi_dlink_bin.hrl index f48dfd5..f4f4cb8 100644 --- a/components/rvi_common/include/rvi_dlink_bin.hrl +++ b/components/rvi_common/include/rvi_dlink_bin.hrl @@ -9,6 +9,7 @@ %% Commonly used protocol identifiers across dlink implementations %% -define(DLINK_CMD_AUTHORIZE, <<"au">>). +-define(DLINK_CMD_CERT_EXCHANGE, <<"crt">>). -define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>). -define(DLINK_CMD_RECEIVE, <<"rcv">>). -define(DLINK_CMD_FRAG, <<"frg">>). diff --git a/components/rvi_common/src/rvi_common.app.src b/components/rvi_common/src/rvi_common.app.src index 2c5f1ff..368423b 100644 --- a/components/rvi_common/src/rvi_common.app.src +++ b/components/rvi_common/src/rvi_common.app.src @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -18,6 +18,7 @@ stdlib ]}, {mod, { rvi_common_app, []}}, - {start_phases, [{setup_config, []}]}, + {start_phases, [{setup_config, []}, + {start_json_servers, []}]}, {env, []} ]}. diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index 911bd4a..c9bbabb 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -34,15 +34,25 @@ get_component_modules/1, get_component_modules/2, get_module_specification/3, + get_module_config/3, get_module_config/4, get_module_config/5, get_module_json_rpc_address/3, get_module_json_rpc_url/3, - get_module_genserver_pid/3 + get_module_genserver_pid/3, + get_value/3 ]). --export([utc_timestamp/0]). +-export([set_value/3]). +-export([get_log_id/1, %% (CompSpec) + get_json_log_id/1, %% (JSONArgs) + log_id_json_tail/1, %% (CompSpec) + pick_up_json_log_id/2, %% (JSONArgs, CompSpec) + pass_log_id/2]). %% (PropList, CompSpec) +-export([utc_timestamp/0, + utc_timestamp/1]). -export([bin/1]). --export([start_json_rpc_server/3]). +-export([start_json_rpc_server/3, + start_json_rpc_server/4]). -export([extract_json/2, normalize_json/1, term_to_json/1]). @@ -87,6 +97,7 @@ status_values() -> {7, unauthorized}]. get_request_result({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}) -> + ?debug("JSONBody = ~p", [JSONBody]), case get_json_element(["result", "status"], JSONBody) of {ok, Value} -> { json_rpc_status(Value), JSONBody }; @@ -110,7 +121,7 @@ get_request_result(Other)-> json_argument([], [], Acc) -> - Acc; + lists:reverse(Acc); json_argument([Arg | AT], [Spec | ST], Acc) when is_atom(Arg)-> json_argument(AT, ST, [ { Spec, atom_to_list(Arg) } | Acc]); @@ -121,24 +132,26 @@ json_argument([Arg | AT], [Spec | ST], Acc) -> %% Convert a list of unnamed arguments to a proplist %% understood by json encode json_argument(ArgList, SpecList) -> - { struct, json_argument(ArgList, SpecList, []) }. + json_argument(ArgList, SpecList, []). request(Component, Module, Function, - InArgPropList, + InArgPropList0, OutArgSpec, CompSpec) -> %% Split [ { network_address, "127.0.0.1:888" } , { timeout, 34 } ] to %% [ "127.0.0.1:888", 34] [ network_address, timeout ] + InArgPropList = pass_log_id(InArgPropList0, CompSpec), InArg = [ Val || { _Key, Val } <- InArgPropList ], InArgSpec = [ Key || { Key, _Val } <- InArgPropList ], %% Figure out how we are to invoke this MFA. case get_module_type(Component, Module, CompSpec) of %% We have a gen_server { ok, gen_server } -> - ?debug("Sending ~p - ~p:~p(~p)", [Component, Module, Function, InArg]), + ?debug("Sending ~p - ~p:~p(~p)", [Component, Module, Function, + authorize_keys:abbrev_payload(InArg)]), gen_server:call(Module, { rvi, Function, InArg}); %% We have a JSON-RPC server @@ -161,15 +174,15 @@ request(Component, Err1 -> Err1 end. - notification(Component, Module, Function, - InArgPropList, + InArgPropList0, CompSpec) -> %% Split [ { network_address, "127.0.0.1:888" } , { timeout, 34 } ] to %% [ "127.0.0.1:888", 34] [ network_address, timeout ] + InArgPropList = pass_log_id(InArgPropList0, CompSpec), InArg = [ Val || { _Key, Val } <- InArgPropList ], InArgSpec = [ Key || { Key, _Val } <- InArgPropList ], %% Figure out how we are to invoke this MFA. @@ -202,6 +215,14 @@ notification(Component, ok end. +pass_log_id(PList, CompSpec) -> + case get_value(rvi_log_id, undefined, CompSpec) of + undefined -> + PList; + ID -> + lists:keystore(rvi_log_id, 1, PList, {rvi_log_id, ID}) + end. + send_json_request(Url,Method, Args) -> Req = jsx_encode([{<<"jsonrpc">>, <<"2.0">>}, @@ -318,7 +339,7 @@ get_json_element_([Elem | T], JSON) -> {'OR', Alts} -> get_json_element_(T, get_json_element_alt(Alts, JSON)); _ -> - get_json_element_(T, get_value(Elem, JSON, undefined)) + get_json_element_(T, get_json_value(Elem, JSON, undefined)) end; get_json_element_(Path,JSON) -> @@ -342,12 +363,14 @@ member(K, [H|T]) -> member(_, []) -> false. -get_value(K, [{K1,V}|T], Def) -> +get_json_value(K, [{K1,V}|T], Def) -> case comp(K, K1) of true -> V; - false -> get_value(K, T, Def) + false -> get_json_value(K, T, Def) end; -get_value(_, [], Def) -> +get_json_value(K, [_|T], Def) -> + get_json_value(K, T, Def); +get_json_value(_, [], Def) -> Def. comp(A, A) -> true; @@ -497,7 +520,8 @@ get_component_specification_() -> service_discovery = ?COMP_SPEC_SERVICE_DISCOVERY_DEFAULT, authorize = ?COMP_SPEC_AUTHORIZE_DEFAULT, data_link = ?COMP_SPEC_DATA_LINK_DEFAULT, - protocol = ?COMP_SPEC_PROTOCOL_DEFAULT + protocol = ?COMP_SPEC_PROTOCOL_DEFAULT, + rvi_common = ?COMP_SPEC_RVI_COMMON_DEFAULT }; CompList -> @@ -545,10 +569,12 @@ get_component_modules(data_link, CompSpec) -> get_component_modules(protocol, CompSpec) -> CompSpec#component_spec.protocol; +get_component_modules(rvi_common, CompSpec) -> + CompSpec#component_spec.rvi_common; + get_component_modules(_, _) -> undefined. - %% Get the spec for a specific module (protocol_bert_rpc) within %% a component (protocol). get_module_specification(Component, Module, CompSpec) -> @@ -577,6 +603,9 @@ get_module_specification(Component, Module, CompSpec) -> end end. +get_module_config(Component, Module, Key) -> + get_module_config(Component, Module, Key, get_component_specification()). + %% Get a specific option (bert_rpc_port) for a specific module %% (protocol_bert_rpc) within a component (protocol). get_module_config(Component, Module, Key, CompSpec) -> @@ -613,6 +642,43 @@ get_module_config(Component, Module, Key, Default, CompSpec) -> _ -> {ok, Default } end. +set_value(Key, Value, #component_spec{values = Keys} = CompSpec) -> + CompSpec#component_spec{values = lists:keystore(Key, 1, Keys, {Key, Value})}. + +get_value(Key, Default, #component_spec{values = Values}) -> + case lists:keyfind(Key, 1, Values) of + {_, Value} -> + Value; + false -> + Default + end. + +get_log_id(CS) -> + get_value(rvi_log_id, <<"null">>, CS). + +log_id_json_tail(CS) -> + case get_value(rvi_log_id, undefined, CS) of + undefined -> + []; + Id -> + [{<<"rvi_log_id">>, Id}] + end. + +get_json_log_id(Args) -> + case get_json_element([<<"rvi_log_id">>], Args) of + {ok, ID} -> + ID; + {error, _} -> + <<"null">> + end. + +pick_up_json_log_id(Args, CS) -> + case get_json_element([<<"rvi_log_id">>], Args) of + {ok, ID} -> + set_value(rvi_log_id, ID, CS); + {error, _} -> + CS + end. get_module_type(Component, Module, CompSpec) -> case get_module_specification(Component, Module, CompSpec) of @@ -683,13 +749,16 @@ get_module_genserver_pid(Component, Module, CompSpec) -> start_json_rpc_server(Component, Module, Supervisor) -> + start_json_rpc_server(Component, Module, Supervisor, []). + +start_json_rpc_server(Component, Module, Supervisor, XOpts) -> Addr = get_module_json_rpc_address(Component, Module, get_component_specification()), case Addr of { ok, IP, Port } -> - ExoHttpOpts = [ { ip, IP }, { port, Port } ], + ExoHttpOpts = [ { ip, IP }, { port, Port } | XOpts ], exoport_exo_http:instance(Supervisor, Module, @@ -701,12 +770,14 @@ start_json_rpc_server(Component, Module, Supervisor) -> Err end. - - utc_timestamp() -> calendar:datetime_to_gregorian_seconds( calendar:universal_time()) - seconds_jan_1970(). +utc_timestamp({_,_,_} = TS) -> + calendar:datetime_to_gregorian_seconds( + calendar:now_to_universal_time(TS)) - seconds_jan_1970(). + seconds_jan_1970() -> %% calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}). 62167219200. diff --git a/components/rvi_common/src/rvi_common_app.erl b/components/rvi_common/src/rvi_common_app.erl index 998b921..c2564d4 100644 --- a/components/rvi_common/src/rvi_common_app.erl +++ b/components/rvi_common/src/rvi_common_app.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -25,6 +25,9 @@ start(_StartType, _StartArgs) -> start_phase(setup_config, _, _) -> rvi_config:setup_substitution(rvi_core), + ok; +start_phase(start_json_servers, _, _) -> + rvi_log:start_json_server(), ok. stop(_State) -> diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl index 620672a..5d0fa8e 100644 --- a/components/rvi_common/src/rvi_log.erl +++ b/components/rvi_common/src/rvi_log.erl @@ -4,7 +4,16 @@ -export([start_link/0, log/3, - fetch/1]). + flog/4, + new_id/1, + fetch/1, + timestamp/0, + format/2 + ]). + +-export([start_json_server/0, + handle_rpc/2, + handle_notification/2]). -export([init/1, handle_call/3, @@ -13,35 +22,113 @@ terminate/2, code_change/3]). --record(st, {n = 100}). +-include_lib("lager/include/log.hrl"). + +-record(st, {n = 100, + seq = 1, + node_tag}). -record(evt, {id, component, event}). --define(TIDS, rvi_log_tids). +-define(IDS, rvi_log_ids). -define(EVENTS, rvi_log_events). +-define(MAX_LENGTH, 60). + + start_link() -> create_tabs(), gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -log(TID, Component, Event) -> - gen_server:cast(?MODULE, {log, TID, Component, Event}). +log(TID, Component, Event) when is_binary(TID), is_binary(Component) -> + gen_server:cast(?MODULE, {log, TID, timestamp(), Component, bin(Event)}). + +new_id(Prefix) -> + gen_server:call(?MODULE, {new_id, bin(Prefix)}). + +format(Fmt, Args) -> + try format_(Fmt, Args) + catch + error:_ -> + format_("FORMAT ERROR ~p, ~p", [Fmt, Args]) + end. + +format_(Fmt, Args) -> + trunc_msg(iolist_to_binary(io_lib:fwrite(Fmt, Args))). + +trunc_msg(Bin) when byte_size(Bin) =< ?MAX_LENGTH -> + Bin; +trunc_msg(Bin) -> + binary:part(Bin, 0, ?MAX_LENGTH). -fetch(Tid) -> - ets:select(?EVENTS, [{#evt{id = {Tid,'$1'}, - component = '$2', - event = '$3'}, [], [{{Tid, '$1', '$2', '$3'}}] }]). + +flog(Fmt, Args, Component, CS) -> + LogTID = rvi_common:get_log_id(CS), + log(LogTID, Component, format(Fmt, Args)). + +timestamp() -> + os:timestamp(). + +fetch(Tids) -> + TidSet = select_ids(Tids), + [{Tid, ets:select(?EVENTS, [{#evt{id = {Tid,'$1'}, + component = '$2', + event = '$3'}, [], [{{'$1', '$2', '$3'}}] }])} + || Tid <- TidSet]. init(_) -> - {ok, #st{}}. + {ok, Tag} = rvi_common:get_module_config( + rvi_common, rvi_log, node_tag, base64url:encode(crypto:rand_bytes(3)), + rvi_common:get_component_specification()), + gen_server:cast(self(), log_start), + {ok, #st{node_tag = bin(Tag)}}. + +start_json_server() -> + ?debug("rvi_log:start_json_server()", []), + case rvi_common:start_json_rpc_server(rvi_common, + ?MODULE, + rvi_common_sup) of + ok -> ok; + Err -> + ?warning("rvi_log:start_json_server(): Failed to start: ~p", [Err]), + Err + end. +handle_rpc(<<"log">>, Args) -> + handle_notification(<<"log">>, Args), + {ok, [{status, rvi_common:json_rpc_status(ok)}]}; +handle_rpc(<<"fetch">>, Args) -> + TIDs = get_json_ids(Args), + Res = [{TID, fetch(TID)} || TID <- TIDs], + {ok, [{status, rvi_common:json_rpc_status(ok)}, + {<<"log">>, format_result(Res)}]}; +handle_rpc(Other, _Args) -> + ?warning("rvi_log:handle_rpc(~p): unknown command", [ Other ]), + {ok, [{status, rvi_common:json_rpc_status(invalid_command)}]}. + +handle_notification(<<"log">>, Args) -> + {ok, TID} = rvi_common:get_json_element([<<"tid">>], Args), + {ok, Component} = rvi_common:get_json_element([<<"cmp">>], Args), + {ok, Event} = rvi_common:get_json_element([<<"evt">>], Args), + log(TID, Component, Event), + ok; +handle_notification(Other, _Args) -> + ?warning("rvi_log:handle_notification(~p): unknown command", [Other]), + ok. -handle_cast({log, TID, Component, Event}, #st{n = N} = St) -> - do_log(TID, Component, Event, N), +handle_cast(log_start, #st{n = N, node_tag = Tag} = St) -> + do_log(tid_(<<"rvi_log">>, 0, Tag), timestamp(), <<"rvi_common">>, + format("Started - Tag = ~s", [Tag]), N), + {noreply, St}; +handle_cast({log, TID, TS, Component, Event}, #st{n = N} = St) -> + do_log(TID, TS, Component, Event, N), {noreply, St}; handle_cast(_, St) -> {noreply, St}. +handle_call({new_id, Prefix}, _From, #st{seq = Seq, node_tag = Tag} = S) -> + {reply, <<Prefix/binary, ":", (integer_to_binary(Seq))/binary, "-", Tag/binary>>, + S#st{seq = Seq + 1}}; handle_call(_, _From, St) -> {reply, {error, unknown_call}, St}. @@ -60,8 +147,8 @@ code_change(_, St, _) -> %% ====================================================================== create_tabs() -> - maybe_new(?TIDS, [ordered_set, named_table]), - maybe_new(?EVENTS, [ordered_set, named_table]). + maybe_new(?IDS, [ordered_set, public, named_table]), + maybe_new(?EVENTS, [ordered_set, public, named_table, {keypos, #evt.id}]). maybe_new(T, Opts) -> case ets:info(T, name) of @@ -71,34 +158,91 @@ maybe_new(T, Opts) -> true end. -do_log(Tid, Component, Event, N) -> - case ets:member(?TIDS, Tid) of +tid_(Prefix, Seq, Tag) -> + <<Prefix/binary, ":", (integer_to_binary(Seq))/binary, "-", Tag/binary>>. + +do_log(Tid, TS, Component, Event, N) -> + case ets:member(?IDS, Tid) of true -> - store_event(Tid, Component, Event); + store_event(Tid, TS, Component, Event); false -> - case ets:info(?TIDS, size) of + case ets:info(?IDS, size) of Sz when Sz >= N -> - purge_tid(); + purge_id(); _ -> ok end, - store_event(Tid, Component, Event) + ets:insert(?IDS, {Tid}), + store_event(Tid, TS, Component, Event) end. -store_event(Tid, Component, Event) -> - ets:insert(?EVENTS, #evt{id = {Tid, seq()}, +store_event(Tid, TS, Component, Event) -> + rvi_log_log:info("~-20s ~-12s ~s", [Tid, Component, Event]), + ?info("RVI_LOG: ~p/~p/~p", [Tid, Component, Event]), + ets:insert(?EVENTS, #evt{id = {Tid, TS}, component = Component, event = Event}). -purge_tid() -> - case ets:first(?TIDS) of +purge_id() -> + case ets:first(?IDS) of '$end_of_table' -> %% Should not be possible ... ok; - Tid -> - ets:delete(?TIDS, Tid), + {Tid} -> + ets:delete(?IDS, Tid), ets:match_delete(?EVENTS, #evt{id = {Tid,'_'}, _ = '_'}) end. -seq() -> - erlang:now(). +get_json_ids(Args) -> + Res = case rvi_common:get_json_element([<<"tid">>], Args) of + {ok, TID} when is_binary(TID) -> + [TID]; + {ok, TIDs} when is_list(TIDs) -> + TIDs; + {error, _} -> + [] + end, + lists:filter(fun valid_id_pat/1, Res). + +valid_id_pat(TP) -> + %% We'll accept anything that isn't blatantly *invalid* + try begin _ = re:run([], TP, []), + true + end + catch + _:_ -> + false + end. + +select_ids(TIDs) -> + ets:foldr( + fun({Tid}, Acc) -> + case match_id(Tid, TIDs) of + true -> [Tid|Acc]; + false -> Acc + end + end, [], ?IDS). + +match_id(Tid, [Pat|Pats]) -> + case re:run(Tid, Pat, []) of + {match, _} -> true; + nomatch -> match_id(Tid, Pats) + end; +match_id(_, []) -> + false. + +format_result(Log) -> + [{TID, format_events(Es)} || {TID, Es} <- Log]. + +format_events([{TS, Comp, Evt}|Es]) -> + [[{<<"ts">>, rvi_common:utc_timestamp(TS)}, + {<<"cmp">>, bin(Comp)}, + {<<"evt">>, bin(Evt)}] || format_events(Es)]; +format_events([]) -> + []. + +bin(A) when is_atom(A) -> atom_to_binary(A, latin1); +bin(B) when is_binary(B) -> B; +bin(L) when is_list(L) -> iolist_to_binary(L); +bin(Other) -> + iolist_to_binary(io_lib:fwrite("~w", [Other])). diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl index a27fb49..67b68eb 100644 --- a/components/schedule/src/rvi_routing.erl +++ b/components/schedule/src/rvi_routing.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2015, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% -module(rvi_routing). @@ -28,7 +28,7 @@ service_prefix = ""::string(), proto_link_pairs = []:: list(tuple()) }). - + -record(st, { routes= []:: list(#route{}) }). @@ -173,7 +173,7 @@ prefix_match_(_Service, [], Len) -> prefix_match_([], _Prefix, _Len ) -> -1; -%% +%% prefix_match_([ ServiceH | ServiceT], [ PrefixH | PrefixT ], Len) when ServiceH =:= PrefixH -> @@ -181,7 +181,7 @@ prefix_match_([ ServiceH | ServiceT], [ PrefixH | PrefixT ], Len) %% Mismatch between the the service and candidate. No match prefix_match_(_Service, _Prefix, _Len) -> - -1. + -1. find_routes_([], _Service, CurRoutes, CurMatchLen ) -> { CurRoutes, CurMatchLen }; @@ -191,7 +191,7 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen ) %% Do we have a better match than previosly recorded? case MatchLen >= CurMatchLen of - true -> + true -> %% Continue with the new routes and matching len installed find_routes_(T, Service, Routes, MatchLen); @@ -203,9 +203,10 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen ) find_routes_(Rt, _Svc, CurRoutes, CurMatchLen) -> ?warning("rvi_routing(): Incorrect route entry: ~p", [Rt]), { CurRoutes, CurMatchLen }. - + find_routes(Routes, Service) -> + ?debug("find_routes(~p, ~p)", [Routes, Service]), case find_routes_(Routes, Service, undefined, 0) of { undefined, 0 } -> ?debug("rvi_routing(): ~p -> unknown", [ Service]), @@ -213,7 +214,7 @@ find_routes(Routes, Service) -> { MatchRoutes, _MatchLen } -> ?debug("rvi_routing(): ~p -> ~p", [ Service, MatchRoutes ]), - normalize_routes_(MatchRoutes, []) + normalize_routes_(MatchRoutes, []) end. @@ -225,13 +226,13 @@ normalize_routes_([], Acc) -> lists:reverse(Acc); normalize_routes_([ {{ Pr, PrOp }, { DL, DLOp }} | Rem ], Acc) -> - normalize_routes_( Rem, [ {{Pr, PrOp}, { DL, DLOp } } | Acc]); + normalize_routes_( Rem, [ {{Pr, PrOp}, { DL, DLOp } } | Acc]); normalize_routes_([ { Pr, { DL, DLOp }} | Rem ], Acc) -> - normalize_routes_(Rem, [ { {Pr, []}, { DL, DLOp } } | Acc]); + normalize_routes_(Rem, [ { {Pr, []}, { DL, DLOp } } | Acc]); normalize_routes_([ {{ Pr, PrOp}, DL } | Rem ], Acc) -> - normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]); + normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]); normalize_routes_([ {Pr, DL} | Rem ], Acc) -> normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]). @@ -242,11 +243,11 @@ find_protocols_(_DataLink, [], Acc ) -> %% Matching data link. This is an allowed protocol -find_protocols_(DataLink, [ {{ Pr, PrOp }, { DL, DLOp }} | T], +find_protocols_(DataLink, [ {{ Pr, PrOp }, { DL, DLOp }} | T], Acc) when DataLink =:= DL -> find_protocols_(DataLink, T, [ { Pr, PrOp, DLOp } | Acc ]); - + %% No match find_protocols_(DataLink, [ {{ _Pr, _PrOp }, { _DL, _DLOp }} | T], Acc) -> @@ -258,4 +259,3 @@ find_protocols(AllRoutes, Service, DataLink) -> Res = find_protocols_(DataLink, SvcRoutes, []), ?debug("find_protocols(~p:~p): -> ~p", [ DataLink, Service, Res]), Res. - diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl index bf19dda..b66bdd1 100644 --- a/components/schedule/src/schedule_rpc.erl +++ b/components/schedule/src/schedule_rpc.erl @@ -153,7 +153,7 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) -> %% JSON-RPC entry point %% CAlled by local exo http server -handle_rpc("schedule_message", Args) -> +handle_rpc(<<"schedule_message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index 04c897d..b4f14ab 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -46,7 +46,7 @@ }). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -record(st, { %% Component specification @@ -58,13 +58,13 @@ start_link() -> init([]) -> ?info("svc_disc:init(): called."), - ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table, + ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table, { keypos, #service_entry.service }]), - ets:new(?MODULE_TABLE, [ duplicate_bag, public, named_table, + ets:new(?MODULE_TABLE, [ duplicate_bag, public, named_table, { keypos, #service_entry.data_link_mod }]), - ets:new(?SUBSCRIBER_TABLE, [set, public, named_table, + ets:new(?SUBSCRIBER_TABLE, [set, public, named_table, { keypos, #subscriber_entry.module }]), {ok, #st { cs = rvi_common:get_component_specification() } }. @@ -74,19 +74,19 @@ start_json_server() -> rvi_common:start_json_rpc_server(service_discovery, ?MODULE, service_discovery_sup). get_all_services(CompSpec) -> - rvi_common:request(service_discovery, ?MODULE, + rvi_common:request(service_discovery, ?MODULE, get_all_services, [], [status, services], CompSpec). get_services_by_module(CompSpec, DataLinkMod) -> - rvi_common:request(service_discovery, ?MODULE, - get_services_by_module, - [ { data_link_module, DataLinkMod }], + rvi_common:request(service_discovery, ?MODULE, + get_services_by_module, + [ { data_link_module, DataLinkMod }], [status, services], CompSpec). get_modules_by_service(CompSpec, Service) -> - rvi_common:request(service_discovery, ?MODULE, + rvi_common:request(service_discovery, ?MODULE, get_modules_by_service, - [ { service, Service }], + [ { service, Service }], [status, modules], CompSpec). @@ -95,7 +95,7 @@ register_services(CompSpec, Services, DataLinkModule) -> ?debug(" CompSpec : ~p", [CompSpec]), ?debug(" Services : ~p", [Services]), ?debug(" DataLinkMod : ~p", [DataLinkModule]), - rvi_common:notification(service_discovery, ?MODULE, register_services, + rvi_common:notification(service_discovery, ?MODULE, register_services, [{ services, Services }, { data_link_module, DataLinkModule }], CompSpec). @@ -105,55 +105,59 @@ unregister_services(CompSpec, Services, DataLinkModule) -> ?debug(" CompSpec : ", [CompSpec]), ?debug(" Services : ", [Services]), ?debug(" DataLinkMod : ", [DataLinkModule]), - rvi_common:notification(service_discovery, ?MODULE, unregister_services, + rvi_common:notification(service_discovery, ?MODULE, unregister_services, [{ services, Services }, { data_link_module, DataLinkModule}], CompSpec). subscribe(CompSpec, SubscribingMod) -> - rvi_common:notification(service_discovery, ?MODULE, subscribe, - [ { subscribing_module, SubscribingMod }], + rvi_common:notification(service_discovery, ?MODULE, subscribe, + [ { subscribing_module, SubscribingMod }], CompSpec). unsubscribe(CompSpec, SubscribingMod) -> - rvi_common:notification(service_discovery, ?MODULE, unsubscribe, - [ { subscribing_module, SubscribingMod }], + rvi_common:notification(service_discovery, ?MODULE, unsubscribe, + [ { subscribing_module, SubscribingMod }], CompSpec). - + %% JSON-RPC entry point %% Called by local exo http server %% Register remote services handle_notification("register_services", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), - gen_server:cast(?SERVER, { rvi, register_services, - [ Services, list_to_atom(DataLinkModule) ]}), + gen_server:cast(?SERVER, { rvi, register_services, + [ Services, list_to_atom(DataLinkModule), LogId ]}), ok; handle_notification("unregister_services", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, DataLinkModule } = rvi_common:get_json_element(["data_link_module"], Args), - gen_server:cast(?SERVER, { rvi, unregister_services, - [ Services, list_to_atom(DataLinkModule) ]}), + gen_server:cast(?SERVER, { rvi, unregister_services, + [ Services, list_to_atom(DataLinkModule), LogId ]}), ok; handle_notification("subscribe", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args), %% De-register service - gen_server:cast(?SERVER, { rvi, subscribe, [ list_to_atom(Module) ]}), + gen_server:cast(?SERVER, { rvi, subscribe, [ list_to_atom(Module), LogId ]}), ok; handle_notification("unsubscribe_from_service", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args), %% De-register service - gen_server:cast(?SERVER, { rvi, unsubscribe, [ list_to_atom(Module) ]}), + gen_server:cast(?SERVER, { rvi, unsubscribe, [ list_to_atom(Module), LogId ]}), ok; handle_notification( Other, _Args) -> @@ -164,36 +168,39 @@ handle_notification( Other, _Args) -> %% %% Get all services %% -handle_rpc("get_all_services", _Args) -> +handle_rpc("get_all_services", Args) -> ?debug("svc_disc:get_all_services(json-rpc)"), - [ok, Services ] = gen_server:call(?SERVER, { rvi, get_all_services, []}), - {ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, { array, Services } }]}; + LogId = rvi_common:get_json_log_id(Args), + [ok, Services ] = gen_server:call(?SERVER, { rvi, get_all_services, [LogId]}), + {ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, Services }]}; handle_rpc("get_services_by_module", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, DataLinkMod } = rvi_common:get_json_element(["data_link_module"], Args), ?debug("svc_disc:get_services_by_module(json-rpc): ~p ", [DataLinkMod]), - [ok, Services ] = gen_server:call(?SERVER, - { rvi, - get_services_by_module, - [DataLinkMod]}), + [ok, Services ] = gen_server:call(?SERVER, + { rvi, + get_services_by_module, + [DataLinkMod, LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, { array, Services } }]}; handle_rpc("get_modules_by_service", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Service } = rvi_common:get_json_element(["service"], Args), ?debug("svc_disc:get_modules_by_service(json-rpc): ~p ", [Service]), - [ok, Modules ] = gen_server:call(?SERVER, + [ok, Modules ] = gen_server:call(?SERVER, { rvi, get_modules_by_service, - [Service]}), + [Service, LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} , { modules, { array, Modules } }]}; -%% +%% %% Handle the rest. %% handle_rpc( Other, _Args) -> @@ -211,17 +218,17 @@ handle_call(Req, From, St) -> end. handle_call_({rvi, get_all_services, _Args}, _From, St) -> - Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> - [ ServiceName | Acc ] end, + Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> + [ ServiceName | Acc ] end, [], ?SERVICE_TABLE), {reply, [ok, Svcs], St }; -handle_call_({rvi, get_services_by_module, [Module]}, _From, St) -> +handle_call_({rvi, get_services_by_module, [Module | _LogId]}, _From, St) -> {reply, [ok, get_services_by_module_(Module)], St }; -handle_call_({rvi, get_modules_by_service, [Service]}, _From, St) -> +handle_call_({rvi, get_modules_by_service, [Service | _LogId]}, _From, St) -> {reply, [ok, get_modules_by_service_(Service)], St }; @@ -231,7 +238,7 @@ handle_call_(Other, _From, St) -> -handle_cast({rvi, subscribe, [ SubsMod] }, St) -> +handle_cast({rvi, subscribe, [ SubsMod | _LogId ] }, St) -> %% Insert new entry, or replace existing one ets:insert(?SUBSCRIBER_TABLE, #subscriber_entry { module = SubsMod}), @@ -240,14 +247,14 @@ handle_cast({rvi, subscribe, [ SubsMod] }, St) -> { noreply, St}; -handle_cast({rvi, unsubscribe, [ SubsMod] }, St) -> +handle_cast({rvi, unsubscribe, [ SubsMod | _LogId ] }, St) -> ets:delete(?SUBSCRIBER_TABLE, SubsMod), { noreply, St}; %% Handle calls received through regular gen_server calls, routed by %% rvi_common:request() -handle_cast({rvi, register_services, [Services, DataLinkModule] }, St) -> +handle_cast({rvi, register_services, [Services, DataLinkModule | _LogId ] }, St) -> ?info("svc_disc:register_services(): ~p:~p", [DataLinkModule, Services]), @@ -255,15 +262,15 @@ handle_cast({rvi, register_services, [Services, DataLinkModule] }, St) -> %% Notify all subscribers notify_subscribers(St#st.cs, - available, - Services, + available, + Services, DataLinkModule), {noreply, St }; %% Handle calls received through regular gen_server calls, routed by %% rvi_common:request() -handle_cast({rvi, unregister_services, [Services, DataLinkModule] }, St) -> +handle_cast({rvi, unregister_services, [Services, DataLinkModule | _LogId ] }, St) -> ?info("svc_disc:unregister_services(): ~p:~p", [DataLinkModule, Services]), @@ -272,8 +279,8 @@ handle_cast({rvi, unregister_services, [Services, DataLinkModule] }, St) -> %% Notify all subscribers notify_subscribers(St#st.cs, - unavailable, - Services, + unavailable, + Services, DataLinkModule), @@ -295,30 +302,30 @@ code_change(_OldVsn, St, _Extra) -> register_single_service_(Service, DataLinkModule) -> - ?info("svc_disc:register_single_service_(~p:~p)", + ?info("svc_disc:register_single_service_(~p:~p)", [DataLinkModule,Service]), %% Delete any previous instances of the given entry, in case %% the service registers multiple times - ets:match_delete(?MODULE_TABLE, - #service_entry { - service = Service, - data_link_mod = DataLinkModule + ets:match_delete(?MODULE_TABLE, + #service_entry { + service = Service, + data_link_mod = DataLinkModule }), - ets:match_delete(?SERVICE_TABLE, - #service_entry { - service = Service, - data_link_mod = DataLinkModule + ets:match_delete(?SERVICE_TABLE, + #service_entry { + service = Service, + data_link_mod = DataLinkModule }), - ets:insert(?SERVICE_TABLE, + ets:insert(?SERVICE_TABLE, #service_entry { service = Service, data_link_mod = DataLinkModule }), - ets:insert(?MODULE_TABLE, + ets:insert(?MODULE_TABLE, #service_entry { service = Service, data_link_mod = DataLinkModule @@ -330,21 +337,21 @@ register_single_service_(Service, DataLinkModule) -> unregister_single_service_(Service, DataLinkModule) -> - ?info("svc_disc:unregister_single_service_(): ~p:~p", + ?info("svc_disc:unregister_single_service_(): ~p:~p", [DataLinkModule, Service]), %% Delete any service table entries with a matching Service. - ets:match_delete(?SERVICE_TABLE, - #service_entry { + ets:match_delete(?SERVICE_TABLE, + #service_entry { service = Service, - data_link_mod = DataLinkModule + data_link_mod = DataLinkModule }), %% Delete any remote address table entries with a matching Service. - ets:match_delete(?MODULE_TABLE, - #service_entry { + ets:match_delete(?MODULE_TABLE, + #service_entry { service = Service, - data_link_mod = DataLinkModule + data_link_mod = DataLinkModule }), ok. @@ -355,14 +362,14 @@ unregister_single_service_(Service, DataLinkModule) -> get_modules_by_service_(Service) -> ModMatch = ets:lookup(?SERVICE_TABLE, Service), - - ModNames = lists:foldl(fun(#service_entry { - data_link_mod = Mod - }, Acc) -> + + ModNames = lists:foldl(fun(#service_entry { + data_link_mod = Mod + }, Acc) -> [ Mod | Acc ] end, [], ModMatch), - - + + ?debug("svc_disc:get_modules_by_service_(): ~p -> ~p", [ Service, ModNames ]), @@ -373,10 +380,10 @@ get_modules_by_service_(Service) -> get_services_by_module_(Module) -> SvcMatch = ets:lookup(?MODULE_TABLE, Module), - - SvcNames = lists:foldl(fun(#service_entry { - service = Svc - }, Acc) -> + + SvcNames = lists:foldl(fun(#service_entry { + service = Svc + }, Acc) -> [ Svc | Acc ] end, [], SvcMatch), @@ -389,20 +396,20 @@ notify_single_subscriber(_CompSpec, '$end_of_table', _SubsFun, _DataLinkModule, _Services) -> ok; -notify_single_subscriber(CompSpec, SubsModule, SubsFun, +notify_single_subscriber(CompSpec, SubsModule, SubsFun, DataLinkModule, Services) -> %% Invoke subscriber for each service that has been updated. - ?debug("notify_single_subscriber(~p:~p) ~p:~p()", + ?debug("notify_single_subscriber(~p:~p) ~p:~p()", [SubsModule, SubsFun, DataLinkModule, Services ]), [ SubsModule:SubsFun(CompSpec, SvcName, DataLinkModule) || SvcName <- Services], %% Move on to the next subscribing module - notify_single_subscriber(CompSpec, + notify_single_subscriber(CompSpec, ets:next(?SUBSCRIBER_TABLE, SubsModule), SubsFun, DataLinkModule, Services). -notify_subscribers(CompSpec, Available, Services, DataLinkModule) -> +notify_subscribers(CompSpec, Available, Services, DataLinkModule) -> ?debug("notify_subscribers(~p:~p) ~p", [ DataLinkModule, Services, Available]), @@ -415,16 +422,16 @@ notify_subscribers(CompSpec, Available, Services, DataLinkModule) -> ets:foldl( %% Notify if this is not the originating service. - fun(#subscriber_entry { module = Module }, Acc) -> + fun(#subscriber_entry { module = Module }, _Acc) -> ?debug(" notify_subscribers module: ~p ", [ Module]), ok end, ok, ?SUBSCRIBER_TABLE), %% Initiate with the first module - notify_single_subscriber(CompSpec, - ets:first(?SUBSCRIBER_TABLE), - Fun, - DataLinkModule, + notify_single_subscriber(CompSpec, + ets:first(?SUBSCRIBER_TABLE), + Fun, + DataLinkModule, Services). @@ -438,15 +445,20 @@ initial_notification(CompSpec, SubsMod, Service) -> [] -> %% Yanked ok; - [#service_entry { data_link_mod = DataLinkMod }] -> + [#service_entry { data_link_mod = DataLinkMod }] -> SubsMod:service_available(CompSpec, Service, DataLinkMod) end, %% Move on to the next service initial_notification(CompSpec, SubsMod, ets:next(?SERVICE_TABLE, Service)). - - + + initial_notification(CompSpec, SubsMod) -> initial_notification(CompSpec, SubsMod, ets:first(?SERVICE_TABLE)), ok. + +log([ID], Fmt, Args) -> + rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args)); +log(_, _, _) -> + ok. diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index 2cf9c3a..b3ed684 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -30,6 +30,10 @@ -export([start_json_server/0, start_websocket/0]). +%% exo_socket authentication callbacks +-export([authenticate/3, + incoming/2]). + %% Invoked by service discovery %% FIXME: Should be rvi_service_discovery behavior -export([service_available/3, @@ -84,10 +88,22 @@ init([]) -> start_json_server() -> + Allowed = get_allowed(), ?debug("service_edge_rpc:start_json_server()"), + Opts = [{exo_socket, + [{auth, + [{role, server}, + {server, + [{mod, ?MODULE}, + {allowed, Allowed} + ]} + ]} + ]} + ], case rvi_common:start_json_rpc_server(service_edge, ?MODULE, - service_edge_sup) of + service_edge_sup, + Opts) of ok -> ok; @@ -97,6 +113,38 @@ start_json_server() -> Err end. +get_allowed() -> + Allowed = case rvi_common:get_module_config( + service_edge, service_edge_rpc, allowed) of + {ok, L} -> L; + {error, _} -> [{127,0,0,1}] + end, + ?debug("get_allowed(); Allowed = ~p", [Allowed]), + lists:flatmap( + fun(Addr) -> + case inet:ip(Addr) of + {ok, IP} -> [IP]; + {error, _} -> [] + end + end, Allowed). + +authenticate(X, Role, Opts) -> + {ok, {PeerIP,_}} = exo_socket:peername(X), + ?debug("authenticate(~p, ~p, ~p)~nPeer = ~p~n", [X, Role, Opts, PeerIP]), + case lists:keyfind(allowed, 1, Opts) of + {_, Allowed} -> + case lists:member(PeerIP, Allowed) of + true -> + {ok, Opts}; + false -> + error + end; + false -> + {ok, Opts} + end. + +incoming(Data, _) -> + Data. start_websocket() -> %% @@ -356,6 +404,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) -> ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]), FullSvcName = rvi_common:local_service_to_string(SvcName), + CS = start_log(<<"svc">>, "reg local service: ~s", [FullSvcName], St#st.cs), ?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]), ets:insert(?SERVICE_TABLE, #service_entry { @@ -364,7 +413,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) -> %% Register with service discovery, will trigger callback to service_available() %% that forwards the registration to other connected services. - service_discovery_rpc:register_services(St#st.cs, [FullSvcName], local), + service_discovery_rpc:register_services(CS, [FullSvcName], local), %% Return ok. @@ -378,7 +427,8 @@ handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) -> %% Register with service discovery, will trigger callback to service_available() %% that forwards the registration to other connected services. - service_discovery_rpc:unregister_services(St#st.cs, [SvcName], local), + CS = start_log(<<"svc">>, "unreg local service: ~s", [SvcName], St#st.cs), + service_discovery_rpc:unregister_services(CS, [SvcName], local), %% Return ok. { reply, [ ok ], St }; @@ -401,18 +451,20 @@ handle_call({ rvi, handle_local_message, ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), + CS = start_log(<<"recv">>, "local_message: ~s", [SvcName], St#st.cs), %% %% Authorize local message and retrieve a certificate / signature %% that will be accepted by the receiving node that will deliver %% the messaage to its locally connected service_name service. %% + ?debug("CS = ~p", [lager:pr(CS, rvi_common)]), [ok, Signature ] = authorize_rpc:authorize_local_message( - St#st.cs, SvcName, [{service_name, SvcName}, - {timeout, TimeoutArg}, - %% {parameters, Parameters}, - {parameters, Parameters} - ]), + CS, SvcName, [{service_name, SvcName}, + {timeout, TimeoutArg}, + %% {parameters, Parameters}, + {parameters, Parameters} + ]), %% %% Slick but ugly. @@ -456,16 +508,18 @@ handle_call({ rvi, handle_local_message, case LookupRes of [ #service_entry { url = URL } ] -> %% SvcName is local. Forward message ?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."), + log("dispatch to ~s", [URL], CS), Res = forward_message_to_local_service(URL, SvcName, Parameters, - St#st.cs), + CS), { reply, Res , St}; _ -> %% SvcName is remote %% Ask Schedule the request to resolve the network address ?debug("service_edge_rpc:local_msg(): Service is remote. Scheduling."), - [ _, TID ] = schedule_rpc:schedule_message(St#st.cs, + log("schedule message (~s)", [SvcName], CS), + [ _, TID ] = schedule_rpc:schedule_message(CS, SvcName, Timeout, Parameters, @@ -481,13 +535,15 @@ handle_call(Other, _From, St) -> handle_cast({rvi, service_available, [SvcName, _DataLinkModule] }, St) -> - ?debug("service_edge_rpc: Service available: ~p:", [ SvcName]), + ?debug("service_edge_rpc: Service available: ~p", [ SvcName]), + start_log(<<"svc">>, "service_available: ~s", [SvcName], St#st.cs), announce_service_availability(available, SvcName), { noreply, St }; handle_cast({rvi, service_unavailable, [SvcName, _DataLinkModule] }, St) -> ?debug("service_edge_rpc: Service unavailable: ~p:", [ SvcName]), + start_log(<<"rsvc">>, "service_unavailable: ~s", [SvcName], St#st.cs), announce_service_availability(unavailable, SvcName), { noreply, St }; @@ -627,7 +683,7 @@ dispatch_to_local_service(URL, Command, Args) -> %% Forward a message to a specific locally connected service. %% Called by forward_message_to_local_service/2. %% -forward_message_to_local_service(URL,SvcName, Parameters, _CompSpec) -> +forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) -> ?debug("service_edge:forward_to_local(): URL: ~p", [URL]), ?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]), @@ -644,14 +700,22 @@ forward_message_to_local_service(URL,SvcName, Parameters, _CompSpec) -> %% Deliver the message to the local service, which can %% be either a wse websocket, or a regular HTTP JSON-RPC call spawn(fun() -> - rvi_common:get_request_result( - dispatch_to_local_service(URL, - message, - [{<<"service_name">>, LocalSvcName }, - {<<"parameters">>, Parameters }])) + log_outcome( + rvi_common:get_request_result( + dispatch_to_local_service(URL, + message, + [{<<"service_name">>, LocalSvcName }, + {<<"parameters">>, Parameters }])), + SvcName, CompSpec) end), [ ok, -1 ]. +log_outcome({Status, _}, _SvcName, CS) -> + log("result: ~w", [Status], CS); +log_outcome(Other, _SvcName, CS) -> + log("unexpected: ~w", [Other], CS). + + announce_service_availability(Available, SvcName) -> Cmd = case Available of @@ -697,3 +761,12 @@ announce_service_availability(Available, SvcName) -> Acc end end, BlockURLs, ?SERVICE_TABLE). + + +start_log(Pfx, Fmt, Args, CS) -> + LogTID = rvi_log:new_id(Pfx), + rvi_log:log(LogTID, <<"svc_edge">>, rvi_log:format(Fmt, Args)), + rvi_common:set_value(rvi_log_id, LogTID, CS). + +log(Fmt, Args, CS) -> + rvi_log:flog(Fmt, Args, <<"svc_edge">>, CS). |