From 6cfeffca9f8e93e45dd885702a77896e2a1d0951 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Sun, 6 Dec 2015 13:54:17 -0800 Subject: new protocol & setup scripts --- components/authorize/src/authorize_keys.erl | 343 +++++++++++++-------- components/authorize/src/authorize_rpc.erl | 146 ++++----- components/dlink/src/dlink.app.src | 6 +- components/dlink/src/dlink_data_json.erl | 60 ++-- components/dlink/src/dlink_data_msgpack.erl | 15 +- components/dlink/src/dlink_data_rvi.erl | 123 -------- components/dlink_bt/src/bt_connection.erl | 144 ++++++--- components/dlink_bt/src/bt_connection_manager.erl | 1 + components/dlink_bt/src/bt_listener.erl | 64 ++-- components/dlink_bt/src/dlink_bt.app.src | 2 +- components/dlink_bt/src/dlink_bt_rpc.erl | 296 ++++++++---------- components/dlink_sms/src/dlink_sms.app.src | 6 +- components/dlink_tcp/src/connection.erl | 110 ++++--- components/dlink_tcp/src/dlink_tcp.app.src | 5 +- components/dlink_tcp/src/dlink_tcp_rpc.erl | 207 ++++--------- components/dlink_tcp/src/listener.erl | 4 +- components/dlink_tls/src/dlink_tls.app.src | 3 +- components/dlink_tls/src/dlink_tls_conn.erl | 159 ++++++---- components/dlink_tls/src/dlink_tls_rpc.erl | 167 +++++----- components/rvi_common/include/rvi_dlink_bin.hrl | 40 +-- components/rvi_common/src/exoport_exo_http.erl | 25 +- components/rvi_common/src/rvi_common.erl | 21 +- components/rvi_common/src/rvi_log.erl | 201 +++++++++++- .../src/service_discovery_rpc.erl | 2 +- components/service_edge/src/service_edge_rpc.erl | 55 +++- 25 files changed, 1198 insertions(+), 1007 deletions(-) delete mode 100644 components/dlink/src/dlink_data_rvi.erl (limited to 'components') diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index 70bb021..a362c7b 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -2,24 +2,25 @@ -behaviour(gen_server). -export([get_key_pair/0, + get_device_key/0, get_key_pair_from_pem/2, get_pub_key/1, - authorize_jwt/0, provisioning_key/0, signed_public_key/2, save_keys/2, - save_cert/4]). --export([get_certificates/0, - get_certificates/1]). + save_cred/5]). +-export([get_credentials/0, + get_credentials/1]). -export([validate_message/2, validate_service_call/2]). -export([filter_by_service/2, - find_cert_by_service/1]). + find_cred_by_service/1]). -export([public_key_to_json/1, json_to_public_key/1]). -export([self_signed_public_key/0]). % just temporary -export([pp_key/1, + abbrev/1, abbrev_bin/1, abbrev_payload/1, abbrev_jwt/1]). @@ -36,20 +37,28 @@ -include_lib("public_key/include/public_key.hrl"). -record(st, {provisioning_key, - cert_dir, - authorize_jwt}). - --record(cert, {id, - register = [], - invoke = [], + dev_cert, + cred_dir}). + +%% -record(cert, {id, +%% register = [], +%% invoke = [], +%% validity = [], +%% jwt, +%% cert}). + +-record(cred, {id, + right_to_register = [], + right_to_invoke = [], validity = [], + device_cert, jwt, - cert}). + cred}). -record(key, {id, key}). --define(CERTS, authorize_certs). +-define(CREDS, authorize_creds). -define(KEYS, authorize_keys). public_key_to_json(#'RSAPublicKey'{modulus = N, publicExponent = E}) -> @@ -101,8 +110,15 @@ get_key_pair() -> get_key_pair_from_pem(openssl, Pem) end. -authorize_jwt() -> - gen_server:call(?MODULE, authorize_jwt). +get_device_key() -> + case get_env(device_key) of + undefined -> + {undefined, undefined}; + [_|_] = File -> + get_key_pair_from_pem(openssl, File); + {openssl_pem, Pem} -> + get_key_pair_from_pem(openssl, Pem) + end. validate_message(JWT, Conn) -> gen_server:call(?MODULE, {validate_message, JWT, Conn}). @@ -110,17 +126,17 @@ validate_message(JWT, Conn) -> validate_service_call(Service, Conn) -> gen_server:call(?MODULE, {validate_service_call, Service, Conn}). -get_certificates() -> - get_certificates(local). +get_credentials() -> + get_credentials(local). -get_certificates(Conn) -> - gen_server:call(?MODULE, {get_certificates, Conn}). +get_credentials(Conn) -> + gen_server:call(?MODULE, {get_credentials, Conn}). filter_by_service(Services, Conn) -> gen_server:call(?MODULE, {filter_by_service, Services, Conn}). -find_cert_by_service(Service) -> - gen_server:call(?MODULE, {find_cert_by_service, Service}). +find_cred_by_service(Service) -> + gen_server:call(?MODULE, {find_cred_by_service, Service}). provisioning_key() -> gen_server:call(?MODULE, provisioning_key). @@ -128,8 +144,8 @@ provisioning_key() -> save_keys(Keys, Conn) -> gen_server:call(?MODULE, {save_keys, Keys, Conn}). -save_cert(Cert, JWT, Conn, LogId) -> - gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn, LogId}). +save_cred(Cred, JWT, Conn, PeerCert, LogId) -> + gen_server:call(?MODULE, {save_cred, Cred, JWT, Conn, PeerCert, LogId}). %% Gen_server functions @@ -137,7 +153,7 @@ start_link() -> create_ets(), case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of {ok, Pid} = Ok -> - ets:give_away(?CERTS, Pid, undefined), + ets:give_away(?CREDS, Pid, undefined), ets:give_away(?KEYS, Pid, undefined), Ok; Other -> @@ -145,18 +161,19 @@ start_link() -> end. init([]) -> - ProvisioningKey = get_pub_key(get_env(provisioning_key)), + ProvisioningKey = get_pub_key_from_cert(get_env(root_cert)), ?debug("ProvisioningKey = ~s~n", [pp_key(ProvisioningKey)]), - CertDir = setup:verify_dir(get_env(cert_dir)), - {ok, AuthJwt0} = file:read_file(get_env(authorize_jwt)), - AuthJwt = strip_nl(AuthJwt0), - ?debug("CertDir = ~p~n", [CertDir]), - Certs = scan_certs(CertDir, ProvisioningKey), - ?debug("scan_certs found ~p certificates~n", [length(Certs)]), - [ets:insert(?CERTS, {{local, C#cert.id}, C}) || C <- Certs], + {ok, DevCertBin} = file:read_file(get_env(device_cert)), + DevCert = strip_pem_begin_end(DevCertBin), + ?debug("Own DevCert = ~w", [abbrev_bin(DevCert)]), + CredDir = setup:verify_dir(get_env(cred_dir)), + ?debug("CredDir = ~p~n", [CredDir]), + Creds = scan_creds(CredDir, ProvisioningKey, DevCert), + ?debug("scan_creds found ~p credentials~n", [length(Creds)]), + [ets:insert(?CREDS, {{local, C#cred.id}, C}) || C <- Creds], {ok, #st{provisioning_key = ProvisioningKey, - cert_dir = CertDir, - authorize_jwt = AuthJwt}}. + dev_cert = DevCert, + cred_dir = CredDir}}. handle_call(Req, From, S) -> try handle_call_(Req, From, S) @@ -167,13 +184,11 @@ handle_call(Req, From, S) -> {reply, error, S} end. -handle_call_(authorize_jwt, _, S) -> - {reply, S#st.authorize_jwt, S}; handle_call_(provisioning_key, _, S) -> {reply, S#st.provisioning_key, S}; -handle_call_({get_certificates, Conn}, _, S) -> - Certs = certs_by_conn(Conn), - {reply, Certs, S}; +handle_call_({get_credentials, Conn}, _, S) -> + Creds = creds_by_conn(Conn), + {reply, Creds, S}; handle_call_({save_keys, Keys, Conn}, _, S) -> ?debug("save_keys: Keys=~p, Conn=~p~n", [abbrev_k(Keys), Conn]), save_keys_(Keys, Conn), @@ -182,14 +197,14 @@ handle_call_({validate_message, JWT, Conn}, _, S) -> {reply, validate_message_(JWT, Conn), S}; handle_call_({validate_service_call, Svc, Conn}, _, S) -> {reply, validate_service_call_(Svc, Conn), S}; -handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) -> - case process_cert_struct(Cert, JWT) of +handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn, PeerCert, LogId}, _, S) -> + case process_cred_struct(Cred, JWT, PeerCert) of invalid -> - log(LogId, "cert INVALID Conn=~s:~w", [IP, Port]), + log(LogId, warning, "cred INVALID Conn=~s:~w", [IP, Port]), {reply, {error, invalid}, S}; - #cert{} = C -> - ets:insert(?CERTS, {{Conn, C#cert.id}, C}), - log(LogId, "cert stored ~s Conn=~s:~w", [abbrev_bin(C#cert.id), IP, Port]), + #cred{} = C -> + ets:insert(?CREDS, {{Conn, C#cred.id}, C}), + log(LogId, result, "cred stored ~s Conn=~s:~w", [abbrev_bin(C#cred.id), IP, Port]), {reply, ok, S} end; handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> @@ -197,9 +212,9 @@ handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> Filtered = filter_by_service_(Services, Conn), ?debug("Filtered = ~p~n", [Filtered]), {reply, Filtered, State}; -handle_call_({find_cert_by_service, Service} = R, _From, State) -> +handle_call_({find_cred_by_service, Service} = R, _From, State) -> ?debug("authorize_keys:handle_call(~p,...)~n", [R]), - Res = find_cert_by_service_(Service), + Res = find_cred_by_service_(Service), ?debug("Res = ~p~n", [case Res of {ok,{A,B}} -> {ok,{A,abbrev_bin(B)}}; _ -> Res end]), {reply, Res, State}; handle_call_(_, _, S) -> @@ -219,28 +234,28 @@ code_change(_FromVsn, S, _Extra) -> %% Local functions -certs_by_conn(Conn) -> - ?debug("certs_by_conn(~p)~n", [Conn]), +creds_by_conn(Conn) -> + ?debug("creds_by_conn(~p)~n", [Conn]), UTC = rvi_common:utc_timestamp(), - Certs = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{jwt = '$1', + Creds = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{jwt = '$1', validity = '$2', _='_'}}, [], [{{'$1', '$2'}}] }]), - ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]), - [C || {C,V} <- Certs, check_validity(V, UTC)]. + ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Creds]]), + [C || {C,V} <- Creds, check_validity(V, UTC)]. -cert_recs_by_conn(Conn) -> - ?debug("cert_recs_by_conn(~p)~n", [Conn]), +cred_recs_by_conn(Conn) -> + ?debug("cred_recs_by_conn(~p)~n", [Conn]), UTC = rvi_common:utc_timestamp(), - Certs = ets:select(?CERTS, [{ {{Conn,'_'}, '$1'}, + Creds = ets:select(?CREDS, [{ {{Conn,'_'}, '$1'}, [], ['$1'] }]), - ?debug("rough selection: ~p~n", [[abbrev_bin(C#cert.id) || C <- Certs]]), - [C || C <- Certs, check_validity(C#cert.validity, UTC)]. + ?debug("rough selection: ~p~n", [[abbrev_bin(C#cred.id) || C <- Creds]]), + [C || C <- Creds, check_validity(C#cred.validity, UTC)]. filter_by_service_(Services, Conn) -> - ?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]), - Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1', - _ = '_'}}, + ?debug("Filter: creds = ~p", [[{K,abbrev_payload(V)} || {K,V} <- ets:tab2list(?CREDS)]]), + Invoke = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{right_to_invoke = '$1', + _ = '_'}}, [], ['$1'] }]), ?debug("Services by conn (~p) -> ~p~n", [Conn, Invoke]), filter_svcs_(Services, Invoke). @@ -260,25 +275,25 @@ filter_svcs_([S|Svcs], Invoke) -> filter_svcs_([], _) -> []. -find_cert_by_service_(Service) -> +find_cred_by_service_(Service) -> SvcParts = split_path(strip_prot(Service)), - LocalCerts = ets:select(?CERTS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]), - ?debug("find_cert_by_service(~p~nLocalCerts = ~p~n", - [Service, [{Id,Reg,Inv} || #cert{id = Id, - invoke = Inv, - register = Reg} <- LocalCerts]]), + LocalCreds = ets:select(?CREDS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]), + ?debug("find_creds_by_service(~p~nLocalCreds = ~p~n", + [Service, [{Id,Reg,Inv} || #cred{id = Id, + right_to_invoke = Inv, + right_to_register = Reg} <- LocalCreds]]), case lists:foldl( - fun(#cert{register = Register} = C, {Max, _} = Acc) -> + fun(#cred{right_to_register = Register} = C, {Max, _} = Acc) -> case match_length(Register, SvcParts) of L when L > Max -> {L, C}; _ -> Acc end - end, {0, none}, LocalCerts) of + end, {0, none}, LocalCreds) of {0, none} -> {error, not_found}; - {_, #cert{id = Id, jwt = JWT}} -> + {_, #cred{id = Id, jwt = JWT}} -> {ok, {Id, JWT}} end. @@ -329,7 +344,7 @@ match_svc_(_, _) -> false. get_env(K) -> - Res = case application:get_env(rvi_core, K) of + Res = case setup:get_env(rvi_core, K) of {ok, V} -> V; _ -> undefined end, @@ -382,8 +397,32 @@ get_openssl_pub_key(File) -> undefined end. +get_pub_key_from_cert(File) -> + case file:read_file(File) of + {ok, Bin} -> + get_pub_key_from_cert_rec( + public_key:pem_entry_decode(hd(public_key:pem_decode(Bin)))); + Error -> + error({cannot_read_cert, [Error, {file, File}]}) + end. + +get_pub_key_from_cert_rec(#'Certificate'{ + tbsCertificate = + #'TBSCertificate'{ + subjectPublicKeyInfo = + #'SubjectPublicKeyInfo'{ + algorithm = + #'AlgorithmIdentifier'{ + algorithm = Algo}, + subjectPublicKey = Key}}}) -> + 'RSAPublicKey' = KeyAlg = pubkey_cert_records:supportedPublicKeyAlgorithms(Algo), + public_key:der_decode(KeyAlg, Key). + + + + create_ets() -> - create_ets(?CERTS, #cert.id), + create_ets(?CREDS, #cred.id), create_ets(?KEYS, #key.id). create_ets(Tab, KeyPos) -> @@ -396,42 +435,46 @@ create_ets(Tab, KeyPos) -> true end. -scan_certs(Dir, Key) -> - UTC = rvi_common:utc_timestamp(), - case file:list_dir(Dir) of - {ok, Fs} -> - lists:foldl( +scan_creds(Dir, Key, Cert) -> + case filelib:is_dir(Dir) of + true -> + UTC = rvi_common:utc_timestamp(), + filelib:fold_files( + Dir, + _FileRegexp = "\\.jwt$", + _Recursive = false, fun(F, Acc) -> - process_cert(filename:join(Dir, F), Key, UTC, Acc) - end, [], Fs); - Error -> - ?warning("Cannot read certs (~p): ~p~n", [Dir, Error]), - ok + process_cred(F, Key, Cert, UTC, Acc) + end, + []); + false -> + ?warning("Cannot read creads: ~p not a directory", [Dir]), + [] end. -process_cert(F, Key, UTC, Acc) -> +process_cred(F, Key, Cert, UTC, Acc) -> case file:read_file(F) of {ok, Bin} -> try authorize_sig:decode_jwt(strip_nl(Bin), Key) of - {_, Cert} -> - ?info("Unpacked Cert ~p:~n~p~n", [F, abbrev_payload(Cert)]), - case process_cert_struct(Cert, Bin, UTC) of + {_, Cred} -> + ?info("Unpacked Cred ~p:~n~p~n", [F, abbrev_payload(Cred)]), + case process_cred_struct(Cred, Bin, UTC, Cert) of invalid -> Acc; - #cert{} = C -> + #cred{} = C -> [C|Acc] end; invalid -> - ?warning("Invalid cert: ~p~n", [F]), + ?warning("Invalid cred: ~p~n", [F]), Acc catch error:Exception -> - ?warning("Cert validation failure (~p): ~p~n", + ?warning("Cred validation failure (~p): ~p~n", [F, Exception]), Acc end; Error -> - ?warning("Cannot read cert ~p: ~p~n", [F, Error]), + ?warning("Cannot read cred ~p: ~p~n", [F, Error]), Acc end. @@ -441,52 +484,76 @@ strip_nl(Bin) -> _ -> Bin end. -process_cert_struct(Cert, Bin) -> - process_cert_struct(Cert, Bin, rvi_common:utc_timestamp()). +strip_pem_begin_end(<<"-----BEGIN CERTIFICATE-----\n", Bin/binary>>) -> + re:replace(hd(re:split(Bin, <<"-----END CERTIFICATE-----">>, [])), + <<"\\v">>, <<>>, [global,{return,binary}]); +strip_pem_begin_end(Bin) -> + Bin. + + +process_cred_struct(Cred, Bin, Cert) -> + process_cred_struct(Cred, Bin, rvi_common:utc_timestamp(), to_pem(Cert)). + +to_pem(Cert) when is_binary(Cert) -> + %% The Peer cert is assumed to be DER encoded. + %% The cert in the cred is PEM-encoded + PEMCert = public_key:pem_encode([{'Certificate', Cert, not_encrypted}]), + strip_pem_begin_end(PEMCert); +to_pem(Other) -> + Other. + -process_cert_struct(Cert, Bin, UTC) -> - try process_cert_struct_(Cert, Bin, UTC) +process_cred_struct(Cred, Bin, UTC, Cert) -> + try process_cred_struct_(Cred, Bin, UTC, Cert) catch error:Err -> - ?warning("Failure processing Cert ~p~n~p", - [Cert, {Err, erlang:get_stacktrace()}]), + ?warning("Failure processing Cred ~p~n~p", + [Cred, {Err, erlang:get_stacktrace()}]), invalid end. -process_cert_struct_(Cert, Bin, UTC) -> - ID = cert_id(Cert), +process_cred_struct_(Cred, Bin, UTC, DevCert) -> + ID = cred_id(Cred), {ok, Register} = rvi_common:get_json_element( - [{'OR', ["sources", "register"]}], Cert), + [{'OR', ["right_to_register", "sources", "register"]}], Cred), {ok, Invoke} = rvi_common:get_json_element( - [{'OR', ["destinations", "invoke"]}], Cert), + [{'OR', ["right_to_invoke", "destinations", "invoke"]}], Cred), {ok, Start} = rvi_common:get_json_element( - ["validity", "start"], Cert), + ["validity", "start"], Cred), {ok, Stop} = rvi_common:get_json_element( - ["validity", "stop"], Cert), + ["validity", "stop"], Cred), + {ok, Cert} = rvi_common:get_json_element(["device_cert"], Cred), + case DevCert == undefined orelse Cert == DevCert of + false -> + ?warning("Wrong device_cert in cred~n~p~n~p", [Cert, DevCert]), + invalid; + true -> + ok + end, ?debug("Start = ~p; Stop = ~p~n", [Start, Stop]), Validity = {Start, Stop}, case check_validity(Start, Stop, UTC) of true -> - #cert{id = ID, - register = Register, - invoke = Invoke, + #cred{id = ID, + right_to_register = Register, + right_to_invoke = Invoke, validity = Validity, jwt = Bin, - cert = Cert}; + cred = Cred}; false -> - %% Cert outdated - ?warning("Outdated cert: Validity = ~p; UTC = ~p~n", + %% Cred outdated + ?warning("Outdated cred: Validity = ~p; UTC = ~p~n", [Validity, UTC]), invalid end. -cert_id(Cert) -> - case rvi_common:get_json_element(["id"], Cert) of +cred_id(Cred) -> + case rvi_common:get_json_element(["id"], Cred) of {ok, Id} -> Id; {error, undefined} -> - ?warning("Cert has no ID: ~p~n", [Cert]), - erlang:now() + ?warning("Cred has no ID: ~p~n", [Cred]), + erlang:unique_integer([positive,monotonic]) end. check_validity({Start, Stop}, UTC) -> @@ -539,14 +606,14 @@ validate_message_1([], _) -> error(invalid). validate_service_call_(Svc, Conn) -> - case lists:filter(fun(C) -> can_invoke(Svc, C) end, cert_recs_by_conn(Conn)) of + case lists:filter(fun(C) -> can_invoke(Svc, C) end, cred_recs_by_conn(Conn)) of [] -> invalid; - [#cert{id = ID}|_] -> + [#cred{id = ID}|_] -> {ok, ID} end. -can_invoke(Svc, #cert{invoke = In}) -> +can_invoke(Svc, #cred{right_to_invoke = In}) -> lists:any(fun(I) -> match_svc(I, Svc) end, In). pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) -> @@ -560,6 +627,11 @@ pp_key(#'RSAPublicKey'{modulus = Mod, publicExponent = Pub}) -> <<"#{'RSAPublicKey'{modulus = ", (abbrev_bin(M))/binary, ", publicExponent = ", P/binary, ", _ = ...}">>. +abbrev(B) when is_binary(B) -> + abbrev_bin(B); +abbrev(X) -> + abbrev_payload(X). + abbrev_bin(B) -> abbrev_bin(B, 20, 6). @@ -574,29 +646,58 @@ abbrev_bin(B, Len, Part) -> end catch error:_ -> B end. -abbrev_payload(Payload) -> +abbrev_payload(Payload) when is_list(Payload) -> try lists:map(fun abbrev_pl/1, Payload) - catch error:_ -> Payload end. + catch error:_ -> Payload end; +abbrev_payload(PL) -> + try abbrev_pl(PL) + catch error:_ -> PL end. abbrev_jwt({Hdr, Body} = X) -> try {Hdr, abbrev_payload(Body)} catch error:_ -> X end. +abbrev_pl(#cred{} = Payload) -> + list_to_tuple(lists:map(fun(B) when is_binary(B) -> abbrev_bin(B); + ([{_,_}|_]=L) -> abbrev_payload(L); + (A) -> A + end, tuple_to_list(Payload))); +abbrev_pl(#'RSAPublicKey'{} = Key) -> + pp_key(Key); +abbrev_pl(#'RSAPrivateKey'{} = Key) -> + pp_key(Key); +abbrev_pl(#'OTPCertificate'{} = Cert) -> + abbrev_deep_tuple(Cert); +abbrev_pl(#'OTPTBSCertificate'{} = Cert) -> + abbrev_deep_tuple(Cert); abbrev_pl({<<"keys">> = K, Ks}) -> {K, [abbrev_k(Ky) || Ky <- Ks]}; -abbrev_pl({<<"certs">> = C, Cs}) -> - {C, [abbrev_bin(Cert) || Cert <- Cs]}; +abbrev_pl({<<"creds">> = C, Cs}) -> + {C, [abbrev_bin(Cred) || Cred <- Cs]}; abbrev_pl({<<"certificate">> = K, C}) -> {K, abbrev_bin(C)}; +abbrev_pl({<<"device_cert">> = K, C}) -> + {K, abbrev_bin(C)}; +abbrev_pl({<<"credential">> = K, C}) -> + {K, abbrev_bin(C)}; abbrev_pl({<<"sign">> = K, S}) when is_binary(S) -> {K, abbrev_bin(S)}; +abbrev_pl({K, B}) when is_atom(K), is_binary(B), byte_size(B) > 50 -> + {K, abbrev_bin(B)}; +abbrev_pl({K,[{_,_}|_]=L}) -> + {K, abbrev_pl(L)}; abbrev_pl(B) when is_binary(B) -> abbrev_bin(B, 40, 10); abbrev_pl(L) when is_list(L) -> abbrev_payload(L); +abbrev_pl(T) when is_tuple(T), tuple_size(T) > 2 -> + abbrev_deep_tuple(T); abbrev_pl(X) -> X. +abbrev_deep_tuple(T) when is_tuple(T) -> + list_to_tuple([abbrev_pl(E) || E <- tuple_to_list(T)]). + abbrev_k(K) -> try lists:map(fun abbrev_elem/1, K) catch error:_ -> K end. @@ -606,7 +707,7 @@ abbrev_elem({<<"n">>, Bin}) -> abbrev_elem(X) -> X. -log([ID], Fmt, Args) -> - rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args)); -log(_, _, _) -> +log([ID], L, Fmt, Args) -> + rvi_log:log(ID, L, <<"authorize">>, rvi_log:format(Fmt, Args)); +log(_, _, _, _) -> ok. diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index c91b216..2291d21 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -18,19 +18,18 @@ terminate/2, code_change/3]). -export([start_json_server/0]). --export([get_authorize_jwt/1, - get_certificates/1, +-export([get_credentials/1, sign_message/2, validate_message/3, validate_authorization/3, validate_authorization/4, - store_certs/3, + store_creds/3, + store_creds/4, authorize_local_message/3, authorize_remote_message/3]). -export([filter_by_service/3]). %% for testing & development --export([sign/1]). -export([public_key/0, public_key_json/0, private_key/0]). @@ -51,7 +50,7 @@ start_link() -> init([]) -> ?debug("authorize_rpc:init(): called."), - {Priv, Pub} = authorize_keys:get_key_pair(), + {Priv, Pub} = authorize_keys:get_device_key(), ?debug("KeyPair = {~s, ~s}~n", [authorize_keys:pp_key(Priv), authorize_keys:pp_key(Pub)]), {ok, #st { cs = rvi_common:get_component_specification(), @@ -74,15 +73,10 @@ validate_message(CompSpec, JWT, Conn) -> [{jwt, JWT}, {conn, Conn}], [status, message], CompSpec). -get_authorize_jwt(CompSpec) -> - ?debug("authorize_rpc:get_authorize_jwt()~n", []), - rvi_common:request(authorize, ?MODULE, get_authorize_jwt, - [], [status, jwt], CompSpec). - -get_certificates(CompSpec) -> - ?debug("authorize_rpc:get_certificates()~n", []), - rvi_common:request(authorize, ?MODULE, get_certificates, - [], [status, certs], CompSpec). +get_credentials(CompSpec) -> + ?debug("authorize_rpc:get_credentials()~n", []), + rvi_common:request(authorize, ?MODULE, get_credentials, + [], [status, creds], CompSpec). validate_authorization(CompSpec, JWT, Conn) -> ?debug("authorize_rpc:validate_authorization():" @@ -92,19 +86,24 @@ validate_authorization(CompSpec, JWT, Conn) -> {conn, Conn}], [status], CompSpec). -validate_authorization(CompSpec, JWT, Certs, Conn) -> +validate_authorization(CompSpec, JWT, Creds, Conn) -> ?debug("authorize_rpc:validate_authorization():" " Conn = ~p~n", [Conn]), rvi_common:request(authorize, ?MODULE, validate_authorization, [{jwt, JWT}, - {certs, Certs}, + {creds, Creds}, {conn, Conn}], [status], CompSpec). -store_certs(CompSpec, Certs, Conn) -> - rvi_common:request(authorize, ?MODULE, store_certs, - [{certs, Certs}, - {conn, Conn}], +store_creds(CompSpec, Creds, Conn) -> + store_creds(CompSpec, Creds, Conn, undefined). + +store_creds(CompSpec, Creds, Conn, PeerCert) -> + ?debug("API: store_creds(), PeerCert = ~p", [authorize_keys:abbrev(PeerCert)]), + rvi_common:request(authorize, ?MODULE, store_creds, + [{creds, Creds}, + {conn, Conn}, + {peer_cert, PeerCert}], [status], CompSpec). authorize_local_message(CompSpec, Service, Params) -> @@ -130,11 +129,6 @@ filter_by_service(CompSpec, Services, Conn) -> { conn, Conn }], [status, services], CompSpec). -%% For testing while developing cert functionality -sign(Term) -> - %% Use private key of authorize_rpc to make a JWT token - gen_server:call(?SERVER, {sign, Term}). - public_key() -> gen_server:call(?SERVER, public_key). @@ -163,34 +157,30 @@ handle_rpc("validate_message", Args) -> gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn, LogId] }), {ok, [ {status, rvi_common:json_rpc_status(Status)}, {message, Msg} ]}; -handle_rpc("get_authorize_jwt", Args) -> +handle_rpc("get_credentials", Args) -> LogId = rvi_common:get_json_log_id(Args), [ Status | Rem ] = - gen_server:call(?SERVER, { rvi, get_authorize_jwt, [LogId] }), - {ok, [ rvi_common:json_rpc_status(Status) | Rem ] }; -handle_rpc("get_certificates", Args) -> - LogId = rvi_common:get_json_log_id(Args), - [ Status | Rem ] = - gen_server:call(?SERVER, { rvi, get_certificates, [LogId] }), + gen_server:call(?SERVER, { rvi, get_credentials, [LogId] }), {ok, [ rvi_common:json_rpc_status(Status) | Rem ] }; handle_rpc("validate_authorization", Args) -> {ok, JWT} = rvi_common:get_json_element(["jwt"], Args), {ok, Conn} = rvi_common:get_json_element(["connection"], Args), LogId = rvi_common:get_json_log_id(Args), CmdArgs = - case rvi_common:get_json_element(["certs"], Args) of - {ok, Certs} -> [JWT, Certs, Conn, LogId]; + case rvi_common:get_json_element(["creds"], Args) of + {ok, Creds} -> [JWT, Creds, Conn, LogId]; {error, _} -> [JWT, Conn, LogId] end, [ Status | Rem ] = gen_server:call(?SERVER, {rvi, validate_authorization, CmdArgs}), {ok, [ rvi_common:json_rpc_status(Status) | Rem] }; -handle_rpc("store_certs", Args) -> - {ok, Certs} = rvi_common:get_json_element(["certs"], Args), +handle_rpc("store_creds", Args) -> + {ok, Creds} = rvi_common:get_json_element(["creds"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), + {ok, PeerCert} = rvi_common:get_json_element(["peer_cert"], Args), LogId = rvi_common:get_json_log_id(Args), [ Status | Rem ] = - gen_server:call(?SERVER, {rvi, store_certs, [Certs, Conn, LogId]}), + gen_server:call(?SERVER, {rvi, store_creds, [Creds, Conn, PeerCert, LogId]}), {ok, [ rvi_common:json_rpc_status(Status) | Rem]}; handle_rpc("authorize_local_message", Args) -> {ok, Service} = rvi_common:get_json_element(["service"], Args), @@ -236,38 +226,35 @@ handle_notification(Other, _Args) -> %% handle_call({rvi, sign_message, [Msg | LogId]}, _, #st{private_key = Key} = State) -> Sign = authorize_sig:encode_jwt(Msg, Key), - log(LogId, "signed", []), + log(LogId, result, "signed", []), {reply, [ ok, Sign ], State}; handle_call({rvi, validate_message, [JWT, Conn | LogId]}, _, State) -> try begin Res = authorize_keys:validate_message(JWT, Conn), - log(LogId, "validated", []), + log(LogId, result, "validated", []), {reply, [ok, Res], State} end catch error:_Err -> - log(LogId, "validation FAILED", []), + log(LogId, error, "validation FAILED", []), {reply, [not_found], State} end; -handle_call({rvi, get_authorize_jwt, [_LogId]}, _From, State) -> - {reply, [ ok, authorize_keys:authorize_jwt() ], State}; - -handle_call({rvi, get_certificates, [_LogId]}, _From, State) -> - {reply, [ ok, authorize_keys:get_certificates() ], State}; +handle_call({rvi, get_credentials, [_LogId]}, _From, State) -> + {reply, [ ok, authorize_keys:get_credentials() ], State}; handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, State) -> - %% The authorize JWT contains the public key used to sign the cert + %% The authorize JWT contains the public key used to sign the cred ?debug( "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n", []), try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of {_Header, Keys} -> - log(LogId, "auth jwt validated", []), + log(LogId, result, "auth jwt validated", []), KeyStructs = get_json_element(["keys"], Keys, []), authorize_keys:save_keys(KeyStructs, Conn), {reply, [ok], State}; invalid -> ?warning("Invalid auth JWT from ~p~n", [Conn]), - log(LogId, "auth jwt INVALID", []), + log(LogId, error, "auth jwt INVALID", []), {reply, [not_found], State} catch error:_Err -> @@ -275,22 +262,22 @@ handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, Sta {reply, [not_found], State} end; -handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _From, State) -> - %% The authorize JWT contains the public key used to sign the cert +handle_call({rvi, validate_authorization, [JWT, Creds, Conn | [_] = LogId] }, _From, State) -> + %% The authorize JWT contains the public key used to sign the cred ?debug( "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n", []), try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of {_Header, Keys} -> - log(LogId, "auth jwt validated", []), + log(LogId, result, "auth jwt validated", []), KeyStructs = get_json_element(["keys"], Keys, []), ?debug("KeyStructs = ~p~n", [KeyStructs]), authorize_keys:save_keys(KeyStructs, Conn), - do_store_certs(Certs, Conn, LogId), + do_store_creds(Creds, Conn, undefined, LogId), {reply, [ok], State}; invalid -> ?warning("Invalid auth JWT from ~p~n", [Conn]), - log(LogId, "auth jwt INVALID", []), + log(LogId, error, "auth jwt INVALID", []), {reply, [not_found], State} catch error:_Err -> @@ -298,21 +285,17 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _F {reply, [not_found], State} end; -handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) -> - do_store_certs(Certs, Conn, LogId), +handle_call({rvi, store_creds, [Creds, Conn, PeerCert | LogId]}, _From, State) -> + do_store_creds(Creds, Conn, PeerCert, LogId), {reply, [ok], State}; handle_call({rvi, authorize_local_message, [Service, _Params | LogId] } = R, _From, State) -> ?debug("authorize_rpc:handle_call(~p)~n", [R]), - case authorize_keys:find_cert_by_service(Service) of - {ok, {ID, _Cert}} -> - %% Msg = Params ++ [{<<"certificate">>, Cert}], - %% ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", - %% [authorize_keys:abbrev_payload(Msg)]), - %% Sig = authorize_sig:encode_jwt(Msg, Key), - log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]), + case authorize_keys:find_cred_by_service(Service) of + {ok, {ID, _Cred}} -> + log(LogId, result, "auth msg: Cred=~s", [authorize_keys:abbrev_bin(ID)]), {reply, [ok], State}; _ -> - log(LogId, "NO CERTS for ~s", [Service]), + log(LogId, error, "NO CREDS for ~s", [Service]), {reply, [ not_found ], State} end; @@ -330,11 +313,11 @@ handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]}, ?debug("authorize_rpc:authorize_remote_message(): parameters: ~p~n", [Parameters]), case authorize_keys:validate_service_call(SvcName, {IP, Port}) of invalid -> - log(LogId, "remote msg REJECTED", []), + log(LogId, error, "remote msg REJECTED", []), {reply, [ not_found ], State}; - {ok, CertID} -> - ?debug("validated Cert ID=~p", [CertID]), - log(LogId, "remote msg allowed: Cert=~s", [CertID]), + {ok, CredID} -> + ?debug("validated Cred ID=~p", [CredID]), + log(LogId, result, "remote msg allowed: Cred=~s", [CredID]), {reply, [ok], State} end; @@ -370,11 +353,12 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -do_store_certs(Certs, Conn, LogId) -> - ?debug("Storing ~p certs for conn ~p~n", [length(Certs), Conn]), - lists:foreach(fun(Cert) -> - store_cert(Cert, Conn, LogId) - end, Certs). +do_store_creds(Creds, Conn, PeerCert, LogId) -> + ?debug("Storing ~p creds for conn ~p~nPeerCert = ~w", + [length(Creds), Conn, authorize_keys:abbrev(PeerCert)]), + lists:foreach(fun(Cred) -> + store_cred(Cred, Conn, PeerCert, LogId) + end, Creds). get_json_element(Path, JSON, Default) -> case rvi_common:get_json_element(Path, JSON) of @@ -384,26 +368,26 @@ get_json_element(Path, JSON, Default) -> Default end. -store_cert(Cert, Conn, LogId) -> - case authorize_sig:decode_jwt(Cert, authorize_keys:provisioning_key()) of - {_CHeader, CertStruct} -> - case authorize_keys:save_cert(CertStruct, Cert, Conn, LogId) of +store_cred(CredJWT, Conn, PeerCert, LogId) -> + case authorize_sig:decode_jwt(CredJWT, authorize_keys:provisioning_key()) of + {_CHeader, CredStruct} -> + case authorize_keys:save_cred(CredStruct, CredJWT, Conn, PeerCert, LogId) of ok -> ok; {error, Reason} -> ?warning( - "Couldn't store certificate from ~p: ~p~n", + "Couldn't store credential from ~p: ~p~n", [Conn, Reason]), ok end; invalid -> - ?warning("Invalid certificate from ~p~n", [Conn]), + log(LogId, warning, "credential INVALID (~p)", [Conn]), ok end. -log([ID], Fmt, Args) -> - rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args)); -log(_, _, _) -> +log([ID], Lvl, Fmt, Args) -> + rvi_log:log(ID, Lvl, <<"authorize">>, rvi_log:format(Fmt, Args)); +log(_, _, _, _) -> ok. %% check_msg(Checks, Params) -> diff --git a/components/dlink/src/dlink.app.src b/components/dlink/src/dlink.app.src index 6666333..7dca4d5 100644 --- a/components/dlink/src/dlink.app.src +++ b/components/dlink/src/dlink.app.src @@ -14,7 +14,11 @@ {applications, [ kernel, stdlib, - rvi_common + rvi_common, + authorize, + service_edge, + service_discovery, + schedule ]}, {mod, {dlink_app, []}}, {start_phases, [{announce, []}]}, diff --git a/components/dlink/src/dlink_data_json.erl b/components/dlink/src/dlink_data_json.erl index 1ad82b2..6a68e48 100644 --- a/components/dlink/src/dlink_data_json.erl +++ b/components/dlink/src/dlink_data_json.erl @@ -1,6 +1,10 @@ -module(dlink_data_json). --compile(export_all). +-export([encode/2, + decode/3]). +-export([init/1, + port_options/0]). + init(_Opts) -> []. @@ -8,41 +12,25 @@ init(_Opts) -> port_options() -> [list, {packet, 0}]. -decode(Msg, St) -> - {Msg1, St1} = append(St, Msg), - try exo_json:decode(St1, Msg1) of - {done, {ok, {struct, Elems}}, Rest} -> - {ok, [Elems], Rest}; - {done, {ok, {array, Structs}}, Rest} -> - {ok, [Str || {struct, Str} <- Structs], Rest}; - {done, {error, Reason}, Rest} -> - {error, Reason, Rest}; - {more, Cont} -> - {more, Cont} - catch - error:Error -> - {error, Error, St1}; - exit:Exit -> - {error, Exit, St1} - end. +decode(Msg, F, St) when is_function(F, 1) -> + jsx_decode_stream(Msg, F, St). + +encode(Msg, St) -> + {ok, rvi_common:term_to_json(Msg), St}. -encode({struct, _} = JSON, St) -> - try {ok, exo_json:encode(JSON), St} - catch exit:Error -> erlang:error(Error) - end; -encode({array, Structs} = JSON, St) -> - case lists:all(fun({struct,_}) -> true; - (_) -> false - end, Structs) of - true -> - {ok, exo_json:encode(JSON), St}; - false -> - erlang:error(invalid_json_structure) +jsx_decode_stream(Data, F, St) -> + case jsx_decode(Data, St) of + {incomplete, Cont} -> + {ok, Cont}; + {with_tail, Elems, <<>>} -> + F(Elems), + {ok, []}; + {with_tail, Elems, Rest} -> + F(Elems), + jsx_decode_stream(Rest, F, []) end. -append([], Msg) -> - {Msg, []}; -append([_|_] = St, Msg) -> - {St ++ Msg, []}; -append(Cont, Msg) when is_tuple(Cont) -> - {Msg, Cont}. +jsx_decode(Data, []) -> + jsx:decode(Data, [stream, return_tail]); +jsx_decode(Data, Cont) when is_function(Cont, 1) -> + Cont(Data). diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl index 9510a53..253da55 100644 --- a/components/dlink/src/dlink_data_msgpack.erl +++ b/components/dlink/src/dlink_data_msgpack.erl @@ -1,6 +1,10 @@ -module(dlink_data_msgpack). --compile(export_all). +-export([init/1, + decode/3, + encode/2]). + +-export([port_options/0]). -record(st, {opts = [{allow_atom, pack}, {enable_str, true}, @@ -13,15 +17,16 @@ port_options() -> init(_CS) -> #st{}. -decode(Msg0, #st{buf = Prev, opts = Opts} = St) -> +decode(Msg0, F, #st{buf = Prev, opts = Opts} = St) when is_function(F, 1) -> Msg = append(Prev, Msg0), case msgpack:unpack_stream(Msg, Opts) of {error, incomplete} -> - {more, St#st{buf = Msg}}; + {ok, St#st{buf = Msg}}; {error, E} -> - {error, E, St}; + {error, E}; {Decoded, Rest} when is_binary(Rest) -> - {ok, Decoded, St#st{buf = Rest}} + F(Decoded), + decode(Rest, F, St#st{buf = <<>>}) end. encode({struct, Elems}, #st{opts = Opts} = St) -> diff --git a/components/dlink/src/dlink_data_rvi.erl b/components/dlink/src/dlink_data_rvi.erl deleted file mode 100644 index 01131be..0000000 --- a/components/dlink/src/dlink_data_rvi.erl +++ /dev/null @@ -1,123 +0,0 @@ --module(dlink_data_rvi). - --compile(export_all). - --record(dlink_data_rvi, {need, buf}). - --define(MAX_LINE, 79). - -init(_Opts) -> - undefined. - -port_options() -> - []. - -encode(Elems, St) -> - Bin = encode_(Elems, <<>>), - Sz = byte_size(Bin), - {ok, <<"&RVI|", - (integer_to_binary(Sz, 16))/binary, "\n", - Bin/binary>>, St}. - -encode_([{Key, Val}|T], Acc) -> - {Type, ValBin} = encode_val(Val), - Bin = encode_elem(to_bin(Key), Type, ValBin), - encode_(T, <>); -encode_([], Acc) -> - Acc. - -encode_val(V) when is_binary(V) -> {$B, V}; -encode_val(V) when is_integer(V) -> {$i, integer_to_binary(V,16)}; -encode_val(V) when is_atom(V) -> {$a, atom_to_binary(V, latin1)}; -encode_val(V) when is_float(V) -> - Bin = <>, - {$f, Bin}; -encode_val({T,_} = J) when T==array; T==struct -> - JSON = exo_json:encode(J), - {$J, iolist_to_binary(JSON)}; -encode_val([T|_] = L) when is_tuple(T) -> - {$L, encode_(L, <<>>)}. - -decode_value($B, Bin) -> Bin; -decode_value($i, Bin) -> binary_to_integer(Bin, 16); -decode_value($f, <>) -> F; -decode_value($a, Bin) -> binary_to_existing_atom(Bin, latin1); -decode_value($J, Bin) -> - {ok, Obj} = exo_json:decode_string(binary_to_list(Bin)), - Obj; -decode_value($L, Bin) -> - decode_packet(Bin). - -encode_elem(Key, Type, Bin) -> - BSz = byte_size(Bin), - case byte_size(Key) + BSz of - Sz when Sz =< 78, Type >= $a, Type =< $z -> - <>; - _ -> - <> - end. - -decode(<<"&RVI|", Rest/binary>>, undefined) -> - case erlang:decode_packet(line, Rest, [{line_length, 79}]) of - {more, _} -> - {more, Rest}; - {ok, Ln, Rest1} -> - LSz = byte_size(Ln), - LSz1 = LSz-1, - <> = Ln, - Bytes = binary_to_integer(Size, 16), - case Rest1 of - <> -> - {ok, decode_packet(Pkt), Tail}; - _ -> - {more, #dlink_data_rvi{need = Bytes, buf = Rest1}} - end - end; -decode(Data, #dlink_data_rvi{need = Bytes, buf = Buf} = St) -> - case <> of - <> -> - {ok, decode_packet(Pkt), Tail}; - Buf1 -> - {more, St#dlink_data_rvi{buf = Buf1}} - end; -decode(_, _St) -> - {error, unknown, undefined}. - -decode_packet(<<>>) -> - []; -decode_packet(P) -> - {ok, L, Rest} = erlang:decode_packet(line, P, [{line_length, ?MAX_LINE}]), - case split_line(L) of - {Key, Type, simple, Data} -> - [{Key, decode_value(Type, Data)}|decode_packet(Rest)]; - {Key, Type, Size} -> - Size1 = Size-1, - <> = Rest, - [{Key, decode_value(Type, VBin)}|decode_packet(Rest1)] - end. - -to_bin(V) when is_atom(V) -> atom_to_binary(V, latin1); -to_bin(V) when is_binary(V) -> V; -to_bin(V) when is_list(V) -> iolist_to_binary(V). - -split_line(L) -> - split_line(L, <<>>). - -split_line(<<"\\", $|, Rest/binary>>, Acc) -> - split_line(Rest, <>); -split_line(<<"|", T, ":", Rest/binary>>, Acc) -> - {Acc, T, simple, remove_nl(Rest)}; -split_line(<<"|", T, "|", Rest/binary>>, Acc) -> - SzBin = remove_nl(Rest), - {Acc, T, binary_to_integer(SzBin, 16)}; -split_line(<>, Acc) -> - split_line(T, <>). - - -remove_nl(B) -> - Sz = byte_size(B), - Sz1 = Sz-1, - <> = B, - V. diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl index 85a8663..6b3a64e 100644 --- a/components/dlink_bt/src/bt_connection.erl +++ b/components/dlink_bt/src/bt_connection.erl @@ -36,16 +36,20 @@ -define(SERVER, ?MODULE). +-define(PACKET_MOD, dlink_data_json). -record(st, { remote_addr = "00:00:00:00:00:00", channel = 0, - rfcomm_ref = undefined, - listen_ref = undefined, + rfcomm_ref, + listen_ref, + accept_ref, mode = bt, - mod = undefined, - func = undefined, - args = undefined + packet_mod = ?PACKET_MOD, + packet_st = [], + mod, + func, + args }). %%%=================================================================== @@ -124,11 +128,15 @@ is_connection_up(Addr, Channel) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) -> +init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) -> %% connect will block on rfcomm:open, so cast to self %% in order to let init return. + ?debug("init({connect, ~p, ~p, ~p, ~p, ~p, CS})", + [BTAddr, Channel, Mode, Mod, Fun]), gen_server:cast(self(), connect), + {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS), + PktSt = PktMod:init(CS), {ok, #st{ remote_addr = BTAddr, channel = Channel, @@ -136,36 +144,47 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) -> mode = Mode, mod = Mod, func = Fun, - args = Arg + args = CS, + packet_mod = PktMod, + packet_st = PktSt }}; -init({accept, Channel, ListenRef, Mode, Mod, Fun, Arg}) -> - ARef = case Mode of - tcp -> - {ok, R} = exo_socket:async_accept(ListenRef), - R; - bt -> - {ok, R} = rfcomm:accept(ListenRef, infinity, self()), - R - end, +init({accept, Channel, ListenRef, Mode, Mod, Fun, CS}) -> + ?debug("init({accept, ~p, ~p, ~p, ~p, ~p, CS})", + [Channel, ListenRef, Mode, Mod, Fun]), ?debug("bt_connection:init(accept): self(): ~p", [self()]), ?debug("bt_connection:init(accept): Channel: ~p", [Channel]), ?debug("bt_connection:init(accept): ListenRef: ~p", [ListenRef]), - ?debug("bt_connection:init(accept): AcceptRef: ~p", [ARef]), ?debug("bt_connection:init(accept): Module: ~p", [Mod]), ?debug("bt_connection:init(accept): Function: ~p", [Fun]), - ?debug("bt_connection:init(accept): Arg: ~p", [Arg]), - + ?debug("bt_connection:init(accept): Arg: ~p", [CS]), + {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS), + PktSt = PktMod:init(CS), + ARef = case Mode of + bt -> + %% Expected message is + %% {rfcomm, ARef, {accept, Addr, Chan}} + {ok, R} = rfcomm:accept(ListenRef, infinity, self()), + R; + tcp -> + %% -1 represents infinity + %% Expected message is + %% {inet_async,LSock,Ref,{ok,Socket}} + {ok, R} = prim_inet:async_accept(ListenRef, -1), + R + end, {ok, #st{ channel = Channel, - rfcomm_ref = ARef, - listen_ref = ListenRef, + rfcomm_ref = undefined, + accept_ref = ARef, mode = Mode, mod = Mod, func = Fun, - args = Arg + args = CS, + packet_mod = PktMod, + packet_st = PktSt }}. @@ -219,7 +238,7 @@ handle_cast(connect, #st { bt -> rfcomm:open(BTAddr, Channel); tcp -> - exo_socket:connect(BTAddr, Channel) + gen_tcp:connect(BTAddr, Channel, bt_listener:sock_opts()) end, case ConnRes of {ok, ConnRef} -> @@ -246,13 +265,18 @@ handle_cast(connect, #st { { stop, { connect_failed, Error}, St } end; -handle_cast({send, Data}, St) -> - ?debug("~p:handle_call(send): Sending: ~p", - [ ?MODULE, Data]), - - rfcomm:send(St#st.rfcomm_ref, Data), - - {noreply, St}; +handle_cast({send, Data}, #st{mode = Mode, + rfcomm_ref = Sock, + packet_mod = PMod, packet_st = PSt} = St) -> + ?debug("handle_cast(send): Sending: ~p", [Data]), + {ok, Encoded, PSt1} = PMod:encode(Data, PSt), + ?debug("Encoded = ~p", [Encoded]), + Res = case Mode of + bt -> rfcomm:send(Sock, Encoded); + tcp -> gen_tcp:send(Sock, Encoded) + end, + ?debug("send Res = ~p", [Res]), + {noreply, St#st{packet_st = PSt1}}; handle_cast(_Msg, State) -> ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]), @@ -271,35 +295,43 @@ handle_cast(_Msg, State) -> %% An accept reference we've setup now has accetpted an %% incoming connection. -handle_info({rfcomm, _ARef, { accept, BTAddr, _ } }, +handle_info({rfcomm, ARef, { accept, BTAddr, _ } }, #st { mod = Mod, func = Fun, args = Arg, + listen_ref = LRef, + accept_ref = ARef, channel = Channel } = St) -> ?info("~p:handle_info(): bt_connection from ~w:~w\n", [?MODULE, BTAddr,Channel]), - + bt_listener:accept_ack(ok, LRef, BTAddr, Channel), Mod:Fun(self(), BTAddr, Channel, accepted, Arg), { noreply, St#st { remote_addr = BTAddr, - channel = Channel } }; + channel = Channel, + accept_ref = undefined} }; handle_info({rfcomm, _ConnRef, {data, Data}}, #st { remote_addr = BTAddr, channel = Channel, + packet_mod = PMod, + packet_st = PSt, mod = Mod, - func = Fun, - args = Arg } = State) -> + func = Fun } = State) -> ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), ?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]), ?info("~p:handle_info(data): ~p:~p -> ~p:~p", [ ?MODULE, BTAddr, Channel, Mod, Fun]), - Self = self(), - spawn(fun() -> Mod:Fun(Self, BTAddr, Channel, - data, Data, Arg) end), - - {noreply, State}; + case PMod:decode(Data, fun(Elems) -> + handle_elements(Elems, State) + end, PSt) of + {ok, PSt1} -> + {noreply, State#st{packet_st = PSt1}}; + {error, Reason} -> + ?error("decode failed: ~p", [Reason]), + {stop, Reason, State} + end; handle_info({rfcomm, ConnRef, closed}, @@ -339,13 +371,32 @@ handle_info({rfcomm, ConnRef, error}, bt_connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; +handle_info({tcp, Sock, Data}, #st{remote_addr = IP, + channel = Port, + rfcomm_ref = Sock, + packet_mod = PMod, + packet_st = PSt} = St) -> + ?debug("handle_info(data): From: ~p:~p", [IP, Port]), + case PMod:decode(Data, fun(Elems) -> + handle_elements(Elems, St) + end, PSt) of + {ok, PSt1} -> + inet:setopts(Sock, [{active, once}]), + {noreply, St#st{packet_st = PSt1}}; + {error, Reason} -> + ?error("decode failed, Reason = ~p", [Reason]), + {stop, Reason, St} + end; handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod, func = Fun, args = Arg} = St) -> ?debug("~p:handle_info(~p)", [?MODULE, Msg]), + inet_db:register_socket(Sock, inet_tcp), + inet:setopts(Sock, [{active, once}]), {ok, {BTAddr, Channel}} = inet:peername(Sock), Mod:Fun(self(), BTAddr, Channel, accepted, Arg), - {noreply, St}; + {noreply, St#st{rfcomm_ref = Sock, + remote_addr = BTAddr}}; handle_info(_Info, State) -> ?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]), @@ -380,3 +431,14 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +get_module_config(Key, Default, CS) -> + rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS). + +handle_elements(Elements, #st{remote_addr = BTAddr, + channel = Channel, + mod = Mod, + func = Fun, + args = Arg}) -> + ?debug("data complete; processed: ~p", + [authorize_keys:abbrev(Elements)]), + Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg). diff --git a/components/dlink_bt/src/bt_connection_manager.erl b/components/dlink_bt/src/bt_connection_manager.erl index 1b1e049..e86dda3 100644 --- a/components/dlink_bt/src/bt_connection_manager.erl +++ b/components/dlink_bt/src/bt_connection_manager.erl @@ -67,6 +67,7 @@ find_connection_by_address(BTAddr, Channel) -> %% @end %%-------------------------------------------------------------------- start_link() -> + ?debug("start_link()", []), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl index 3711652..a1f1a49 100644 --- a/components/dlink_bt/src/bt_listener.erl +++ b/components/dlink_bt/src/bt_listener.erl @@ -14,13 +14,16 @@ -include_lib("rvi_common/include/rvi_common.hrl"). -export([start_link/1, add_listener/1, - remove_listener/1]). + remove_listener/1, + accept_ack/4]). +-export([sock_opts/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). -export([terminate/2]). -record(st, {listeners = [], acceptors = [], + mode = bt, cs = #component_spec{} }). @@ -34,41 +37,33 @@ add_listener(Channel) -> remove_listener(Channel) -> gen_server:call(?MODULE, {remove_listener, Channel}). +accept_ack(Result, LRef, Addr, Chan) -> + ?MODULE ! {accept, self(), LRef, Addr, Chan, Result}, + ok. + +sock_opts() -> + [binary, {active, once}, {packet, 0}]. + init(Mode) -> {ok, #st { listeners = [], acceptors = [], + mode = Mode, cs = rvi_common:set_value(bt_mode, Mode, rvi_common:get_component_specification()) } }. -handle_call({add_listener, Channel}, _From, #st{cs = CS} = St) -> +handle_call({add_listener, Channel}, _From, #st{mode = Mode, + listeners = Ls} = St) -> ?info("bt_listener:add_listener(): Setting up listener on channel ~p", [ Channel]), - Mode = rvi_common:get_value(bt_mode, bt, CS), case listen(Mode, Channel) of {ok, ListenRef} -> ?info("bt_listener:add_listener(): ListenRef: ~p", [ ListenRef]), - {ok, ConnPid} = bt_connection:accept(Channel, - ListenRef, - Mode, - dlink_bt_rpc, - handle_socket, - CS), - - - - %%{ noreply, NSt} = handle_info({accept, ListenRef, Channel, ok}, St), - - { reply, - ok, - St#st { - acceptors = [ { Channel, ConnPid } | St#st.acceptors ], - listeners = [ { ListenRef, Channel } | St#st.listeners ] - } - }; + St1 = St#st{listeners = [{ListenRef, Channel}|Ls]}, + {reply, ok, start_acceptor(ListenRef, Channel, St1)}; Err -> ?info("bt_listener:add_listener(): Failed: ~p", [ Err]), @@ -86,12 +81,19 @@ handle_call(_Msg, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) -> +handle_info({accept, Pid, ListenRef, BTAddr, Channel, ok}, + #st{listeners = Ls, + acceptors = As} = St) -> %% Fire up a new process to handle the %% future incoming connection. ?info("bt_listener:accept(): ListenRef: ~p", [ ListenRef]), ?info("bt_listener:accept(): Remote: ~p-~p", [BTAddr, Channel ]), + case lists:keyfind(ListenRef, 1, Ls) of + {_, Channel} -> + As1 = lists:keydelete(Pid, 2, As), + {noreply, start_acceptor(ListenRef, Channel, + St#st{acceptors = As1})}; %% Must fix multiple acceptors in bt_linux_drv.c %% {ok, ConnPid} = bt_connection:accept(Channel, %% ListenRef, @@ -101,7 +103,9 @@ handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) -> %% {noreply, St#st {acceptors = [ { Channel, ConnPid } | St#st.acceptors ]}}; - {noreply, St }; + _ -> + {noreply, St } + end; handle_info(_Msg, State) -> ?info("bt_listener:handle_info(): Unknown: ~p", [ _Msg]), @@ -115,4 +119,16 @@ terminate(_Reason, _State) -> listen(bt, Channel) -> rfcomm:listen(Channel); listen(tcp, Port) -> - exo_socket:listen(Port). + gen_tcp:listen(Port, sock_opts()). + +start_acceptor(ListenRef, Channel, #st{mode = Mode, + acceptors = As, + cs = CS} = St) -> + ?debug("start acceptor(~p, ~p, St)", [ListenRef, Channel]), + {ok, ConnPid} = bt_connection:accept(Channel, + ListenRef, + Mode, + dlink_bt_rpc, + handle_socket, + CS), + St#st{acceptors = [{Channel, ConnPid}|As]}. diff --git a/components/dlink_bt/src/dlink_bt.app.src b/components/dlink_bt/src/dlink_bt.app.src index e63d1ac..ba62655 100644 --- a/components/dlink_bt/src/dlink_bt.app.src +++ b/components/dlink_bt/src/dlink_bt.app.src @@ -17,7 +17,7 @@ {applications, [ kernel, stdlib, - rvi_common, + dlink, bt ]}, {mod, { dlink_bt_app, []}}, diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index ab5f8ac..542fd00 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -34,7 +34,7 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --include_lib("rvi_common/include/rvi_dlink.hrl"). +-include_lib("rvi_common/include/rvi_dlink_bin.hrl"). -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(DEFAULT_BT_CHANNEL, 1). @@ -60,6 +60,7 @@ }). -record(st, { + mode = bt, %% tcp | bt cs = #component_spec{} }). @@ -93,9 +94,11 @@ init([]) -> { keypos, #connection_entry.connection }]), CS = rvi_common:get_component_specification(), + Mode = get_mode(CS), service_discovery_rpc:subscribe(CS, ?MODULE), {ok, #st { + mode = Mode, cs = CS } }. @@ -105,15 +108,19 @@ start_json_server() -> start_connection_manager() -> + ?debug("start_connection_manager()", []), CompSpec = rvi_common:get_component_specification(), - {ok, BertOpts } = rvi_common:get_module_config(data_link, - ?MODULE, - server_opts, - [], - CompSpec), + ServerOpts = get_server_opts(CompSpec), + start_connection_manager(ServerOpts, CompSpec). + +start_connection_manager([], _) -> + ?debug("No BT server options set; start only the connection manager", []), + bt_connection_manager:start_link(); +start_connection_manager(ServerOpts, CompSpec) -> %% Retrieve the channel we should use - Mode = get_mode(BertOpts), - Channel = get_channel(Mode, BertOpts), + ?debug("ServerOpts = ~p", [ServerOpts]), + Mode = get_mode(ServerOpts), + Channel = get_channel(Mode, ServerOpts), ?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]), @@ -124,6 +131,7 @@ start_connection_manager() -> bt:start(), bt:debug(debug); tcp -> + ?debug("Mode == tcp; not starting bt driver", []), ok end, bt_listener:start_link(Mode), @@ -146,11 +154,21 @@ start_connection_manager() -> [], CompSpec), - setup_persistent_connections_(PersistentConnections, CompSpec), + setup_persistent_connections_(PersistentConnections, Mode, CompSpec), ok. -get_mode(Opts) -> +get_server_opts(CS) when element(1, CS) == component_spec -> + {ok, ServerOpts } = rvi_common:get_module_config(data_link, + ?MODULE, + server_opts, + [], + CS), + ServerOpts. + +get_mode(CS) when element(1, CS) == component_spec -> + get_mode(get_server_opts(CS)); +get_mode(Opts) when is_list(Opts) -> proplists:get_value(test_mode, Opts, bt). get_channel(tcp, Opts) -> @@ -159,15 +177,15 @@ get_channel(bt, Opts) -> proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL). -setup_persistent_connections_([ ], _CompSpec) -> +setup_persistent_connections_([ ], _, _CompSpec) -> ok; -setup_persistent_connections_([ BTAddress | T], CompSpec) -> +setup_persistent_connections_([ BTAddress | T], Mode, CompSpec) -> ?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]), - [ BTAddr, Channel] = string:tokens(BTAddress, "-"), - connect_and_retry_remote(BTAddr, Channel, CompSpec), - setup_persistent_connections_(T, CompSpec), + [ BTAddr, Channel] = string:tokens(BTAddress, "-:"), %% Addr-Chan | IP:Port + connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec), + setup_persistent_connections_(T, Mode, CompSpec), ok. @@ -213,7 +231,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) -> %% %% Connect to a remote RVI node. %% -connect_remote(BTAddr, Channel, CompSpec) -> +connect_remote(BTAddr, Channel, Mode, CompSpec) -> case bt_connection_manager:find_connection_by_address(BTAddr, Channel) of { ok, _Pid } -> already_connected; @@ -225,7 +243,7 @@ connect_remote(BTAddr, Channel, CompSpec) -> %%FIXME %% Setup a genserver around the new connection. - case bt_connection:connect(BTAddr, Channel, + case bt_connection:connect(BTAddr, Channel, Mode, ?MODULE, handle_socket, CompSpec ) of { ok, Pid } -> ?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p", @@ -239,45 +257,33 @@ connect_remote(BTAddr, Channel, CompSpec) -> end end. - -connect_and_retry_remote( BTAddr, Channel, CompSpec) -> +connect_and_retry_remote( BTAddr, Channel, Mode, CompSpec) -> ?info("dlink_bt:connect_and_retry_remote(): ~p:~p", [ BTAddr, Channel]), - case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of - ok -> ok; - + CS = start_log(<<"conn">>, "connect ~s:~s", [BTAddr, Channel], CompSpec), + case connect_remote(BTAddr, list_to_integer(Channel), Mode, CS) of + ok -> + ok; Err -> %% Failed to connect. Sleep and try again ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p", - [BTAddr, Channel, Err]), - - ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec", - [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]), - - setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec), - + [BTAddr, Channel, Err]), + ?notice("dlink_bt:connect_and_retry_remote(~p:~p):" + " Will try again in ~p sec", + [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]), + setup_reconnect_timer( + ?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CS), not_available end. - - announce_local_service_(_CompSpec, [], _Service, _Availability) -> ok; announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), - Res = bt_connection:send(ConnPid, - term_to_json( - { struct, - [ - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_TRANSACTION_ID, 3}, - { ?DLINK_ARG_SIGNATURE, JWT } - ] - })), + Msg = availability_msg(Availability, [Service], CompSpec), + Res = bt_connection:send(ConnPid, Msg), ?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -299,24 +305,23 @@ announce_local_service_(CompSpec, Service, Availability) -> process_data(_FromPid, RemoteBTAddr, RemoteChannel, ProtocolMod, Data, CompSpec) -> ?debug("dlink_bt:receive_data(): SetupAddress: {~p, ~p}", [ RemoteBTAddr, RemoteChannel ]), ?debug("dlink_bt:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), - Proto = list_to_atom(ProtocolMod), - Proto:receive_message(CompSpec, base64:decode_to_string(Data)), + Proto = list_to_existing_atom(ProtocolMod), + Proto:receive_message(CompSpec, {RemoteBTAddr, RemoteChannel}, Data), ok. -availability_msg(Availability, Services) -> - {struct, [{?DLINK_ARG_STATUS, status_string(Availability)}, - {?DLINK_ARG_SERVICES, {array, Services}}]}. +availability_msg(Availability, Services, CompSpec) -> + [{?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE}, + {?DLINK_ARG_STATUS, status_string(Availability)}, + {?DLINK_ARG_SERVICES, Services} + | log_id_tail(CompSpec)]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) -> - {ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg), - {ok, Svcs} = rvi_common:get_json_element([?DLINK_ARG_SERVICES], Msg), +process_announce(Avail, Svcs, FromPid, Addr, Channel, CompSpec) -> ?debug("dlink_bt_rpc:service_announce(~p): Address: ~p:~p", [Avail, Addr, Channel]), - ?debug("dlink_bt_rpc:service_announce(~p): TransactionID: ~p", [Avail, TID]), ?debug("dlink_bt_rpc:service_announce(~p): Services: ~p", [Avail, Svcs]), case Avail of ?DLINK_ARG_AVAILABLE -> @@ -327,23 +332,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) -> service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) end. -process_authorize(FromPid, - PeerBTAddr, - PeerBTChannel, - TransactionID, - RemoteAddress, - RemoteChannel, - Protocol, - Certificates, - Signature, - CompSpec) -> - +process_authorize(FromPid, PeerBTAddr, PeerBTChannel, + RemoteAddress, RemoteChannel, Protocol, + Credentials, CompSpec) -> ?info("dlink_bt:authorize(): Peer Address: ~p:~p", [PeerBTAddr, PeerBTChannel ]), ?info("dlink_bt:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemoteChannel ]), - ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]), - ?debug("dlink_bt:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_bt:authorize(): Certificates: ~p", [ Certificates ]), - ?debug("dlink_bt:authorize(): Signature: ~p", [ Signature ]), + ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]), + ?debug("dlink_bt:authorize(): Credentials: ~p", [ Credentials ]), %% If FromPid (the genserver managing the socket) is not yet registered %% with the conneciton manager, this is an incoming connection @@ -351,59 +346,42 @@ process_authorize(FromPid, %% a service announce Conn = {RemoteAddress, RemoteChannel}, - case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of - true -> - connection_authorized(FromPid, Conn, CompSpec); - false -> - %% close connection (how?) - false - end. + log(result, "auth ~s:~w", [RemoteAddress, RemoteChannel], CompSpec), + authorize_rpc:store_creds(CompSpec, Credentials, Conn), + connection_authorized(FromPid, Conn, CompSpec). handle_socket(FromPid, PeerBTAddr, PeerChannel, data, - Payload, CompSpec) -> + Elems, CompSpec) -> - {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)), ?debug("dlink_bt:data(): Got ~p", [ Elems ]), + CS = rvi_common:pick_up_json_log_id(Elems, CompSpec), + case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> - [ TransactionID, - RemoteAddress, + [ RemoteAddress, RemoteChannel, RVIProtocol, - CertificatesTmp, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + Credentials ] = + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, - ?DLINK_ARG_SIGNATURE], + ?DLINK_ARG_CREDENTIALS], Elems, undefined), - Certificates = - case CertificatesTmp of - { array, C} -> C; - undefined -> [] - end, process_authorize(FromPid, PeerBTAddr, RemoteChannel, - TransactionID, RemoteAddress, RemoteChannel, - RVIProtocol, Certificates, Signature, CompSpec); - - + RemoteAddress, RemoteChannel, + RVIProtocol, Credentials, CS); ?DLINK_CMD_SERVICE_ANNOUNCE -> - Conn = {PeerBTAddr, PeerChannel}, - [ TransactionID, Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE], + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - process_availability( - Msg, FromPid, PeerBTAddr, PeerChannel, TransactionID, CompSpec); - _ -> - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + log("sa from ~s:~w", [PeerBTAddr, PeerChannel], CS), + process_announce(Status, Services, FromPid, PeerBTAddr, + PeerChannel, CS); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -414,14 +392,14 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data, ?DLINK_ARG_DATA], Elems, undefined), process_data(FromPid, PeerBTAddr, PeerChannel, - ProtocolMod, Data, CompSpec); + ProtocolMod, Data, CS); ?DLINK_CMD_PING -> ?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]), ok; undefined -> - ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]), + ?error("dlink_bt:data() cmd undefined., ~p", [ Elems ]), ok end. @@ -567,7 +545,7 @@ handle_cast(Other, St) -> {noreply, St}. -handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> +handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, #st{mode = Mode} = St) -> %% Do we already have a connection that supchannel service? case get_connections_by_service(Service) of [] -> %% Nope @@ -580,7 +558,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> Addr -> [ Address, Channel] = string:tokens(Addr, "-"), - case connect_remote(Address, list_to_integer(Channel), St#st.cs) of + case connect_remote(Address, list_to_integer(Channel), Mode, St#st.cs) of ok -> { reply, [ok, 2000], St }; %% 2 second timeout @@ -613,27 +591,20 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S %% FIXME: What to do if we have multiple connections to the same service? [ConnPid | _T] -> ?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]), - Res = bt_connection:send(ConnPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, - { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, - { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } - ]})), - + Res = bt_connection:send( + ConnPid, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, + { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, + { ?DLINK_ARG_DATA, Data } + ]), { reply, [ Res ], St} end; - - handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) -> %% Create a timer to handle periodic pings. - {ok, ServerOpts } = rvi_common:get_module_config(data_link, - ?MODULE, - server_opts, [], - St#st.cs), + ServerOpts = get_server_opts(St#st.cs), Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), ?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec", @@ -656,12 +627,7 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) -> case bt_connection:is_connection_up(Pid) of true -> ?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]), - bt_connection:send(Pid, term_to_json( - { struct, - [ { ?DLINK_ARG_CMD, - ?DLINK_CMD_PING - }]})), - + bt_connection:send(Pid, [ { ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Channel, Timeout }); @@ -671,8 +637,9 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) -> {noreply, St}; %% Setup static nodes -handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, St) -> - connect_and_retry_remote(BTAddr, Channel, CompSpec), +handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, + #st{mode = Mode} = St) -> + connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec), { noreply, St }; handle_info(Info, St) -> @@ -688,17 +655,15 @@ code_change(_OldVsn, St, _Extra) -> send_authorize(Pid, SetupChannel, CompSpec) -> {ok,[{address, Address }]} = bt_drv:local_info([address]), bt_connection:send(Pid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, - { ?DLINK_ARG_PORT, SetupChannel }, - { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, - { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, + { ?DLINK_ARG_PORT, SetupChannel }, + { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, + { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } + | log_id_tail(CompSpec)]). connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) -> + log("authorized: ~s:~p", [RemoteAddress, RemoteChannel], CompSpec), case bt_connection_manager:find_connection_by_pid(FromPid) of not_found -> ?info("dlink_bt:authorize(): New connection!"), @@ -721,15 +686,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) ?info("dlink_bt:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteAddress, RemoteChannel]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), - bt_connection:send(FromPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } ]})), - + AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec), + log("sending sa: ~s:~w", [RemoteAddress, RemoteChannel], CompSpec), + bt_connection:send(FromPid, AvailabilityMsg), %% Setup ping interval gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemoteChannel, FromPid }), ok. @@ -828,10 +787,6 @@ get_connections(Key, Acc) -> get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). - -term_to_json(Term) -> - binary_to_list(iolist_to_binary(exo_json:encode(Term))). - opt(K, L, Def) -> case lists:keyfind(K, 1, L) of {_, V} -> V; @@ -841,28 +796,25 @@ opt(K, L, Def) -> opts(Keys, Elems, Def) -> [ opt(K, Elems, Def) || K <- Keys]. -get_authorize_jwt(CompSpec) -> - case authorize_rpc:get_authorize_jwt(CompSpec) of - [ok, JWT] -> - JWT; +get_credentials(CompSpec) -> + case authorize_rpc:get_credentials(CompSpec) of + [ok, Creds] -> + Creds; [not_found] -> - ?error("No authorize JWT~n", []), - error(cannot_authorize) + ?error("No credentials found~n", []), + error(no_credentials_found) end. -get_certificates(CompSpec) -> - case authorize_rpc:get_certificates(CompSpec) of - [ok, Certs] -> - Certs; - [not_found] -> - ?error("No certificate found~n", []), - error(no_certificate_found) - end. +start_log(Pfx, Fmt, Args, CS) -> + LogId = rvi_log:new_id(Pfx), + rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)), + rvi_common:set_value(rvi_log_id, LogId, CS). -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of - [ok] -> - true; - [not_found] -> - false - end. +log(Fmt, Args, CS) -> + log(info, Fmt, Args, CS). + +log(Lvl, Fmt, Args, CS) -> + rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS). + +log_id_tail(CompSpec) -> + rvi_common:log_id_json_tail(CompSpec). diff --git a/components/dlink_sms/src/dlink_sms.app.src b/components/dlink_sms/src/dlink_sms.app.src index a394888..d0f86e4 100644 --- a/components/dlink_sms/src/dlink_sms.app.src +++ b/components/dlink_sms/src/dlink_sms.app.src @@ -1,8 +1,9 @@ +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- %% %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -16,7 +17,8 @@ {applications, [ kernel, stdlib, - rvi_common + rvi_common, + dlink ]}, {mod, { dlink_sms_app, []}}, {start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]}, diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 77300d9..b24215c 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -36,6 +36,7 @@ -define(SERVER, ?MODULE). +-define(PACKET_MOD, dlink_data_json). -record(st, { ip = {0,0,0,0}, @@ -44,7 +45,9 @@ mod = undefined, func = undefined, args = undefined, - pst = undefined %% Payload state + packet_mod = ?PACKET_MOD, + packet_st = [], + cs }). %%%=================================================================== @@ -52,8 +55,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, Arg) -> - case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of +setup(IP, Port, Sock, Mod, Fun, CS) -> + ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]), + case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), gen_server:cast(GenSrvPid, {activate_socket, Sock}), @@ -120,7 +124,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, Arg}) -> +init({IP, Port, Sock, Mod, Fun, CompSpec}) -> case IP of undefined -> ok; _ -> connection_manager:add_connection(IP, Port, self()) @@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) -> ?debug("connection:init(): Sock: ~p", [Sock]), ?debug("connection:init(): Module: ~p", [Mod]), ?debug("connection:init(): Function: ~p", [Fun]), - ?debug("connection:init(): Arg: ~p", [Arg]), - %% Grab socket control + {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), + PktSt = PktMod:init(CompSpec), {ok, #st{ ip = IP, port = Port, sock = Sock, mod = Mod, func = Fun, - args = Arg, - pst = undefined + packet_mod = PktMod, + packet_st = PktSt, + cs = CompSpec }}. +get_module_config(Key, Default, CS) -> + rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS). %%-------------------------------------------------------------------- %% @private @@ -182,13 +189,14 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({send, Data}, St) -> +handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, Data]), + {ok, Encoded, PSt1} = PMod:encode(Data, PSt), + ?debug("Encoded = ~p", [Encoded]), + gen_tcp:send(St#st.sock, Encoded), - gen_tcp:send(St#st.sock, Data), - - {noreply, St}; + {noreply, St#st{packet_st = PSt1}}; handle_cast({activate_socket, Sock}, State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), @@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data}, handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, - mod = Mod, - func = Fun, - args = Arg, - pst = PST} = State) -> + packet_mod = PMod, + packet_st = PSt} = State) -> ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), - - case jsx_decode_stream(Data, PST) of - { [], NPST } -> - ?debug("handle_info(data incomplete)", []), + case PMod:decode(Data, fun(Elems) -> + handle_elements(Elems, State) + end, PSt) of + {ok, PSt1} -> inet:setopts(Sock, [{active, once}]), - {noreply, State#st { pst = NPST} }; - - { JSONElements, NPST } -> - ?debug("data complete: Processed: ~p", - [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]), - FromPid = self(), - [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg) - || SingleElem <- JSONElements], - inet:setopts(Sock, [ { active, once } ]), - {noreply, State#st { pst = NPST} } + {noreply, State#st{packet_st = PSt1}}; + {error, Reason} -> + ?error("decode failed, Reason = ~p", [Reason]), + {stop, Reason, State} end; - - handle_info({tcp_closed, Sock}, #st { ip = IP, port = Port, @@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock}, func = Fun, args = Arg } = State) -> ?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]), - Mod:Fun(self(), IP, Port,closed, Arg), + Mod:Fun(self(), IP, Port, closed, Arg), gen_tcp:close(Sock), connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; @@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -jsx_decode_stream(Data, St) -> - jsx_decode_stream(Data, St, []). - -jsx_decode_stream(Data, undefined, Acc) -> - case jsx:decode(Data, [stream, return_tail]) of - {incomplete, Cont} -> - {lists:reverse(Acc), Cont}; - {with_tail, Elems, <<>>} -> - {lists:reverse([Elems|Acc]), undefined}; - {with_tail, Elems, Rest} -> - jsx_decode_stream(Rest, undefined, [Elems|Acc]) - end. +%% jsx_decode_stream(Data, St) -> +%% jsx_decode_stream(Data, St, []). + +%% jsx_decode_stream(Data, undefined, Acc) -> +%% case jsx:decode(Data, [stream, return_tail]) of +%% {incomplete, Cont} -> +%% {lists:reverse(Acc), Cont}; +%% {with_tail, Elems, <<>>} -> +%% {lists:reverse([Elems|Acc]), undefined}; +%% {with_tail, Elems, Rest} -> +%% jsx_decode_stream(Rest, undefined, [Elems|Acc]) +%% end. + +%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) -> +%% case PMod:decode(Data, PSt) of +%% {ok, Elements, PSt1} -> +%% ?debug("data complete: Processed: ~p", +%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]), +%% Mod:Fun(self(), IP, Port, data, Elements, CS), +%% {ok, PSt1}; +%% {more, Elements, Rest, PSt1} -> +%% ?debug("data complete with Rest: Processed: ~p", +%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]), +%% Mod:Fun(self(), IP, Port, data, Elements, CS), +%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS); +%% {more, PSt1} -> +%% {ok, PSt1}; +%% { -> + +handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS, + ip = IP, port = Port}) -> + ?debug("data complete: Processed: ~p", + [authorize_keys:abbrev(Elements)]), + Mod:Fun(self(), IP, Port, data, Elements, CS). diff --git a/components/dlink_tcp/src/dlink_tcp.app.src b/components/dlink_tcp/src/dlink_tcp.app.src index 53a32b8..5ba7760 100644 --- a/components/dlink_tcp/src/dlink_tcp.app.src +++ b/components/dlink_tcp/src/dlink_tcp.app.src @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -16,7 +16,8 @@ {applications, [ kernel, stdlib, - rvi_common + rvi_common, + dlink ]}, {mod, { dlink_tcp_app, []}}, {start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]}, diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index b3ec80d..83e0a24 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -44,7 +44,7 @@ -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). --define(DLINK_TCP_VERSION, "1.0"). +-define(DLINK_TCP_VERSION, "1.1"). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) -> %% Setup a genserver around the new connection. {ok, Pid } = connection:setup(IP, Port, Sock, - ?MODULE, handle_socket, [CompSpec] ), + ?MODULE, handle_socket, CompSpec ), %% Send authorize - { LocalIP, LocalPort} = rvi_common:node_address_tuple(), - connection:send( - Pid, - term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATES, - get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - | rvi_common:log_id_json_tail(CompSpec) - ])), + send_authorize(Pid, CompSpec), ok; {error, Err } -> @@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) -> CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec), case connect_remote(IP, list_to_integer(Port), CS) of ok -> - log("connected", [], CS), + log(result, "connected", [], CS), ok; Err -> %% Failed to connect. Sleep and try again @@ -266,16 +253,8 @@ announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), - Res = connection:send( - ConnPid, - jsx:encode( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - | rvi_common:log_id_json_tail(CompSpec) - ])), + Msg = availability_msg(Availability, [Service], CompSpec), + Res = connection:send(ConnPid, Msg), ?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) -> handle_socket_(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg); -handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> +handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]), ok. -handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> +handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]), @@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), - [ TransactionID, - RemoteAddress, + [ RemoteAddress, RemotePort, ProtoVersion, - CertificatesTmp, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + Credentials ] = + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, - ?DLINK_ARG_SIGNATURE], + ?DLINK_ARG_CREDENTIALS], Elems, undefined), - - Certificates = - case CertificatesTmp of - C when is_list(C) -> C; - undefined -> [] - end, process_authorize(FromPid, PeerIP, PeerPort, - TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Signature, Certificates, CS); + RemoteAddress, RemotePort, + ProtoVersion, Credentials, CS); ?DLINK_CMD_SERVICE_ANNOUNCE -> ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), - [ TransactionID, - ProtoVersion, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_SIGNATURE], + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), - Conn = {PeerIP, PeerPort}, log("sa from ~s:~w", [PeerIP, PeerPort], CS), - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - ?debug("Service Announce~nMsg = ~p~n", [Msg]), - process_announce(Msg, FromPid, PeerIP, PeerPort, - TransactionID, ProtoVersion, CompSpec); - _ -> - log("sa INVALID", [], CS), - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) -> {noreply, St}; handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) -> + ?debug("handle_socket, Arg (CS) = ~p", [Arg]), try handle_socket_(FromPid, IP, Port, Event, Arg) catch C:E -> ?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), + error("Caught ~p:~p", [C, E]), ok end, {noreply, St}; @@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S %% FIXME: What to do if we have multiple connections to the same service? [ConnPid | _T] -> Res = connection:send(ConnPid, - jsx:encode( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, - { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, - { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } - ])), + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, + { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, + { ?DLINK_ARG_DATA, Data } + ]), { reply, [ Res ], St} end; @@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) -> }) || SvcName <- SvcNameList ], ok. -availability_msg(Availability, Services) -> - [{ ?DLINK_ARG_STATUS, status_string(Availability) }, - { ?DLINK_ARG_SERVICES, Services }]. +availability_msg(Availability, Services, CompSpec) -> + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, Services } + | log_id_tail(CompSpec) ]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, - RemotePort, ProtoVersion, Signature, Certificates, CompSpec) -> +process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, ProtoVersion, Credentials, CompSpec) -> ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), - ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]), - ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), - + ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]), {NRemoteAddress, NRemotePort} = Conn = case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> @@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, _ -> { RemoteAddress, RemotePort} end, - log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), - case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of - true -> - connection_authorized(FromPid, Conn, CompSpec); - false -> - %% close connection (how?) - false - end. + log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), + authorize_rpc:store_creds(CompSpec, Credentials, Conn), + connection_authorized(FromPid, Conn, CompSpec). send_authorize(Pid, CompSpec) -> {LocalIP, LocalPort} = rvi_common:node_address_tuple(), connection:send(Pid, - rvi_common:term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - | rvi_common:log_id_json_tail(CompSpec) ])). + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) }, + { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, + { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } + | log_id_tail(CompSpec) ]). connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered @@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteIP, RemotePort]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), + AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec), log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec), - connection:send(FromPid, - rvi_common:term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - | rvi_common:log_id_json_tail(CompSpec)])), - + connection:send(FromPid, AvailabilityMsg), %% Setup ping interval gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), ok. @@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> ?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]), ?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), Proto = list_to_existing_atom(ProtocolMod), - Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, - base64:decode_to_string(Data)). + Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data). -process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) -> - [ Avail, - Svcs ] = - opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined), +process_announce(Avail, Services, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), - ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]), - ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]), + ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]), case Avail of ?DLINK_ARG_AVAILABLE -> - add_services(Svcs, FromPid), - service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE); + add_services(Services, FromPid), + service_discovery_rpc:register_services(CompSpec, Services, ?MODULE); ?DLINK_ARG_UNAVAILABLE -> - delete_services(FromPid, Svcs), - service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) + delete_services(FromPid, Services), + service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE) end, ok. @@ -837,36 +775,15 @@ get_connections(Key, Acc) -> get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). - -get_authorize_jwt(CompSpec) -> - case authorize_rpc:get_authorize_jwt(CompSpec) of - [ok, JWT] -> - JWT; +get_credentials(CompSpec) -> + case authorize_rpc:get_credentials(CompSpec) of + [ok, Creds] -> + Creds; [not_found] -> - ?error("No authorize JWT~n", []), - error(cannot_authorize) + ?error("No credentials found~n", []), + error(no_credentials_found) end. -get_certificates(CompSpec) -> - case authorize_rpc:get_certificates(CompSpec) of - [ok, Certs] -> - Certs; - [not_found] -> - ?error("No certificate found~n", []), - error(no_certificate_found) - end. - -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of - [ok] -> - true; - [not_found] -> - false - end. - -term_to_json(Term) -> - rvi_common:term_to_json(Term). - opt(K, L, Def) -> case lists:keyfind(K, 1, L) of {_, V} -> V; @@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) -> rvi_common:set_value(rvi_log_id, LogId, CS). log(Fmt, Args, CS) -> - rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). + log(info, Fmt, Args, CS). + +log(Lvl, Fmt, Args, CS) -> + rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS). + +log_id_tail(CompSpec) -> + rvi_common:log_id_json_tail(CompSpec). diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index 45c0691..4512a59 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -63,7 +63,7 @@ terminate(_Reason, _State) -> ok. sock_opts() -> - [binary, {active, once}, {packet, 4}]. + [binary, {active, once}, {packet, 0}]. new_connection(IP, Port, Sock, State) -> ?debug("listener:new_connection(): Peer IP: ~p (ignored)", [IP]), @@ -75,5 +75,5 @@ new_connection(IP, Port, Sock, State) -> %% Provide component spec as extra arg. {ok, _P} = connection:setup(undefined, 0, Sock, dlink_tcp_rpc, - handle_socket, [gen_nb_server:get_cb_state(State)]), + handle_socket, gen_nb_server:get_cb_state(State)), {ok, State}. diff --git a/components/dlink_tls/src/dlink_tls.app.src b/components/dlink_tls/src/dlink_tls.app.src index 7b53a18..3698775 100644 --- a/components/dlink_tls/src/dlink_tls.app.src +++ b/components/dlink_tls/src/dlink_tls.app.src @@ -15,7 +15,8 @@ {applications, [ kernel, stdlib, - rvi_common + rvi_common, + dlink ]}, {mod, { dlink_tls_app, []}}, {start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]}, diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index c125ca3..583009c 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -19,7 +19,7 @@ -behaviour(gen_server). -include_lib("lager/include/log.hrl"). - +-include_lib("public_key/include/public_key.hrl"). %% API @@ -145,7 +145,6 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): Sock: ~p", [Sock]), ?debug("connection:init(): Module: ~p", [Mod]), ?debug("connection:init(): Function: ~p", [Fun]), - %% Grab socket control {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), PktSt = PktMod:init(CompSpec), {ok, #st{ @@ -194,7 +193,11 @@ handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) -> ?debug("upgrade to TLS succcessful~n", []), ssl:setopts(NewS, [{active, Last}]), {ok, {IP, Port}} = ssl:peername(NewS), - NewCS = rvi_common:set_value(dlink_tls_role, client, CompSpec), + {ok, PeerCert} = ssl:peercert(NewS), + ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]), + NewCS = rvi_common:set_value( + dlink_tls_role, Role, + rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)), {reply, ok, St#st{sock = NewS, mode = tls, role = Role, ip = inet_parse:ntoa(IP), port = Port, cs = NewCS}}; @@ -219,7 +222,7 @@ handle_call(_Request, _From, State) -> %%-------------------------------------------------------------------- handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_call(send): Sending: ~p", - [ ?MODULE, Data]), + [ ?MODULE, abbrev(Data)]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), case St#st.mode of tcp -> gen_tcp:send(St#st.sock, Encoded); @@ -253,52 +256,38 @@ handle_info({tcp, Sock, Data}, #st { ip = undefined } = St) -> {ok, {IP, Port}} = inet:peername(Sock), NSt = St#st { ip = inet_parse:ntoa(IP), port = Port }, - ?warning("YESSSS"), handle_info({tcp, Sock, Data}, NSt); -handle_info({ssl, Sock, Data}, - #st{ip = IP, port = Port, - mod = Mod, func = Fun, cs = CS, - packet_mod = PMod, packet_st = PSt} = State) -> - ?debug("handle_info(data): ~p", [Data]), - case PMod:decode(Data, PSt) of - {ok, Elements, PSt1} -> - ?debug("~p:handle_info(data complete): Processed: ~p", - [?MODULE, Elements]), - Mod:Fun(self(), IP, Port, data, Elements, CS), +handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port, + packet_mod = PMod, packet_st = PSt} = State) -> + ?debug("handle_info(data): Data: ~p", [abbrev(Data)]), + ?debug("handle_info(data): From: ~p:~p ", [ IP, Port]), + case PMod:decode(Data, fun(Elems) -> + handle_elems(Elems, State) + end, PSt) of + {ok, PSt1} -> ssl:setopts(Sock, [{active, once}]), {noreply, State#st{packet_st = PSt1}}; {error, Reason} -> - {stop, Reason, State}; - {more, PSt1} -> - ?debug("~p:handle_info(data incomplete)", [ ?MODULE]), - ssl:setopts(Sock, [{active, once}]), - {noreply, State#st{packet_st = PSt1}} + {stop, Reason, State} end; handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, - mod = Mod, - func = Fun, - cs = CS, packet_mod = PMod, packet_st = PSt} = State) -> - ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), - ?debug("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, IP, Port]), - - case PMod:decode(Data, PSt) of - {ok, Elements, PSt1} -> - ?debug("~p:handle_info(data complete): Processed: ~p", - [?MODULE, Elements]), - Mod:Fun(self(), IP, Port, data, Elements, CS), + ?debug("handle_info(data): Data: ~p", [Data]), + ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), + + case PMod:decode(Data, fun(Elems) -> + handle_elems(Elems, State) + end, PSt) of + {ok, PSt1} -> inet:setopts(Sock, [{active, once}]), {noreply, State#st{packet_st = PSt1}}; {error, Reason} -> - {stop, Reason, State}; - {more, PSt1} -> - ?debug("~p:handle_info(data incomplete)", [ ?MODULE]), - inet:setopts(Sock, [{active, once}]), - {noreply, State#st{packet_st = PSt1}} + ?debug("decode failed, Reason = ~p", [Reason]), + {stop, Reason, State} end; handle_info({tcp_closed, Sock}, @@ -361,23 +350,87 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== do_upgrade(Sock, client, CompSpec) -> - ssl:connect(Sock, tls_opts(client, CompSpec)); + Opts = tls_opts(client, CompSpec), + ?debug("TLS Opts = ~p", [Opts]), + ssl:connect(Sock, Opts); do_upgrade(Sock, server, CompSpec) -> - ssl:ssl_accept(Sock, tls_opts(server, CompSpec)). + Opts = tls_opts(client, CompSpec), + ?debug("TLS Opts = ~p", [Opts]), + ssl:ssl_accept(Sock, Opts). %% FIXME: For now, use the example certs delivered with the OTP SSL appl. -tls_opts(Role, CompSpec) -> - Dir = tls_dir(Role), - {ok, CertFile} = get_config(certfile, filename:join(Dir, "cert.pem"), CompSpec), - {ok, KeyFile} = get_config(keyfile, filename:join(Dir, "key.pem"), CompSpec), - [{certfile, CertFile}, - {keyfile, KeyFile}]. - -tls_dir(Role) when Role==client; - Role==server -> - filename:join([code:lib_dir(ssl), "examples", "certs", "etc", - atom_to_list(Role)]). - -get_config(Key, Default, CompSpec) -> - rvi_common:get_module_config( - dlink_tls, dlink_tls_rpc, Key, Default, CompSpec). +tls_opts(Role, _CompSpec) -> + {ok, DevCert} = setup:get_env(rvi_core, device_cert), + {ok, DevKey} = setup:get_env(rvi_core, device_key), + {ok, CACert} = setup:get_env(rvi_core, root_cert), + [ + {verify, verify_peer}, + {certfile, DevCert}, + {keyfile, DevKey}, + {cacertfile, CACert}, + {verify_fun, {fun verify_fun/3, public_root_key()}}, + {partial_chain, fun(X) -> + partial_chain(Role, X) + end} + ]. + +public_root_key() -> + authorize_keys:provisioning_key(). + +verify_fun(Cert, What, St) -> + ?debug("verify_fun(~p, ~p, ~p)", [abbrev(Cert), What, abbrev(St)]), + verify_fun_(Cert, What, St). + +verify_fun_(Cert, {bad_cert, selfsigned_peer}, PubKey) -> + ?debug("Verify self-signed cert: ~p", [abbrev(Cert)]), + try verify_cert_sig(Cert, PubKey) of + true -> + ?debug("verified!", []), + {valid, PubKey}; + false -> + ?debug("verification FAILED", []), + {bad_cert, invalid_signature} + catch + error:Error -> + ?debug("Caught error:~p~n~p", [Error, erlang:get_stacktrace()]), + {fail, PubKey} + end; +verify_fun_(_, {bad_cert, Reason}, St) -> + ?debug("Bad cert: ~p", [Reason]), + {fail, St}; +verify_fun_(_, {extension, _}, St) -> + {unknown, St}; +verify_fun_(_, valid, St) -> + {valid, St}; +verify_fun_(_, valid_peer, St) -> + {valid_peer, St}. + +partial_chain(_, Certs) -> + ?debug("partial_chain() invoked, length(Certs) = ~w", [length(Certs)]), + Decoded = (catch [public_key:der_decode('Certificate', C) + || C <- Certs]), + ?debug("partial_chain: ~p", [[lager:pr(Dec) || Dec <- Decoded]]), + {trusted_ca, hd(Certs)}. + +handle_elems(Elements, #st{mod = Mod, func = Fun, cs = CS, + ip = IP, port = Port}) -> + ?debug("handle_info(data complete): Processed: ~p", [abbrev(Elements)]), + Mod:Fun(self(), IP, Port, data, Elements, CS), + ok. + +verify_cert_sig(#'OTPCertificate'{tbsCertificate = TBS, + signature = Sig}, PubKey) -> + DER = public_key:pkix_encode('OTPTBSCertificate', TBS, otp), + {SignType, _} = signature_algorithm(TBS), + public_key:verify(DER, SignType, Sig, PubKey). + +signature_algorithm(#'OTPCertificate'{tbsCertificate = TBS}) -> + signature_algorithm(TBS); +signature_algorithm(#'OTPTBSCertificate'{ + signature = #'SignatureAlgorithm'{ + algorithm = Algo}}) -> + public_key:pkix_sign_types(Algo). + + +abbrev(T) -> + authorize_keys:abbrev(T). diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index e42a156..9df8f54 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -100,26 +100,14 @@ start_connection_manager() -> [], CompSpec), ?debug("TlsOpts = ~p", [TlsOpts]), - IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), - Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), - ?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]), %% Fire up listener dlink_tls_connmgr:start_link(), {ok,Pid} = dlink_tls_listener:start_link(), - ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), - %% Add listener port. - case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of - ok -> - ?notice("---- RVI Node External Address: ~s", - [ application:get_env(rvi_core, node_address, undefined)]); + setup_initial_listeners(Pid, TlsOpts, CompSpec), - Err -> - ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), - ok - end, ?info("dlink_tls:init_rvi_component(): Setting up persistent connections."), {ok, PersistentConnections } = rvi_common:get_module_config(data_link, @@ -130,6 +118,23 @@ start_connection_manager() -> setup_persistent_connections_(PersistentConnections, CompSpec), ok. +setup_initial_listeners(Pid, [], CompSpec) -> + ?debug("no initial listeners", []); +setup_initial_listeners(Pid, [_|_] = TlsOpts, CompSpec) -> + IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), + Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), + %% Add listener port. + ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), + case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of + ok -> + ?notice("---- RVI Node External Address: ~s", + [ application:get_env(rvi_core, node_address, undefined)]); + + Err -> + ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), + ok + end. + setup_persistent_connections_([ ], _CompSpec) -> ok; @@ -324,7 +329,7 @@ handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) -> handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]), - ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]), + ?debug("data(): Elems ~p~nCS = ~p", [abbrev(Elems), abbrev(CompSpec)]), CS = rvi_common:pick_up_json_log_id(Elems, CompSpec), @@ -333,28 +338,28 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), [ RemoteAddress, RemotePort, - Signature ] = + Credentials ] = opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, - ?DLINK_ARG_SIGNATURE], + ?DLINK_ARG_CREDENTIALS], Elems, undefined), process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, - Signature, CS); - - ?DLINK_CMD_CERT_EXCHANGE -> - ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]), - [ Certs ] = - opts([?DLINK_ARG_CERTIFICATES], Elems, undefined), - ?debug("Certs = ~p", [Certs]), - log("certs from ~s:~w", [PeerIP, PeerPort], CS), - authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}), - case rvi_common:get_value(dlink_tls_role, client, CS) of - client -> ok; - server -> - send_certs(FromPid, CompSpec) - end, - ok; + Credentials, CS); + + %% ?DLINK_CMD_CRED_EXCHANGE -> + %% ?debug("got cred exch ~s:~w", [PeerIP, PeerPort]), + %% [ Creds ] = + %% opts([?DLINK_ARG_CREDENTIALS], Elems, undefined), + %% ?debug("Creds = ~p", [Creds]), + %% log("creds from ~s:~w", [PeerIP, PeerPort], CS), + %% authorize_rpc:store_creds(CS, Creds, {PeerIP, PeerPort}), + %% case rvi_common:get_value(dlink_tls_role, client, CS) of + %% client -> ok; + %% server -> + %% send_creds(FromPid, CompSpec) + %% end, + %% ok; ?DLINK_CMD_SERVICE_ANNOUNCE -> ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), @@ -364,7 +369,6 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?DLINK_ARG_SERVICES], Elems, undefined), - Conn = {PeerIP, PeerPort}, log("sa from ~s:~w", [PeerIP, PeerPort], CS), process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); @@ -644,12 +648,11 @@ status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, - RemotePort, Signature, CompSpec) -> + RemotePort, Credentials, CompSpec) -> ?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]), ?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]), - ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), - { _NRemoteAddress, _NRemotePort} = Conn = + { NRemoteAddress, NRemotePort} = Conn = case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> @@ -658,33 +661,30 @@ process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, { PeerIP, PeerPort }; _ -> { RemoteAddress, RemotePort} end, - - case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of - true -> - connection_authorized(FromPid, Conn, CompSpec); - false -> - %% close connection (how?) - false - end. - -send_certs(Pid, CompSpec) -> - ?debug("send_certs (Pid = ~p)", [Pid]), - {LocalIP, LocalPort} = rvi_common:node_address_tuple(), - dlink_tls_conn:send(Pid, rvi_common:pass_log_id( - [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE}, - {?DLINK_ARG_ADDRESS, LocalIP}, - {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, - {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)). + log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), + PeerCert = rvi_common:get_value(dlink_tls_peer_cert, not_found, CompSpec), + authorize_rpc:store_creds(CompSpec, Credentials, Conn, PeerCert), + connection_authorized(FromPid, Conn, CompSpec). + +%% send_creds(Pid, CompSpec) -> +%% ?debug("send_creds (Pid = ~p)", [Pid]), +%% {LocalIP, LocalPort} = rvi_common:node_address_tuple(), +%% dlink_tls_conn:send(Pid, rvi_common:pass_log_id( +%% [{?DLINK_ARG_CMD, ?DLINK_CMD_CRED_EXCHANGE}, +%% {?DLINK_ARG_ADDRESS, LocalIP}, +%% {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, +%% {?DLINK_ARG_CREDENTIALS, [get_credentials(CompSpec)]}], CompSpec)). send_authorize(Pid, CompSpec) -> - ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]), + ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, abbrev(CompSpec)]), {LocalIP, LocalPort} = rvi_common:node_address_tuple(), - JWT = get_authorize_jwt(CompSpec), + Creds = get_credentials(CompSpec), dlink_tls_conn:send(Pid, rvi_common:pass_log_id( [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE}, + {?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION}, {?DLINK_ARG_ADDRESS, LocalIP}, {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, - {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)). + {?DLINK_ARG_CREDENTIALS, Creds}], CompSpec)). %% dlink_tls_conn:send(Pid, %% term_to_json( %% {struct, @@ -708,18 +708,8 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid), ?debug("dlink_tls:authorize(): Sending authorize."), _Res = send_authorize(FromPid, CompSpec), - case rvi_common:get_value(dlink_tls_role, server, CompSpec) of - server -> ok; - client -> - send_certs(FromPid, CompSpec) - end, - %% ?debug("dlink_tls:upgrade connection", []), - %% UgRes = dlink_tls_conn:upgrade(FromPid, server), - %% ?debug("upgrade result: ~p", [UgRes]), ok; _ -> - %% UgRes = dlink_tls_conn:upgrade(FromPid, client), - %% ?debug("upgrade result: ~p", [UgRes]), ok end, @@ -797,31 +787,31 @@ get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). -get_authorize_jwt(CompSpec) -> - case authorize_rpc:get_authorize_jwt(CompSpec) of - [ok, JWT] -> - JWT; - [not_found] -> - ?error("No authorize JWT~n", []), - error(cannot_authorize) - end. +%% get_authorize_jwt(CompSpec) -> +%% case authorize_rpc:get_authorize_jwt(CompSpec) of +%% [ok, JWT] -> +%% JWT; +%% [not_found] -> +%% ?error("No authorize JWT~n", []), +%% error(cannot_authorize) +%% end. -get_certificates(CompSpec) -> - case authorize_rpc:get_certificates(CompSpec) of - [ok, Certs] -> - Certs; +get_credentials(CompSpec) -> + case authorize_rpc:get_credentials(CompSpec) of + [ok, Creds] -> + Creds; [not_found] -> - ?error("No certificate found~n", []), - error(no_certificate_found) + ?error("No credentials found~n", []), + error(no_credentials_found) end. -validate_auth_jwt(JWT, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of - [ok] -> - true; - [not_found] -> - false - end. +%% validate_auth_jwt(JWT, Conn, CompSpec) -> +%% case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of +%% [ok] -> +%% true; +%% [not_found] -> +%% false +%% end. term_to_json(Term) -> binary_to_list(iolist_to_binary(exo_json:encode(Term))). @@ -834,7 +824,7 @@ opt(K, L, Def) -> opts(Keys, Elems, Def) -> Res = [ opt(K, Elems, Def) || K <- Keys], - ?debug("opts(~p) -> ~p", [Keys, Elems]), + ?debug("opts(~p) -> ~p", [Keys, abbrev(Elems)]), Res. @@ -848,3 +838,6 @@ start_log(Pfx, Fmt, Args, CS) -> log(Fmt, Args, CS) -> rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). + +abbrev(Data) -> + authorize_keys:abbrev(Data). diff --git a/components/rvi_common/include/rvi_dlink_bin.hrl b/components/rvi_common/include/rvi_dlink_bin.hrl index f4f4cb8..6ccabd2 100644 --- a/components/rvi_common/include/rvi_dlink_bin.hrl +++ b/components/rvi_common/include/rvi_dlink_bin.hrl @@ -8,24 +8,24 @@ %% Commonly used protocol identifiers across dlink implementations %% --define(DLINK_CMD_AUTHORIZE, <<"au">>). --define(DLINK_CMD_CERT_EXCHANGE, <<"crt">>). --define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>). --define(DLINK_CMD_RECEIVE, <<"rcv">>). --define(DLINK_CMD_FRAG, <<"frg">>). --define(DLINK_CMD_PING, <<"ping">>). +-define(DLINK_CMD_AUTHORIZE, <<"au">>). +-define(DLINK_CMD_CRED_EXCHANGE, <<"cre">>). +-define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>). +-define(DLINK_CMD_RECEIVE, <<"rcv">>). +-define(DLINK_CMD_FRAG, <<"frg">>). +-define(DLINK_CMD_PING, <<"ping">>). --define(DLINK_ARG_CMD, <<"cmd">>). --define(DLINK_ARG_TRANSACTION_ID, <<"tid">>). --define(DLINK_ARG_ADDRESS, <<"addr">>). --define(DLINK_ARG_PORT, <<"port">>). --define(DLINK_ARG_VERSION, <<"ver">>). --define(DLINK_ARG_CERTIFICATE, <<"cert">>). --define(DLINK_ARG_CERTIFICATES, <<"certs">>). --define(DLINK_ARG_SIGNATURE, <<"sign">>). --define(DLINK_ARG_SERVICES, <<"svcs">>). --define(DLINK_ARG_MODULE, <<"mod">>). --define(DLINK_ARG_AVAILABLE, <<"av">>). --define(DLINK_ARG_UNAVAILABLE, <<"un">>). --define(DLINK_ARG_STATUS, <<"stat">>). --define(DLINK_ARG_DATA, <<"data">>). +-define(DLINK_ARG_CMD, <<"cmd">>). +-define(DLINK_ARG_TRANSACTION_ID, <<"tid">>). +-define(DLINK_ARG_ADDRESS, <<"addr">>). +-define(DLINK_ARG_PORT, <<"port">>). +-define(DLINK_ARG_VERSION, <<"ver">>). +-define(DLINK_ARG_CREDENTIAL, <<"cred">>). +-define(DLINK_ARG_CREDENTIALS, <<"creds">>). +-define(DLINK_ARG_SIGNATURE, <<"sign">>). +-define(DLINK_ARG_SERVICES, <<"svcs">>). +-define(DLINK_ARG_MODULE, <<"mod">>). +-define(DLINK_ARG_AVAILABLE, <<"av">>). +-define(DLINK_ARG_UNAVAILABLE, <<"un">>). +-define(DLINK_ARG_STATUS, <<"stat">>). +-define(DLINK_ARG_DATA, <<"data">>). diff --git a/components/rvi_common/src/exoport_exo_http.erl b/components/rvi_common/src/exoport_exo_http.erl index 31700ca..cc3e7b9 100644 --- a/components/rvi_common/src/exoport_exo_http.erl +++ b/components/rvi_common/src/exoport_exo_http.erl @@ -31,6 +31,13 @@ instance(SupMod, AppMod, Opts) -> handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == 'POST' -> + ensure_ready(fun handle_post/4, Socket, Request, Body, AppMod); + +handle_body(Socket, _Request, _Body, _AppMod) -> + exo_http_server:response(Socket, undefined, 404, "Not Found", + "Object not found. Try using POST method."). + +handle_post(Socket, Request, Body, AppMod) -> case Request#http_request.headers of #http_chdr{content_type = "application/json" ++ _ = T} when T=="application/json"; @@ -39,11 +46,7 @@ handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == ' handle_post_json(Socket, Request, Body, AppMod); #http_chdr{content_type = "multipart/" ++ _} -> handle_post_multipart(Socket, Request, Body, AppMod) - end; - -handle_body(Socket, _Request, _Body, _AppMod) -> - exo_http_server:response(Socket, undefined, 404, "Not Found", - "Object not found. Try using POST method."). + end. handle_post_json(Socket, _Request, Body, AppMod) -> try decode_json(Body) of @@ -211,3 +214,15 @@ opt(K, L, Def) -> {_, V} -> V; false -> Def end. + + +ensure_ready(F, Socket, Request, Body, AppMod) -> + try rvi_server:ensure_ready(_Timeout = 10000), + F(Socket, Request, Body, AppMod) + catch + error:timeout -> + ?error("~p timeout waiting for rvi_core", [?MODULE]), + exo_http_server:response(Socket, undefined, 501, + "Internal Error", + "Internal Error") + end. diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index c9bbabb..0b54d53 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -191,7 +191,6 @@ notification(Component, ?debug(" Module : ~p", [Module]), ?debug(" Function : ~p", [Function]), ?debug(" InArgPropList : ~p", [InArgPropList]), - ?debug(" CompSpec : ~p", [CompSpec]), case get_module_type(Component, Module, CompSpec) of %% We have a gen_server @@ -288,6 +287,8 @@ unstruct(X) -> %% If Path is just a single element, convert to list and try again. +get_json_element(_, []) -> + {error, undefined}; get_json_element(ElemPath, JSON) when is_atom(ElemPath) -> get_json_element([ElemPath], JSON); @@ -299,7 +300,7 @@ get_json_element(ElemPath, JSON) when is_tuple(JSON) -> get_json_element(ElemPath, [T|_] = JSON) when is_tuple(T) -> get_json_element_(ElemPath, JSON); -get_json_element(ElemPath, JSON) when is_list(JSON) -> +get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) -> case exo_json:decode_string(JSON) of {ok, Data } -> get_json_element_(ElemPath, Data); @@ -311,7 +312,7 @@ get_json_element(ElemPath, JSON) when is_list(JSON) -> get_json_element(P, J) -> ?warning("get_json_element(): Unknown call structure; Path: ~p | JSON: ~p", [P, J]), - {error, call, {P, J}}. + {error, {call, {P, J}}}. get_json_element_(_, undefined) -> { error, undefined }; @@ -508,7 +509,7 @@ get_component_config_(Component, Default, CompList) -> get_component_specification() -> CS = get_component_specification_(), - lager:debug("CompSpec = ~p", [CS]), + %% lager:debug("CompSpec = ~p", [CS]), CS. get_component_specification_() -> @@ -580,8 +581,8 @@ get_component_modules(_, _) -> get_module_specification(Component, Module, CompSpec) -> case get_component_modules(Component, CompSpec) of undefined -> - ?debug("get_module_specification(): Missing: rvi_core:component: ~p: ~p", - [Component, CompSpec]), + ?debug("get_module_specification(): Missing: rvi_core:component: ~p", + [Component]), undefined; Modules -> @@ -614,7 +615,7 @@ get_module_config(Component, Module, Key, CompSpec) -> case proplists:get_value(Key, ModConf, undefined ) of undefined -> ?debug("get_module_config(): Missing component spec: " - "rvi_core:component:~p:~p:~p{...}: ~p", + "~p:~p:~p{...}: ~p", [Component, Module, Key, ModConf]), {error, {not_found, Component, Module, Key}}; @@ -764,9 +765,9 @@ start_json_rpc_server(Component, Module, Supervisor, XOpts) -> Module, ExoHttpOpts); Err -> - ?info("rvi_common:start_json_rpc_server(~p:~p): " - "No JSON-RPC address setup. skip", - [ Component, Module ]), + ?debug("start_json_rpc_server(~p:~p): " + "No JSON-RPC address setup. skip", + [ Component, Module ]), Err end. diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl index 5d0fa8e..962252c 100644 --- a/components/rvi_common/src/rvi_log.erl +++ b/components/rvi_common/src/rvi_log.erl @@ -2,15 +2,22 @@ -behaviour(gen_server). +-compile(export_all). + -export([start_link/0, log/3, - flog/4, + flog/4, flog/5, new_id/1, fetch/1, timestamp/0, format/2 ]). +-export([entry/3, + exit_good/3, + exit_warn/3, + exit_fail/3]). + -export([start_json_server/0, handle_rpc/2, handle_notification/2]). @@ -28,6 +35,7 @@ seq = 1, node_tag}). -record(evt, {id, + level, component, event}). -define(IDS, rvi_log_ids). @@ -35,13 +43,35 @@ -define(MAX_LENGTH, 60). +%% levels +-define(INFO, 0). +-define(GOOD, 1). +-define(WARN, 2). +-define(FAIL, 3). + start_link() -> create_tabs(), gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +entry(TID, Component, Event) -> log(TID, ?INFO, Component, Event). +exit_good(TID, Component, Event) -> log(TID, ?GOOD, Component, Event). +exit_warn(TID, Component, Event) -> log(TID, ?WARN, Component, Event). +exit_fail(TID, Component, Event) -> log(TID, ?FAIL, Component, Event). + log(TID, Component, Event) when is_binary(TID), is_binary(Component) -> - gen_server:cast(?MODULE, {log, TID, timestamp(), Component, bin(Event)}). + log(TID, ?INFO, Component, Event). + +log(TID, Level, Component, Event) when is_binary(TID), is_binary(Component) -> + gen_server:cast(?MODULE, {log, TID, level_num(Level), timestamp(), Component, bin(Event)}). + + +level_num(L) when is_integer(L), L >= ?INFO, L =< ?FAIL -> + L; +level_num(info ) -> 0; +level_num(result ) -> 1; +level_num(warning) -> 2; +level_num(error ) -> 3. new_id(Prefix) -> gen_server:call(?MODULE, {new_id, bin(Prefix)}). @@ -63,18 +93,156 @@ trunc_msg(Bin) -> flog(Fmt, Args, Component, CS) -> + flog(0, Fmt, Args, Component, CS). + +flog(Level, Fmt, Args, Component, CS) -> LogTID = rvi_common:get_log_id(CS), - log(LogTID, Component, format(Fmt, Args)). + log(LogTID, Level, Component, format(Fmt, Args)). timestamp() -> os:timestamp(). +%% shorthand for the select pattern +-define(ELEM(A), {element, #evt.A, '$_'}). +-define(PROD, {{'$1', ?ELEM(level), ?ELEM(component), ?ELEM(event)}}). + fetch(Tids) -> + fetch(Tids, []). + +fetch(Tids, Args) -> TidSet = select_ids(Tids), - [{Tid, ets:select(?EVENTS, [{#evt{id = {Tid,'$1'}, - component = '$2', - event = '$3'}, [], [{{'$1', '$2', '$3'}}] }])} - || Tid <- TidSet]. + lists:foldr( + fun(Tid, Acc) -> + case match_events( + Args, + ets:select(?EVENTS, [{#evt{id = {Tid,'$1'}, + level = '$2', + component = '$3', + event = '$4'}, [], [?PROD] }])) of + [] -> Acc; + Events -> + [{Tid, Events}|Acc] + end + end, [], TidSet). + +match_events(_, []) -> + []; +match_events(Args, Events) -> + lists:foldl( + fun({<<"level">>, Str}, Acc) -> + filter_by_level(Acc, parse_level_expr(Str)); + (_, Acc) -> + Acc + end, Events, Args). + +%% The level comparison expressions use the following syntax: +%% Expr :: Int +%% | BinOp Expr +%% | UnaryOp Expr +%% | '(' Expr ')' +%% +%% BinOp :: '==' | '>=' | '=>' | '<=' | '=<' | '!=' | '|' | '&' +%% UnaryOp :: '!' | 'not' +%% +%% The token scanning pass translates the tokens to standard Erlang tokens +%% The parsing pass first infills variable references where needed, e.g. +%% "> 1" would be translated to "L > 1" (where L is a variable bound to the +%% current Event Level at evaluation time), and +%% "1" would be translated to "L == 1". +%% +%% The infill pass is needed in order to produce a legal Erlang grammar. It's followed +%% by a call to the erlang parser. The short forms are expanded in the parsed form. +%% +%% When parsing and evaluating the expressions any exception is translated into a failed +%% comparison. +%% +parse_level_expr(Str) -> + try level_grammar(scan_level_expr(Str, [])) + catch + error:_ -> + [{atom, 1, false}] + end. + +scan_level_expr(<<"!=", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'=/=',1}|Acc]); +scan_level_expr(<<"==", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'=:=',1}|Acc]); +scan_level_expr(<<"=>", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'=<',1}|Acc]); +scan_level_expr(<<"<=", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'>=',1}|Acc]); +scan_level_expr(<<">=", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'>=',1}|Acc]); +scan_level_expr(<<"=<", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'=<',1}|Acc]); +scan_level_expr(<<">", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'>',1}|Acc]); +scan_level_expr(<<"<", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'<',1}|Acc]); +scan_level_expr(<<"|", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'orelse',1}|Acc]); +scan_level_expr(<<"&", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'andalso',1}|Acc]); +scan_level_expr(<>, Acc) when I >= $0, I =< $3 -> + scan_level_expr(Rest, [{integer, 1, I-$0} | Acc]); +scan_level_expr(<<"(", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'(',1}|Acc]); +scan_level_expr(<<")", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{')',1}|Acc]); +scan_level_expr(<<"not", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'not',1}|Acc]); +scan_level_expr(<<"!", Rest/binary>>, Acc) -> + scan_level_expr(Rest, [{'not',1}|Acc]); +scan_level_expr(<<"\s", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc); +scan_level_expr(<<"\t", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc); +scan_level_expr(<<"\n", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc); +scan_level_expr(<<>>, Acc) -> + lists:reverse(Acc). + +level_grammar(Toks) -> + case erl_parse:parse_exprs(infill_vars(Toks)) of + {ok, Exprs} -> + expand_shorthand(Exprs); + Error -> + error(Error) + end. + +-define(is_op(H), H=='=:='; H=='=!='; H=='>'; H=='<'; H=='>='; H=='=<'). + +infill_vars([{O,_}=H|T]) when ?is_op(O) -> + [{var,1,'L'},H|infill_vars(T)]; +infill_vars([H|T]) -> + [H|infill_vars(T)]; +infill_vars([]) -> + [{dot,1}]. + +expand_shorthand([Expr|Exprs]) -> + [expand_shorthand_(Expr)|expand_shorthand(Exprs)]; +expand_shorthand([]) -> + []. + +expand_shorthand_({integer,_,_} = I) -> + {op, 1, '=:=', {var,1,'L'}, I}; +expand_shorthand_({op,Ln,Op,A,B}) when Op=='andalso'; + Op=='orelse' -> + {op,Ln,Op,expand_shorthand_(A), expand_shorthand_(B)}; +expand_shorthand_({op,Ln,'not',E}) -> + {op,Ln,'not',expand_shorthand_(E)}; +expand_shorthand_(Expr) -> + Expr. + +filter_by_level([H|T], Expr) -> + Res = try erl_eval:exprs(Expr, [{'L', element(2,H)}]) of + {value, true, _} -> true; + _ -> false + catch + _:_ -> false + end, + if Res -> + [H|filter_by_level(T, Expr)]; + true -> + filter_by_level(T, Expr) + end. init(_) -> {ok, Tag} = rvi_common:get_module_config( @@ -117,11 +285,11 @@ handle_notification(Other, _Args) -> ok. handle_cast(log_start, #st{n = N, node_tag = Tag} = St) -> - do_log(tid_(<<"rvi_log">>, 0, Tag), timestamp(), <<"rvi_common">>, + do_log(tid_(<<"rvi_log">>, 0, Tag), ?INFO, timestamp(), <<"rvi_common">>, format("Started - Tag = ~s", [Tag]), N), {noreply, St}; -handle_cast({log, TID, TS, Component, Event}, #st{n = N} = St) -> - do_log(TID, TS, Component, Event, N), +handle_cast({log, TID, Level, TS, Component, Event}, #st{n = N} = St) -> + do_log(TID, Level, TS, Component, Event, N), {noreply, St}; handle_cast(_, St) -> {noreply, St}. @@ -161,10 +329,10 @@ maybe_new(T, Opts) -> tid_(Prefix, Seq, Tag) -> <>. -do_log(Tid, TS, Component, Event, N) -> +do_log(Tid, Level, TS, Component, Event, N) -> case ets:member(?IDS, Tid) of true -> - store_event(Tid, TS, Component, Event); + store_event(Tid, Level, TS, Component, Event); false -> case ets:info(?IDS, size) of Sz when Sz >= N -> @@ -173,13 +341,14 @@ do_log(Tid, TS, Component, Event, N) -> ok end, ets:insert(?IDS, {Tid}), - store_event(Tid, TS, Component, Event) + store_event(Tid, Level, TS, Component, Event) end. -store_event(Tid, TS, Component, Event) -> - rvi_log_log:info("~-20s ~-12s ~s", [Tid, Component, Event]), - ?info("RVI_LOG: ~p/~p/~p", [Tid, Component, Event]), +store_event(Tid, Level, TS, Component, Event) -> + rvi_log_log:info("~-20s ~-2w ~-12s ~s", [Tid, Level, Component, Event]), + ?info("RVI_LOG: ~p/~w/~p/~p", [Tid, Level, Component, Event]), ets:insert(?EVENTS, #evt{id = {Tid, TS}, + level = Level, component = Component, event = Event}). diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index b4f14ab..bd7ab54 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -92,7 +92,7 @@ get_modules_by_service(CompSpec, Service) -> register_services(CompSpec, Services, DataLinkModule) -> ?debug("~p:register_services()", [?MODULE]), - ?debug(" CompSpec : ~p", [CompSpec]), + ?debug(" CompSpec : ~p", [authorize_keys:abbrev(CompSpec)]), ?debug(" Services : ~p", [Services]), ?debug(" DataLinkMod : ~p", [DataLinkModule]), rvi_common:notification(service_discovery, ?MODULE, register_services, diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index a7c75e0..576dc13 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -236,7 +236,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params), { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params), - + LogId = log_id_json_tail(Params ++ Parameters), ?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]), ?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]), ?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]), @@ -244,7 +244,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) -> case gen_server:call( ?SERVER, {rvi, handle_local_message, - [ SvcName, Timeout, Parameters]}) of + [ SvcName, Timeout, Parameters | LogId ]}) of [not_found] -> {ok, [{status, rvi_common:json(not_found)}]}; [Res, TID] -> @@ -318,9 +318,9 @@ handle_rpc(<<"message">>, Args) -> {ok, SvcName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args), + LogId = log_id_json_tail(Args ++ Parameters), [ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message, - [ SvcName, Timeout, Parameters]}), - + [ SvcName, Timeout, Parameters | LogId]}), {ok, [ { status, rvi_common:json_rpc_status(Res) }, { transaction_id, TID }, { method, <<"message">>} @@ -396,12 +396,12 @@ handle_notification(Other, _Args) -> %% the only calls invoked by other components, and not the locally %% connected services that uses the same HTTP port to transmit their %% register_service, and message calls. -handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) -> +handle_call({ rvi, register_local_service, [SvcName, URL | T] }, _From, St) -> ?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]), ?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]), FullSvcName = rvi_common:local_service_to_string(SvcName), - CS = start_log(<<"svc">>, "reg local service: ~s", [FullSvcName], St#st.cs), + CS = start_log(T, "reg local service: ~s", [FullSvcName], St#st.cs), ?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]), ets:insert(?SERVICE_TABLE, #service_entry { @@ -416,7 +416,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) -> %% Return ok. { reply, [ ok, FullSvcName ], St }; -handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) -> +handle_call({ rvi, unregister_local_service, [SvcName | T] }, _From, St) -> ?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [SvcName]), @@ -424,7 +424,7 @@ handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) -> %% Register with service discovery, will trigger callback to service_available() %% that forwards the registration to other connected services. - CS = start_log(<<"svc">>, "unreg local service: ~s", [SvcName], St#st.cs), + CS = start_log(T, "unreg local service: ~s", [SvcName], St#st.cs), service_discovery_rpc:unregister_services(CS, [SvcName], local), %% Return ok. @@ -444,11 +444,11 @@ handle_call({rvi, get_available_services, []}, _From, St) -> %% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}] handle_call({ rvi, handle_local_message, - [SvcName, TimeoutArg, Parameters] }, _From, St) -> + [SvcName, TimeoutArg, Parameters | Tail] }, _From, St) -> ?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]), ?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]), ?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]), - CS = start_log(<<"recv">>, "local_message: ~s", [SvcName], St#st.cs), + CS = start_log(Tail, "local_message: ~s", [SvcName], St#st.cs), %% %% Authorize local message and retrieve a certificate / signature %% that will be accepted by the receiving node that will deliver @@ -468,7 +468,7 @@ handle_call({ rvi, handle_local_message, %% If the timeout is more than 24 hrs old when parsed as unix time, %% then we are looking at a relative msec timeout. Convert accordingly %% - { Mega, Sec, _Micro } = now(), + { Mega, Sec, _Micro } = os:timestamp(), Now = Mega * 1000000 + Sec, Timeout = @@ -746,11 +746,34 @@ announce_service_availability(Available, SvcName) -> end end, BlockURLs, ?SERVICE_TABLE). +start_log(Info, Fmt, Args, CS) -> + ?debug("start_log(~p,~p,~p,~p)", [Info,Fmt,Args,CS]), + case rvi_common:get_json_element([<<"rvi_log_id">>], Info) of + {ok, ID} -> + start_log_(ID, Fmt, Args, CS); + {error, _} -> + start_log(Fmt, Args, CS) + end. + +start_log(Fmt, Args, CS) -> + start_log_(rvi_log:new_id(pfx()), Fmt, Args, CS). + +start_log_(ID, Fmt, Args, CS) -> + CS1 = rvi_common:set_value(rvi_log_id, ID, CS), + log(Fmt, Args, CS1), + CS1. -start_log(Pfx, Fmt, Args, CS) -> - LogTID = rvi_log:new_id(Pfx), - rvi_log:log(LogTID, <<"svc_edge">>, rvi_log:format(Fmt, Args)), - rvi_common:set_value(rvi_log_id, LogTID, CS). +pfx() -> + <<"svc_edge">>. log(Fmt, Args, CS) -> - rvi_log:flog(Fmt, Args, <<"svc_edge">>, CS). + rvi_log:flog(Fmt, Args, pfx(), CS). + +log_id_json_tail(Args) -> + ?debug("Args = ~p", [Args]), + case rvi_common:get_json_element([<<"rvi_log_id">>], Args) of + {ok, ID} -> + [{<<"rvi_log_id">>, ID}]; + {error, _} -> + [] + end. -- cgit v1.2.1