summaryrefslogtreecommitdiff
path: root/components
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
committerUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
commit6cfeffca9f8e93e45dd885702a77896e2a1d0951 (patch)
tree620e2dd9006b52df7129d135fa7256d793571df1 /components
parent7d098a34b25704dbaa8bea0217ca6b7be37a0e48 (diff)
downloadrvi_core-6cfeffca9f8e93e45dd885702a77896e2a1d0951.tar.gz
new protocol & setup scripts
Diffstat (limited to 'components')
-rw-r--r--components/authorize/src/authorize_keys.erl343
-rw-r--r--components/authorize/src/authorize_rpc.erl146
-rw-r--r--components/dlink/src/dlink.app.src6
-rw-r--r--components/dlink/src/dlink_data_json.erl60
-rw-r--r--components/dlink/src/dlink_data_msgpack.erl15
-rw-r--r--components/dlink/src/dlink_data_rvi.erl123
-rw-r--r--components/dlink_bt/src/bt_connection.erl144
-rw-r--r--components/dlink_bt/src/bt_connection_manager.erl1
-rw-r--r--components/dlink_bt/src/bt_listener.erl64
-rw-r--r--components/dlink_bt/src/dlink_bt.app.src2
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl296
-rw-r--r--components/dlink_sms/src/dlink_sms.app.src6
-rw-r--r--components/dlink_tcp/src/connection.erl110
-rw-r--r--components/dlink_tcp/src/dlink_tcp.app.src5
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl207
-rw-r--r--components/dlink_tcp/src/listener.erl4
-rw-r--r--components/dlink_tls/src/dlink_tls.app.src3
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl159
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl167
-rw-r--r--components/rvi_common/include/rvi_dlink_bin.hrl40
-rw-r--r--components/rvi_common/src/exoport_exo_http.erl25
-rw-r--r--components/rvi_common/src/rvi_common.erl21
-rw-r--r--components/rvi_common/src/rvi_log.erl201
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl2
-rw-r--r--components/service_edge/src/service_edge_rpc.erl55
25 files changed, 1198 insertions, 1007 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index 70bb021..a362c7b 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -2,24 +2,25 @@
-behaviour(gen_server).
-export([get_key_pair/0,
+ get_device_key/0,
get_key_pair_from_pem/2,
get_pub_key/1,
- authorize_jwt/0,
provisioning_key/0,
signed_public_key/2,
save_keys/2,
- save_cert/4]).
--export([get_certificates/0,
- get_certificates/1]).
+ save_cred/5]).
+-export([get_credentials/0,
+ get_credentials/1]).
-export([validate_message/2,
validate_service_call/2]).
-export([filter_by_service/2,
- find_cert_by_service/1]).
+ find_cred_by_service/1]).
-export([public_key_to_json/1,
json_to_public_key/1]).
-export([self_signed_public_key/0]). % just temporary
-export([pp_key/1,
+ abbrev/1,
abbrev_bin/1,
abbrev_payload/1,
abbrev_jwt/1]).
@@ -36,20 +37,28 @@
-include_lib("public_key/include/public_key.hrl").
-record(st, {provisioning_key,
- cert_dir,
- authorize_jwt}).
-
--record(cert, {id,
- register = [],
- invoke = [],
+ dev_cert,
+ cred_dir}).
+
+%% -record(cert, {id,
+%% register = [],
+%% invoke = [],
+%% validity = [],
+%% jwt,
+%% cert}).
+
+-record(cred, {id,
+ right_to_register = [],
+ right_to_invoke = [],
validity = [],
+ device_cert,
jwt,
- cert}).
+ cred}).
-record(key, {id,
key}).
--define(CERTS, authorize_certs).
+-define(CREDS, authorize_creds).
-define(KEYS, authorize_keys).
public_key_to_json(#'RSAPublicKey'{modulus = N, publicExponent = E}) ->
@@ -101,8 +110,15 @@ get_key_pair() ->
get_key_pair_from_pem(openssl, Pem)
end.
-authorize_jwt() ->
- gen_server:call(?MODULE, authorize_jwt).
+get_device_key() ->
+ case get_env(device_key) of
+ undefined ->
+ {undefined, undefined};
+ [_|_] = File ->
+ get_key_pair_from_pem(openssl, File);
+ {openssl_pem, Pem} ->
+ get_key_pair_from_pem(openssl, Pem)
+ end.
validate_message(JWT, Conn) ->
gen_server:call(?MODULE, {validate_message, JWT, Conn}).
@@ -110,17 +126,17 @@ validate_message(JWT, Conn) ->
validate_service_call(Service, Conn) ->
gen_server:call(?MODULE, {validate_service_call, Service, Conn}).
-get_certificates() ->
- get_certificates(local).
+get_credentials() ->
+ get_credentials(local).
-get_certificates(Conn) ->
- gen_server:call(?MODULE, {get_certificates, Conn}).
+get_credentials(Conn) ->
+ gen_server:call(?MODULE, {get_credentials, Conn}).
filter_by_service(Services, Conn) ->
gen_server:call(?MODULE, {filter_by_service, Services, Conn}).
-find_cert_by_service(Service) ->
- gen_server:call(?MODULE, {find_cert_by_service, Service}).
+find_cred_by_service(Service) ->
+ gen_server:call(?MODULE, {find_cred_by_service, Service}).
provisioning_key() ->
gen_server:call(?MODULE, provisioning_key).
@@ -128,8 +144,8 @@ provisioning_key() ->
save_keys(Keys, Conn) ->
gen_server:call(?MODULE, {save_keys, Keys, Conn}).
-save_cert(Cert, JWT, Conn, LogId) ->
- gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn, LogId}).
+save_cred(Cred, JWT, Conn, PeerCert, LogId) ->
+ gen_server:call(?MODULE, {save_cred, Cred, JWT, Conn, PeerCert, LogId}).
%% Gen_server functions
@@ -137,7 +153,7 @@ start_link() ->
create_ets(),
case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of
{ok, Pid} = Ok ->
- ets:give_away(?CERTS, Pid, undefined),
+ ets:give_away(?CREDS, Pid, undefined),
ets:give_away(?KEYS, Pid, undefined),
Ok;
Other ->
@@ -145,18 +161,19 @@ start_link() ->
end.
init([]) ->
- ProvisioningKey = get_pub_key(get_env(provisioning_key)),
+ ProvisioningKey = get_pub_key_from_cert(get_env(root_cert)),
?debug("ProvisioningKey = ~s~n", [pp_key(ProvisioningKey)]),
- CertDir = setup:verify_dir(get_env(cert_dir)),
- {ok, AuthJwt0} = file:read_file(get_env(authorize_jwt)),
- AuthJwt = strip_nl(AuthJwt0),
- ?debug("CertDir = ~p~n", [CertDir]),
- Certs = scan_certs(CertDir, ProvisioningKey),
- ?debug("scan_certs found ~p certificates~n", [length(Certs)]),
- [ets:insert(?CERTS, {{local, C#cert.id}, C}) || C <- Certs],
+ {ok, DevCertBin} = file:read_file(get_env(device_cert)),
+ DevCert = strip_pem_begin_end(DevCertBin),
+ ?debug("Own DevCert = ~w", [abbrev_bin(DevCert)]),
+ CredDir = setup:verify_dir(get_env(cred_dir)),
+ ?debug("CredDir = ~p~n", [CredDir]),
+ Creds = scan_creds(CredDir, ProvisioningKey, DevCert),
+ ?debug("scan_creds found ~p credentials~n", [length(Creds)]),
+ [ets:insert(?CREDS, {{local, C#cred.id}, C}) || C <- Creds],
{ok, #st{provisioning_key = ProvisioningKey,
- cert_dir = CertDir,
- authorize_jwt = AuthJwt}}.
+ dev_cert = DevCert,
+ cred_dir = CredDir}}.
handle_call(Req, From, S) ->
try handle_call_(Req, From, S)
@@ -167,13 +184,11 @@ handle_call(Req, From, S) ->
{reply, error, S}
end.
-handle_call_(authorize_jwt, _, S) ->
- {reply, S#st.authorize_jwt, S};
handle_call_(provisioning_key, _, S) ->
{reply, S#st.provisioning_key, S};
-handle_call_({get_certificates, Conn}, _, S) ->
- Certs = certs_by_conn(Conn),
- {reply, Certs, S};
+handle_call_({get_credentials, Conn}, _, S) ->
+ Creds = creds_by_conn(Conn),
+ {reply, Creds, S};
handle_call_({save_keys, Keys, Conn}, _, S) ->
?debug("save_keys: Keys=~p, Conn=~p~n", [abbrev_k(Keys), Conn]),
save_keys_(Keys, Conn),
@@ -182,14 +197,14 @@ handle_call_({validate_message, JWT, Conn}, _, S) ->
{reply, validate_message_(JWT, Conn), S};
handle_call_({validate_service_call, Svc, Conn}, _, S) ->
{reply, validate_service_call_(Svc, Conn), S};
-handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) ->
- case process_cert_struct(Cert, JWT) of
+handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn, PeerCert, LogId}, _, S) ->
+ case process_cred_struct(Cred, JWT, PeerCert) of
invalid ->
- log(LogId, "cert INVALID Conn=~s:~w", [IP, Port]),
+ log(LogId, warning, "cred INVALID Conn=~s:~w", [IP, Port]),
{reply, {error, invalid}, S};
- #cert{} = C ->
- ets:insert(?CERTS, {{Conn, C#cert.id}, C}),
- log(LogId, "cert stored ~s Conn=~s:~w", [abbrev_bin(C#cert.id), IP, Port]),
+ #cred{} = C ->
+ ets:insert(?CREDS, {{Conn, C#cred.id}, C}),
+ log(LogId, result, "cred stored ~s Conn=~s:~w", [abbrev_bin(C#cred.id), IP, Port]),
{reply, ok, S}
end;
handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
@@ -197,9 +212,9 @@ handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
Filtered = filter_by_service_(Services, Conn),
?debug("Filtered = ~p~n", [Filtered]),
{reply, Filtered, State};
-handle_call_({find_cert_by_service, Service} = R, _From, State) ->
+handle_call_({find_cred_by_service, Service} = R, _From, State) ->
?debug("authorize_keys:handle_call(~p,...)~n", [R]),
- Res = find_cert_by_service_(Service),
+ Res = find_cred_by_service_(Service),
?debug("Res = ~p~n", [case Res of {ok,{A,B}} -> {ok,{A,abbrev_bin(B)}}; _ -> Res end]),
{reply, Res, State};
handle_call_(_, _, S) ->
@@ -219,28 +234,28 @@ code_change(_FromVsn, S, _Extra) ->
%% Local functions
-certs_by_conn(Conn) ->
- ?debug("certs_by_conn(~p)~n", [Conn]),
+creds_by_conn(Conn) ->
+ ?debug("creds_by_conn(~p)~n", [Conn]),
UTC = rvi_common:utc_timestamp(),
- Certs = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{jwt = '$1',
+ Creds = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{jwt = '$1',
validity = '$2',
_='_'}},
[], [{{'$1', '$2'}}] }]),
- ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]),
- [C || {C,V} <- Certs, check_validity(V, UTC)].
+ ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Creds]]),
+ [C || {C,V} <- Creds, check_validity(V, UTC)].
-cert_recs_by_conn(Conn) ->
- ?debug("cert_recs_by_conn(~p)~n", [Conn]),
+cred_recs_by_conn(Conn) ->
+ ?debug("cred_recs_by_conn(~p)~n", [Conn]),
UTC = rvi_common:utc_timestamp(),
- Certs = ets:select(?CERTS, [{ {{Conn,'_'}, '$1'},
+ Creds = ets:select(?CREDS, [{ {{Conn,'_'}, '$1'},
[], ['$1'] }]),
- ?debug("rough selection: ~p~n", [[abbrev_bin(C#cert.id) || C <- Certs]]),
- [C || C <- Certs, check_validity(C#cert.validity, UTC)].
+ ?debug("rough selection: ~p~n", [[abbrev_bin(C#cred.id) || C <- Creds]]),
+ [C || C <- Creds, check_validity(C#cred.validity, UTC)].
filter_by_service_(Services, Conn) ->
- ?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]),
- Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1',
- _ = '_'}},
+ ?debug("Filter: creds = ~p", [[{K,abbrev_payload(V)} || {K,V} <- ets:tab2list(?CREDS)]]),
+ Invoke = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{right_to_invoke = '$1',
+ _ = '_'}},
[], ['$1'] }]),
?debug("Services by conn (~p) -> ~p~n", [Conn, Invoke]),
filter_svcs_(Services, Invoke).
@@ -260,25 +275,25 @@ filter_svcs_([S|Svcs], Invoke) ->
filter_svcs_([], _) ->
[].
-find_cert_by_service_(Service) ->
+find_cred_by_service_(Service) ->
SvcParts = split_path(strip_prot(Service)),
- LocalCerts = ets:select(?CERTS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]),
- ?debug("find_cert_by_service(~p~nLocalCerts = ~p~n",
- [Service, [{Id,Reg,Inv} || #cert{id = Id,
- invoke = Inv,
- register = Reg} <- LocalCerts]]),
+ LocalCreds = ets:select(?CREDS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]),
+ ?debug("find_creds_by_service(~p~nLocalCreds = ~p~n",
+ [Service, [{Id,Reg,Inv} || #cred{id = Id,
+ right_to_invoke = Inv,
+ right_to_register = Reg} <- LocalCreds]]),
case lists:foldl(
- fun(#cert{register = Register} = C, {Max, _} = Acc) ->
+ fun(#cred{right_to_register = Register} = C, {Max, _} = Acc) ->
case match_length(Register, SvcParts) of
L when L > Max ->
{L, C};
_ ->
Acc
end
- end, {0, none}, LocalCerts) of
+ end, {0, none}, LocalCreds) of
{0, none} ->
{error, not_found};
- {_, #cert{id = Id, jwt = JWT}} ->
+ {_, #cred{id = Id, jwt = JWT}} ->
{ok, {Id, JWT}}
end.
@@ -329,7 +344,7 @@ match_svc_(_, _) ->
false.
get_env(K) ->
- Res = case application:get_env(rvi_core, K) of
+ Res = case setup:get_env(rvi_core, K) of
{ok, V} -> V;
_ -> undefined
end,
@@ -382,8 +397,32 @@ get_openssl_pub_key(File) ->
undefined
end.
+get_pub_key_from_cert(File) ->
+ case file:read_file(File) of
+ {ok, Bin} ->
+ get_pub_key_from_cert_rec(
+ public_key:pem_entry_decode(hd(public_key:pem_decode(Bin))));
+ Error ->
+ error({cannot_read_cert, [Error, {file, File}]})
+ end.
+
+get_pub_key_from_cert_rec(#'Certificate'{
+ tbsCertificate =
+ #'TBSCertificate'{
+ subjectPublicKeyInfo =
+ #'SubjectPublicKeyInfo'{
+ algorithm =
+ #'AlgorithmIdentifier'{
+ algorithm = Algo},
+ subjectPublicKey = Key}}}) ->
+ 'RSAPublicKey' = KeyAlg = pubkey_cert_records:supportedPublicKeyAlgorithms(Algo),
+ public_key:der_decode(KeyAlg, Key).
+
+
+
+
create_ets() ->
- create_ets(?CERTS, #cert.id),
+ create_ets(?CREDS, #cred.id),
create_ets(?KEYS, #key.id).
create_ets(Tab, KeyPos) ->
@@ -396,42 +435,46 @@ create_ets(Tab, KeyPos) ->
true
end.
-scan_certs(Dir, Key) ->
- UTC = rvi_common:utc_timestamp(),
- case file:list_dir(Dir) of
- {ok, Fs} ->
- lists:foldl(
+scan_creds(Dir, Key, Cert) ->
+ case filelib:is_dir(Dir) of
+ true ->
+ UTC = rvi_common:utc_timestamp(),
+ filelib:fold_files(
+ Dir,
+ _FileRegexp = "\\.jwt$",
+ _Recursive = false,
fun(F, Acc) ->
- process_cert(filename:join(Dir, F), Key, UTC, Acc)
- end, [], Fs);
- Error ->
- ?warning("Cannot read certs (~p): ~p~n", [Dir, Error]),
- ok
+ process_cred(F, Key, Cert, UTC, Acc)
+ end,
+ []);
+ false ->
+ ?warning("Cannot read creads: ~p not a directory", [Dir]),
+ []
end.
-process_cert(F, Key, UTC, Acc) ->
+process_cred(F, Key, Cert, UTC, Acc) ->
case file:read_file(F) of
{ok, Bin} ->
try authorize_sig:decode_jwt(strip_nl(Bin), Key) of
- {_, Cert} ->
- ?info("Unpacked Cert ~p:~n~p~n", [F, abbrev_payload(Cert)]),
- case process_cert_struct(Cert, Bin, UTC) of
+ {_, Cred} ->
+ ?info("Unpacked Cred ~p:~n~p~n", [F, abbrev_payload(Cred)]),
+ case process_cred_struct(Cred, Bin, UTC, Cert) of
invalid ->
Acc;
- #cert{} = C ->
+ #cred{} = C ->
[C|Acc]
end;
invalid ->
- ?warning("Invalid cert: ~p~n", [F]),
+ ?warning("Invalid cred: ~p~n", [F]),
Acc
catch
error:Exception ->
- ?warning("Cert validation failure (~p): ~p~n",
+ ?warning("Cred validation failure (~p): ~p~n",
[F, Exception]),
Acc
end;
Error ->
- ?warning("Cannot read cert ~p: ~p~n", [F, Error]),
+ ?warning("Cannot read cred ~p: ~p~n", [F, Error]),
Acc
end.
@@ -441,52 +484,76 @@ strip_nl(Bin) ->
_ -> Bin
end.
-process_cert_struct(Cert, Bin) ->
- process_cert_struct(Cert, Bin, rvi_common:utc_timestamp()).
+strip_pem_begin_end(<<"-----BEGIN CERTIFICATE-----\n", Bin/binary>>) ->
+ re:replace(hd(re:split(Bin, <<"-----END CERTIFICATE-----">>, [])),
+ <<"\\v">>, <<>>, [global,{return,binary}]);
+strip_pem_begin_end(Bin) ->
+ Bin.
+
+
+process_cred_struct(Cred, Bin, Cert) ->
+ process_cred_struct(Cred, Bin, rvi_common:utc_timestamp(), to_pem(Cert)).
+
+to_pem(Cert) when is_binary(Cert) ->
+ %% The Peer cert is assumed to be DER encoded.
+ %% The cert in the cred is PEM-encoded
+ PEMCert = public_key:pem_encode([{'Certificate', Cert, not_encrypted}]),
+ strip_pem_begin_end(PEMCert);
+to_pem(Other) ->
+ Other.
+
-process_cert_struct(Cert, Bin, UTC) ->
- try process_cert_struct_(Cert, Bin, UTC)
+process_cred_struct(Cred, Bin, UTC, Cert) ->
+ try process_cred_struct_(Cred, Bin, UTC, Cert)
catch
error:Err ->
- ?warning("Failure processing Cert ~p~n~p",
- [Cert, {Err, erlang:get_stacktrace()}]),
+ ?warning("Failure processing Cred ~p~n~p",
+ [Cred, {Err, erlang:get_stacktrace()}]),
invalid
end.
-process_cert_struct_(Cert, Bin, UTC) ->
- ID = cert_id(Cert),
+process_cred_struct_(Cred, Bin, UTC, DevCert) ->
+ ID = cred_id(Cred),
{ok, Register} = rvi_common:get_json_element(
- [{'OR', ["sources", "register"]}], Cert),
+ [{'OR', ["right_to_register", "sources", "register"]}], Cred),
{ok, Invoke} = rvi_common:get_json_element(
- [{'OR', ["destinations", "invoke"]}], Cert),
+ [{'OR', ["right_to_invoke", "destinations", "invoke"]}], Cred),
{ok, Start} = rvi_common:get_json_element(
- ["validity", "start"], Cert),
+ ["validity", "start"], Cred),
{ok, Stop} = rvi_common:get_json_element(
- ["validity", "stop"], Cert),
+ ["validity", "stop"], Cred),
+ {ok, Cert} = rvi_common:get_json_element(["device_cert"], Cred),
+ case DevCert == undefined orelse Cert == DevCert of
+ false ->
+ ?warning("Wrong device_cert in cred~n~p~n~p", [Cert, DevCert]),
+ invalid;
+ true ->
+ ok
+ end,
?debug("Start = ~p; Stop = ~p~n", [Start, Stop]),
Validity = {Start, Stop},
case check_validity(Start, Stop, UTC) of
true ->
- #cert{id = ID,
- register = Register,
- invoke = Invoke,
+ #cred{id = ID,
+ right_to_register = Register,
+ right_to_invoke = Invoke,
validity = Validity,
jwt = Bin,
- cert = Cert};
+ cred = Cred};
false ->
- %% Cert outdated
- ?warning("Outdated cert: Validity = ~p; UTC = ~p~n",
+ %% Cred outdated
+ ?warning("Outdated cred: Validity = ~p; UTC = ~p~n",
[Validity, UTC]),
invalid
end.
-cert_id(Cert) ->
- case rvi_common:get_json_element(["id"], Cert) of
+cred_id(Cred) ->
+ case rvi_common:get_json_element(["id"], Cred) of
{ok, Id} ->
Id;
{error, undefined} ->
- ?warning("Cert has no ID: ~p~n", [Cert]),
- erlang:now()
+ ?warning("Cred has no ID: ~p~n", [Cred]),
+ erlang:unique_integer([positive,monotonic])
end.
check_validity({Start, Stop}, UTC) ->
@@ -539,14 +606,14 @@ validate_message_1([], _) ->
error(invalid).
validate_service_call_(Svc, Conn) ->
- case lists:filter(fun(C) -> can_invoke(Svc, C) end, cert_recs_by_conn(Conn)) of
+ case lists:filter(fun(C) -> can_invoke(Svc, C) end, cred_recs_by_conn(Conn)) of
[] ->
invalid;
- [#cert{id = ID}|_] ->
+ [#cred{id = ID}|_] ->
{ok, ID}
end.
-can_invoke(Svc, #cert{invoke = In}) ->
+can_invoke(Svc, #cred{right_to_invoke = In}) ->
lists:any(fun(I) -> match_svc(I, Svc) end, In).
pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) ->
@@ -560,6 +627,11 @@ pp_key(#'RSAPublicKey'{modulus = Mod, publicExponent = Pub}) ->
<<"#{'RSAPublicKey'{modulus = ", (abbrev_bin(M))/binary,
", publicExponent = ", P/binary, ", _ = ...}">>.
+abbrev(B) when is_binary(B) ->
+ abbrev_bin(B);
+abbrev(X) ->
+ abbrev_payload(X).
+
abbrev_bin(B) ->
abbrev_bin(B, 20, 6).
@@ -574,29 +646,58 @@ abbrev_bin(B, Len, Part) ->
end
catch error:_ -> B end.
-abbrev_payload(Payload) ->
+abbrev_payload(Payload) when is_list(Payload) ->
try lists:map(fun abbrev_pl/1, Payload)
- catch error:_ -> Payload end.
+ catch error:_ -> Payload end;
+abbrev_payload(PL) ->
+ try abbrev_pl(PL)
+ catch error:_ -> PL end.
abbrev_jwt({Hdr, Body} = X) ->
try {Hdr, abbrev_payload(Body)}
catch error:_ -> X end.
+abbrev_pl(#cred{} = Payload) ->
+ list_to_tuple(lists:map(fun(B) when is_binary(B) -> abbrev_bin(B);
+ ([{_,_}|_]=L) -> abbrev_payload(L);
+ (A) -> A
+ end, tuple_to_list(Payload)));
+abbrev_pl(#'RSAPublicKey'{} = Key) ->
+ pp_key(Key);
+abbrev_pl(#'RSAPrivateKey'{} = Key) ->
+ pp_key(Key);
+abbrev_pl(#'OTPCertificate'{} = Cert) ->
+ abbrev_deep_tuple(Cert);
+abbrev_pl(#'OTPTBSCertificate'{} = Cert) ->
+ abbrev_deep_tuple(Cert);
abbrev_pl({<<"keys">> = K, Ks}) ->
{K, [abbrev_k(Ky) || Ky <- Ks]};
-abbrev_pl({<<"certs">> = C, Cs}) ->
- {C, [abbrev_bin(Cert) || Cert <- Cs]};
+abbrev_pl({<<"creds">> = C, Cs}) ->
+ {C, [abbrev_bin(Cred) || Cred <- Cs]};
abbrev_pl({<<"certificate">> = K, C}) ->
{K, abbrev_bin(C)};
+abbrev_pl({<<"device_cert">> = K, C}) ->
+ {K, abbrev_bin(C)};
+abbrev_pl({<<"credential">> = K, C}) ->
+ {K, abbrev_bin(C)};
abbrev_pl({<<"sign">> = K, S}) when is_binary(S) ->
{K, abbrev_bin(S)};
+abbrev_pl({K, B}) when is_atom(K), is_binary(B), byte_size(B) > 50 ->
+ {K, abbrev_bin(B)};
+abbrev_pl({K,[{_,_}|_]=L}) ->
+ {K, abbrev_pl(L)};
abbrev_pl(B) when is_binary(B) ->
abbrev_bin(B, 40, 10);
abbrev_pl(L) when is_list(L) ->
abbrev_payload(L);
+abbrev_pl(T) when is_tuple(T), tuple_size(T) > 2 ->
+ abbrev_deep_tuple(T);
abbrev_pl(X) ->
X.
+abbrev_deep_tuple(T) when is_tuple(T) ->
+ list_to_tuple([abbrev_pl(E) || E <- tuple_to_list(T)]).
+
abbrev_k(K) ->
try lists:map(fun abbrev_elem/1, K)
catch error:_ -> K end.
@@ -606,7 +707,7 @@ abbrev_elem({<<"n">>, Bin}) ->
abbrev_elem(X) ->
X.
-log([ID], Fmt, Args) ->
- rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
-log(_, _, _) ->
+log([ID], L, Fmt, Args) ->
+ rvi_log:log(ID, L, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _, _) ->
ok.
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl
index c91b216..2291d21 100644
--- a/components/authorize/src/authorize_rpc.erl
+++ b/components/authorize/src/authorize_rpc.erl
@@ -18,19 +18,18 @@
terminate/2, code_change/3]).
-export([start_json_server/0]).
--export([get_authorize_jwt/1,
- get_certificates/1,
+-export([get_credentials/1,
sign_message/2,
validate_message/3,
validate_authorization/3,
validate_authorization/4,
- store_certs/3,
+ store_creds/3,
+ store_creds/4,
authorize_local_message/3,
authorize_remote_message/3]).
-export([filter_by_service/3]).
%% for testing & development
--export([sign/1]).
-export([public_key/0, public_key_json/0,
private_key/0]).
@@ -51,7 +50,7 @@ start_link() ->
init([]) ->
?debug("authorize_rpc:init(): called."),
- {Priv, Pub} = authorize_keys:get_key_pair(),
+ {Priv, Pub} = authorize_keys:get_device_key(),
?debug("KeyPair = {~s, ~s}~n", [authorize_keys:pp_key(Priv),
authorize_keys:pp_key(Pub)]),
{ok, #st { cs = rvi_common:get_component_specification(),
@@ -74,15 +73,10 @@ validate_message(CompSpec, JWT, Conn) ->
[{jwt, JWT},
{conn, Conn}], [status, message], CompSpec).
-get_authorize_jwt(CompSpec) ->
- ?debug("authorize_rpc:get_authorize_jwt()~n", []),
- rvi_common:request(authorize, ?MODULE, get_authorize_jwt,
- [], [status, jwt], CompSpec).
-
-get_certificates(CompSpec) ->
- ?debug("authorize_rpc:get_certificates()~n", []),
- rvi_common:request(authorize, ?MODULE, get_certificates,
- [], [status, certs], CompSpec).
+get_credentials(CompSpec) ->
+ ?debug("authorize_rpc:get_credentials()~n", []),
+ rvi_common:request(authorize, ?MODULE, get_credentials,
+ [], [status, creds], CompSpec).
validate_authorization(CompSpec, JWT, Conn) ->
?debug("authorize_rpc:validate_authorization():"
@@ -92,19 +86,24 @@ validate_authorization(CompSpec, JWT, Conn) ->
{conn, Conn}],
[status], CompSpec).
-validate_authorization(CompSpec, JWT, Certs, Conn) ->
+validate_authorization(CompSpec, JWT, Creds, Conn) ->
?debug("authorize_rpc:validate_authorization():"
" Conn = ~p~n", [Conn]),
rvi_common:request(authorize, ?MODULE, validate_authorization,
[{jwt, JWT},
- {certs, Certs},
+ {creds, Creds},
{conn, Conn}],
[status], CompSpec).
-store_certs(CompSpec, Certs, Conn) ->
- rvi_common:request(authorize, ?MODULE, store_certs,
- [{certs, Certs},
- {conn, Conn}],
+store_creds(CompSpec, Creds, Conn) ->
+ store_creds(CompSpec, Creds, Conn, undefined).
+
+store_creds(CompSpec, Creds, Conn, PeerCert) ->
+ ?debug("API: store_creds(), PeerCert = ~p", [authorize_keys:abbrev(PeerCert)]),
+ rvi_common:request(authorize, ?MODULE, store_creds,
+ [{creds, Creds},
+ {conn, Conn},
+ {peer_cert, PeerCert}],
[status], CompSpec).
authorize_local_message(CompSpec, Service, Params) ->
@@ -130,11 +129,6 @@ filter_by_service(CompSpec, Services, Conn) ->
{ conn, Conn }],
[status, services], CompSpec).
-%% For testing while developing cert functionality
-sign(Term) ->
- %% Use private key of authorize_rpc to make a JWT token
- gen_server:call(?SERVER, {sign, Term}).
-
public_key() ->
gen_server:call(?SERVER, public_key).
@@ -163,34 +157,30 @@ handle_rpc("validate_message", Args) ->
gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn, LogId] }),
{ok, [ {status, rvi_common:json_rpc_status(Status)},
{message, Msg} ]};
-handle_rpc("get_authorize_jwt", Args) ->
+handle_rpc("get_credentials", Args) ->
LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, get_authorize_jwt, [LogId] }),
- {ok, [ rvi_common:json_rpc_status(Status) | Rem ] };
-handle_rpc("get_certificates", Args) ->
- LogId = rvi_common:get_json_log_id(Args),
- [ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, get_certificates, [LogId] }),
+ gen_server:call(?SERVER, { rvi, get_credentials, [LogId] }),
{ok, [ rvi_common:json_rpc_status(Status) | Rem ] };
handle_rpc("validate_authorization", Args) ->
{ok, JWT} = rvi_common:get_json_element(["jwt"], Args),
{ok, Conn} = rvi_common:get_json_element(["connection"], Args),
LogId = rvi_common:get_json_log_id(Args),
CmdArgs =
- case rvi_common:get_json_element(["certs"], Args) of
- {ok, Certs} -> [JWT, Certs, Conn, LogId];
+ case rvi_common:get_json_element(["creds"], Args) of
+ {ok, Creds} -> [JWT, Creds, Conn, LogId];
{error, _} -> [JWT, Conn, LogId]
end,
[ Status | Rem ] =
gen_server:call(?SERVER, {rvi, validate_authorization, CmdArgs}),
{ok, [ rvi_common:json_rpc_status(Status) | Rem] };
-handle_rpc("store_certs", Args) ->
- {ok, Certs} = rvi_common:get_json_element(["certs"], Args),
+handle_rpc("store_creds", Args) ->
+ {ok, Creds} = rvi_common:get_json_element(["creds"], Args),
{ok, Conn} = rvi_common:get_json_element(["conn"], Args),
+ {ok, PeerCert} = rvi_common:get_json_element(["peer_cert"], Args),
LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
- gen_server:call(?SERVER, {rvi, store_certs, [Certs, Conn, LogId]}),
+ gen_server:call(?SERVER, {rvi, store_creds, [Creds, Conn, PeerCert, LogId]}),
{ok, [ rvi_common:json_rpc_status(Status) | Rem]};
handle_rpc("authorize_local_message", Args) ->
{ok, Service} = rvi_common:get_json_element(["service"], Args),
@@ -236,38 +226,35 @@ handle_notification(Other, _Args) ->
%%
handle_call({rvi, sign_message, [Msg | LogId]}, _, #st{private_key = Key} = State) ->
Sign = authorize_sig:encode_jwt(Msg, Key),
- log(LogId, "signed", []),
+ log(LogId, result, "signed", []),
{reply, [ ok, Sign ], State};
handle_call({rvi, validate_message, [JWT, Conn | LogId]}, _, State) ->
try begin Res = authorize_keys:validate_message(JWT, Conn),
- log(LogId, "validated", []),
+ log(LogId, result, "validated", []),
{reply, [ok, Res], State}
end
catch
error:_Err ->
- log(LogId, "validation FAILED", []),
+ log(LogId, error, "validation FAILED", []),
{reply, [not_found], State}
end;
-handle_call({rvi, get_authorize_jwt, [_LogId]}, _From, State) ->
- {reply, [ ok, authorize_keys:authorize_jwt() ], State};
-
-handle_call({rvi, get_certificates, [_LogId]}, _From, State) ->
- {reply, [ ok, authorize_keys:get_certificates() ], State};
+handle_call({rvi, get_credentials, [_LogId]}, _From, State) ->
+ {reply, [ ok, authorize_keys:get_credentials() ], State};
handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, State) ->
- %% The authorize JWT contains the public key used to sign the cert
+ %% The authorize JWT contains the public key used to sign the cred
?debug(
"authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n",
[]),
try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of
{_Header, Keys} ->
- log(LogId, "auth jwt validated", []),
+ log(LogId, result, "auth jwt validated", []),
KeyStructs = get_json_element(["keys"], Keys, []),
authorize_keys:save_keys(KeyStructs, Conn),
{reply, [ok], State};
invalid ->
?warning("Invalid auth JWT from ~p~n", [Conn]),
- log(LogId, "auth jwt INVALID", []),
+ log(LogId, error, "auth jwt INVALID", []),
{reply, [not_found], State}
catch
error:_Err ->
@@ -275,22 +262,22 @@ handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, Sta
{reply, [not_found], State}
end;
-handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _From, State) ->
- %% The authorize JWT contains the public key used to sign the cert
+handle_call({rvi, validate_authorization, [JWT, Creds, Conn | [_] = LogId] }, _From, State) ->
+ %% The authorize JWT contains the public key used to sign the cred
?debug(
"authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n",
[]),
try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of
{_Header, Keys} ->
- log(LogId, "auth jwt validated", []),
+ log(LogId, result, "auth jwt validated", []),
KeyStructs = get_json_element(["keys"], Keys, []),
?debug("KeyStructs = ~p~n", [KeyStructs]),
authorize_keys:save_keys(KeyStructs, Conn),
- do_store_certs(Certs, Conn, LogId),
+ do_store_creds(Creds, Conn, undefined, LogId),
{reply, [ok], State};
invalid ->
?warning("Invalid auth JWT from ~p~n", [Conn]),
- log(LogId, "auth jwt INVALID", []),
+ log(LogId, error, "auth jwt INVALID", []),
{reply, [not_found], State}
catch
error:_Err ->
@@ -298,21 +285,17 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _F
{reply, [not_found], State}
end;
-handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) ->
- do_store_certs(Certs, Conn, LogId),
+handle_call({rvi, store_creds, [Creds, Conn, PeerCert | LogId]}, _From, State) ->
+ do_store_creds(Creds, Conn, PeerCert, LogId),
{reply, [ok], State};
handle_call({rvi, authorize_local_message, [Service, _Params | LogId] } = R, _From, State) ->
?debug("authorize_rpc:handle_call(~p)~n", [R]),
- case authorize_keys:find_cert_by_service(Service) of
- {ok, {ID, _Cert}} ->
- %% Msg = Params ++ [{<<"certificate">>, Cert}],
- %% ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n",
- %% [authorize_keys:abbrev_payload(Msg)]),
- %% Sig = authorize_sig:encode_jwt(Msg, Key),
- log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]),
+ case authorize_keys:find_cred_by_service(Service) of
+ {ok, {ID, _Cred}} ->
+ log(LogId, result, "auth msg: Cred=~s", [authorize_keys:abbrev_bin(ID)]),
{reply, [ok], State};
_ ->
- log(LogId, "NO CERTS for ~s", [Service]),
+ log(LogId, error, "NO CREDS for ~s", [Service]),
{reply, [ not_found ], State}
end;
@@ -330,11 +313,11 @@ handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]},
?debug("authorize_rpc:authorize_remote_message(): parameters: ~p~n", [Parameters]),
case authorize_keys:validate_service_call(SvcName, {IP, Port}) of
invalid ->
- log(LogId, "remote msg REJECTED", []),
+ log(LogId, error, "remote msg REJECTED", []),
{reply, [ not_found ], State};
- {ok, CertID} ->
- ?debug("validated Cert ID=~p", [CertID]),
- log(LogId, "remote msg allowed: Cert=~s", [CertID]),
+ {ok, CredID} ->
+ ?debug("validated Cred ID=~p", [CredID]),
+ log(LogId, result, "remote msg allowed: Cred=~s", [CredID]),
{reply, [ok], State}
end;
@@ -370,11 +353,12 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-do_store_certs(Certs, Conn, LogId) ->
- ?debug("Storing ~p certs for conn ~p~n", [length(Certs), Conn]),
- lists:foreach(fun(Cert) ->
- store_cert(Cert, Conn, LogId)
- end, Certs).
+do_store_creds(Creds, Conn, PeerCert, LogId) ->
+ ?debug("Storing ~p creds for conn ~p~nPeerCert = ~w",
+ [length(Creds), Conn, authorize_keys:abbrev(PeerCert)]),
+ lists:foreach(fun(Cred) ->
+ store_cred(Cred, Conn, PeerCert, LogId)
+ end, Creds).
get_json_element(Path, JSON, Default) ->
case rvi_common:get_json_element(Path, JSON) of
@@ -384,26 +368,26 @@ get_json_element(Path, JSON, Default) ->
Default
end.
-store_cert(Cert, Conn, LogId) ->
- case authorize_sig:decode_jwt(Cert, authorize_keys:provisioning_key()) of
- {_CHeader, CertStruct} ->
- case authorize_keys:save_cert(CertStruct, Cert, Conn, LogId) of
+store_cred(CredJWT, Conn, PeerCert, LogId) ->
+ case authorize_sig:decode_jwt(CredJWT, authorize_keys:provisioning_key()) of
+ {_CHeader, CredStruct} ->
+ case authorize_keys:save_cred(CredStruct, CredJWT, Conn, PeerCert, LogId) of
ok ->
ok;
{error, Reason} ->
?warning(
- "Couldn't store certificate from ~p: ~p~n",
+ "Couldn't store credential from ~p: ~p~n",
[Conn, Reason]),
ok
end;
invalid ->
- ?warning("Invalid certificate from ~p~n", [Conn]),
+ log(LogId, warning, "credential INVALID (~p)", [Conn]),
ok
end.
-log([ID], Fmt, Args) ->
- rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
-log(_, _, _) ->
+log([ID], Lvl, Fmt, Args) ->
+ rvi_log:log(ID, Lvl, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _, _) ->
ok.
%% check_msg(Checks, Params) ->
diff --git a/components/dlink/src/dlink.app.src b/components/dlink/src/dlink.app.src
index 6666333..7dca4d5 100644
--- a/components/dlink/src/dlink.app.src
+++ b/components/dlink/src/dlink.app.src
@@ -14,7 +14,11 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ authorize,
+ service_edge,
+ service_discovery,
+ schedule
]},
{mod, {dlink_app, []}},
{start_phases, [{announce, []}]},
diff --git a/components/dlink/src/dlink_data_json.erl b/components/dlink/src/dlink_data_json.erl
index 1ad82b2..6a68e48 100644
--- a/components/dlink/src/dlink_data_json.erl
+++ b/components/dlink/src/dlink_data_json.erl
@@ -1,6 +1,10 @@
-module(dlink_data_json).
--compile(export_all).
+-export([encode/2,
+ decode/3]).
+-export([init/1,
+ port_options/0]).
+
init(_Opts) ->
[].
@@ -8,41 +12,25 @@ init(_Opts) ->
port_options() ->
[list, {packet, 0}].
-decode(Msg, St) ->
- {Msg1, St1} = append(St, Msg),
- try exo_json:decode(St1, Msg1) of
- {done, {ok, {struct, Elems}}, Rest} ->
- {ok, [Elems], Rest};
- {done, {ok, {array, Structs}}, Rest} ->
- {ok, [Str || {struct, Str} <- Structs], Rest};
- {done, {error, Reason}, Rest} ->
- {error, Reason, Rest};
- {more, Cont} ->
- {more, Cont}
- catch
- error:Error ->
- {error, Error, St1};
- exit:Exit ->
- {error, Exit, St1}
- end.
+decode(Msg, F, St) when is_function(F, 1) ->
+ jsx_decode_stream(Msg, F, St).
+
+encode(Msg, St) ->
+ {ok, rvi_common:term_to_json(Msg), St}.
-encode({struct, _} = JSON, St) ->
- try {ok, exo_json:encode(JSON), St}
- catch exit:Error -> erlang:error(Error)
- end;
-encode({array, Structs} = JSON, St) ->
- case lists:all(fun({struct,_}) -> true;
- (_) -> false
- end, Structs) of
- true ->
- {ok, exo_json:encode(JSON), St};
- false ->
- erlang:error(invalid_json_structure)
+jsx_decode_stream(Data, F, St) ->
+ case jsx_decode(Data, St) of
+ {incomplete, Cont} ->
+ {ok, Cont};
+ {with_tail, Elems, <<>>} ->
+ F(Elems),
+ {ok, []};
+ {with_tail, Elems, Rest} ->
+ F(Elems),
+ jsx_decode_stream(Rest, F, [])
end.
-append([], Msg) ->
- {Msg, []};
-append([_|_] = St, Msg) ->
- {St ++ Msg, []};
-append(Cont, Msg) when is_tuple(Cont) ->
- {Msg, Cont}.
+jsx_decode(Data, []) ->
+ jsx:decode(Data, [stream, return_tail]);
+jsx_decode(Data, Cont) when is_function(Cont, 1) ->
+ Cont(Data).
diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl
index 9510a53..253da55 100644
--- a/components/dlink/src/dlink_data_msgpack.erl
+++ b/components/dlink/src/dlink_data_msgpack.erl
@@ -1,6 +1,10 @@
-module(dlink_data_msgpack).
--compile(export_all).
+-export([init/1,
+ decode/3,
+ encode/2]).
+
+-export([port_options/0]).
-record(st, {opts = [{allow_atom, pack},
{enable_str, true},
@@ -13,15 +17,16 @@ port_options() ->
init(_CS) ->
#st{}.
-decode(Msg0, #st{buf = Prev, opts = Opts} = St) ->
+decode(Msg0, F, #st{buf = Prev, opts = Opts} = St) when is_function(F, 1) ->
Msg = append(Prev, Msg0),
case msgpack:unpack_stream(Msg, Opts) of
{error, incomplete} ->
- {more, St#st{buf = Msg}};
+ {ok, St#st{buf = Msg}};
{error, E} ->
- {error, E, St};
+ {error, E};
{Decoded, Rest} when is_binary(Rest) ->
- {ok, Decoded, St#st{buf = Rest}}
+ F(Decoded),
+ decode(Rest, F, St#st{buf = <<>>})
end.
encode({struct, Elems}, #st{opts = Opts} = St) ->
diff --git a/components/dlink/src/dlink_data_rvi.erl b/components/dlink/src/dlink_data_rvi.erl
deleted file mode 100644
index 01131be..0000000
--- a/components/dlink/src/dlink_data_rvi.erl
+++ /dev/null
@@ -1,123 +0,0 @@
--module(dlink_data_rvi).
-
--compile(export_all).
-
--record(dlink_data_rvi, {need, buf}).
-
--define(MAX_LINE, 79).
-
-init(_Opts) ->
- undefined.
-
-port_options() ->
- [].
-
-encode(Elems, St) ->
- Bin = encode_(Elems, <<>>),
- Sz = byte_size(Bin),
- {ok, <<"&RVI|",
- (integer_to_binary(Sz, 16))/binary, "\n",
- Bin/binary>>, St}.
-
-encode_([{Key, Val}|T], Acc) ->
- {Type, ValBin} = encode_val(Val),
- Bin = encode_elem(to_bin(Key), Type, ValBin),
- encode_(T, <<Acc/binary, Bin/binary>>);
-encode_([], Acc) ->
- Acc.
-
-encode_val(V) when is_binary(V) -> {$B, V};
-encode_val(V) when is_integer(V) -> {$i, integer_to_binary(V,16)};
-encode_val(V) when is_atom(V) -> {$a, atom_to_binary(V, latin1)};
-encode_val(V) when is_float(V) ->
- Bin = <<V/float>>,
- {$f, Bin};
-encode_val({T,_} = J) when T==array; T==struct ->
- JSON = exo_json:encode(J),
- {$J, iolist_to_binary(JSON)};
-encode_val([T|_] = L) when is_tuple(T) ->
- {$L, encode_(L, <<>>)}.
-
-decode_value($B, Bin) -> Bin;
-decode_value($i, Bin) -> binary_to_integer(Bin, 16);
-decode_value($f, <<F/float>>) -> F;
-decode_value($a, Bin) -> binary_to_existing_atom(Bin, latin1);
-decode_value($J, Bin) ->
- {ok, Obj} = exo_json:decode_string(binary_to_list(Bin)),
- Obj;
-decode_value($L, Bin) ->
- decode_packet(Bin).
-
-encode_elem(Key, Type, Bin) ->
- BSz = byte_size(Bin),
- case byte_size(Key) + BSz of
- Sz when Sz =< 78, Type >= $a, Type =< $z ->
- <<Key/binary, "|", Type, ":", Bin/binary, "\n">>;
- _ ->
- <<Key/binary, "|", Type, "|",
- (integer_to_binary(BSz+1,16))/binary, "\n",
- Bin/binary, "\n">>
- end.
-
-decode(<<"&RVI|", Rest/binary>>, undefined) ->
- case erlang:decode_packet(line, Rest, [{line_length, 79}]) of
- {more, _} ->
- {more, Rest};
- {ok, Ln, Rest1} ->
- LSz = byte_size(Ln),
- LSz1 = LSz-1,
- <<Size:LSz1/binary, "\n">> = Ln,
- Bytes = binary_to_integer(Size, 16),
- case Rest1 of
- <<Pkt:Bytes/binary, Tail/binary>> ->
- {ok, decode_packet(Pkt), Tail};
- _ ->
- {more, #dlink_data_rvi{need = Bytes, buf = Rest1}}
- end
- end;
-decode(Data, #dlink_data_rvi{need = Bytes, buf = Buf} = St) ->
- case <<Buf/binary, Data/binary>> of
- <<Pkt:Bytes/binary, Tail/binary>> ->
- {ok, decode_packet(Pkt), Tail};
- Buf1 ->
- {more, St#dlink_data_rvi{buf = Buf1}}
- end;
-decode(_, _St) ->
- {error, unknown, undefined}.
-
-decode_packet(<<>>) ->
- [];
-decode_packet(P) ->
- {ok, L, Rest} = erlang:decode_packet(line, P, [{line_length, ?MAX_LINE}]),
- case split_line(L) of
- {Key, Type, simple, Data} ->
- [{Key, decode_value(Type, Data)}|decode_packet(Rest)];
- {Key, Type, Size} ->
- Size1 = Size-1,
- <<VBin:Size1/binary, "\n", Rest1/binary>> = Rest,
- [{Key, decode_value(Type, VBin)}|decode_packet(Rest1)]
- end.
-
-to_bin(V) when is_atom(V) -> atom_to_binary(V, latin1);
-to_bin(V) when is_binary(V) -> V;
-to_bin(V) when is_list(V) -> iolist_to_binary(V).
-
-split_line(L) ->
- split_line(L, <<>>).
-
-split_line(<<"\\", $|, Rest/binary>>, Acc) ->
- split_line(Rest, <<Acc/binary, $|>>);
-split_line(<<"|", T, ":", Rest/binary>>, Acc) ->
- {Acc, T, simple, remove_nl(Rest)};
-split_line(<<"|", T, "|", Rest/binary>>, Acc) ->
- SzBin = remove_nl(Rest),
- {Acc, T, binary_to_integer(SzBin, 16)};
-split_line(<<H, T/binary>>, Acc) ->
- split_line(T, <<Acc/binary, H>>).
-
-
-remove_nl(B) ->
- Sz = byte_size(B),
- Sz1 = Sz-1,
- <<V:Sz1/binary, "\n">> = B,
- V.
diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl
index 85a8663..6b3a64e 100644
--- a/components/dlink_bt/src/bt_connection.erl
+++ b/components/dlink_bt/src/bt_connection.erl
@@ -36,16 +36,20 @@
-define(SERVER, ?MODULE).
+-define(PACKET_MOD, dlink_data_json).
-record(st, {
remote_addr = "00:00:00:00:00:00",
channel = 0,
- rfcomm_ref = undefined,
- listen_ref = undefined,
+ rfcomm_ref,
+ listen_ref,
+ accept_ref,
mode = bt,
- mod = undefined,
- func = undefined,
- args = undefined
+ packet_mod = ?PACKET_MOD,
+ packet_st = [],
+ mod,
+ func,
+ args
}).
%%%===================================================================
@@ -124,11 +128,15 @@ is_connection_up(Addr, Channel) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) ->
+init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) ->
%% connect will block on rfcomm:open, so cast to self
%% in order to let init return.
+ ?debug("init({connect, ~p, ~p, ~p, ~p, ~p, CS})",
+ [BTAddr, Channel, Mode, Mod, Fun]),
gen_server:cast(self(), connect),
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS),
+ PktSt = PktMod:init(CS),
{ok, #st{
remote_addr = BTAddr,
channel = Channel,
@@ -136,36 +144,47 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) ->
mode = Mode,
mod = Mod,
func = Fun,
- args = Arg
+ args = CS,
+ packet_mod = PktMod,
+ packet_st = PktSt
}};
-init({accept, Channel, ListenRef, Mode, Mod, Fun, Arg}) ->
- ARef = case Mode of
- tcp ->
- {ok, R} = exo_socket:async_accept(ListenRef),
- R;
- bt ->
- {ok, R} = rfcomm:accept(ListenRef, infinity, self()),
- R
- end,
+init({accept, Channel, ListenRef, Mode, Mod, Fun, CS}) ->
+ ?debug("init({accept, ~p, ~p, ~p, ~p, ~p, CS})",
+ [Channel, ListenRef, Mode, Mod, Fun]),
?debug("bt_connection:init(accept): self(): ~p", [self()]),
?debug("bt_connection:init(accept): Channel: ~p", [Channel]),
?debug("bt_connection:init(accept): ListenRef: ~p", [ListenRef]),
- ?debug("bt_connection:init(accept): AcceptRef: ~p", [ARef]),
?debug("bt_connection:init(accept): Module: ~p", [Mod]),
?debug("bt_connection:init(accept): Function: ~p", [Fun]),
- ?debug("bt_connection:init(accept): Arg: ~p", [Arg]),
-
+ ?debug("bt_connection:init(accept): Arg: ~p", [CS]),
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS),
+ PktSt = PktMod:init(CS),
+ ARef = case Mode of
+ bt ->
+ %% Expected message is
+ %% {rfcomm, ARef, {accept, Addr, Chan}}
+ {ok, R} = rfcomm:accept(ListenRef, infinity, self()),
+ R;
+ tcp ->
+ %% -1 represents infinity
+ %% Expected message is
+ %% {inet_async,LSock,Ref,{ok,Socket}}
+ {ok, R} = prim_inet:async_accept(ListenRef, -1),
+ R
+ end,
{ok, #st{
channel = Channel,
- rfcomm_ref = ARef,
- listen_ref = ListenRef,
+ rfcomm_ref = undefined,
+ accept_ref = ARef,
mode = Mode,
mod = Mod,
func = Fun,
- args = Arg
+ args = CS,
+ packet_mod = PktMod,
+ packet_st = PktSt
}}.
@@ -219,7 +238,7 @@ handle_cast(connect, #st {
bt ->
rfcomm:open(BTAddr, Channel);
tcp ->
- exo_socket:connect(BTAddr, Channel)
+ gen_tcp:connect(BTAddr, Channel, bt_listener:sock_opts())
end,
case ConnRes of
{ok, ConnRef} ->
@@ -246,13 +265,18 @@ handle_cast(connect, #st {
{ stop, { connect_failed, Error}, St }
end;
-handle_cast({send, Data}, St) ->
- ?debug("~p:handle_call(send): Sending: ~p",
- [ ?MODULE, Data]),
-
- rfcomm:send(St#st.rfcomm_ref, Data),
-
- {noreply, St};
+handle_cast({send, Data}, #st{mode = Mode,
+ rfcomm_ref = Sock,
+ packet_mod = PMod, packet_st = PSt} = St) ->
+ ?debug("handle_cast(send): Sending: ~p", [Data]),
+ {ok, Encoded, PSt1} = PMod:encode(Data, PSt),
+ ?debug("Encoded = ~p", [Encoded]),
+ Res = case Mode of
+ bt -> rfcomm:send(Sock, Encoded);
+ tcp -> gen_tcp:send(Sock, Encoded)
+ end,
+ ?debug("send Res = ~p", [Res]),
+ {noreply, St#st{packet_st = PSt1}};
handle_cast(_Msg, State) ->
?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]),
@@ -271,35 +295,43 @@ handle_cast(_Msg, State) ->
%% An accept reference we've setup now has accetpted an
%% incoming connection.
-handle_info({rfcomm, _ARef, { accept, BTAddr, _ } },
+handle_info({rfcomm, ARef, { accept, BTAddr, _ } },
#st { mod = Mod,
func = Fun,
args = Arg,
+ listen_ref = LRef,
+ accept_ref = ARef,
channel = Channel } = St) ->
?info("~p:handle_info(): bt_connection from ~w:~w\n",
[?MODULE, BTAddr,Channel]),
-
+ bt_listener:accept_ack(ok, LRef, BTAddr, Channel),
Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
{ noreply, St#st { remote_addr = BTAddr,
- channel = Channel } };
+ channel = Channel,
+ accept_ref = undefined} };
handle_info({rfcomm, _ConnRef, {data, Data}},
#st { remote_addr = BTAddr,
channel = Channel,
+ packet_mod = PMod,
+ packet_st = PSt,
mod = Mod,
- func = Fun,
- args = Arg } = State) ->
+ func = Fun } = State) ->
?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]),
?info("~p:handle_info(data): ~p:~p -> ~p:~p",
[ ?MODULE, BTAddr, Channel, Mod, Fun]),
- Self = self(),
- spawn(fun() -> Mod:Fun(Self, BTAddr, Channel,
- data, Data, Arg) end),
-
- {noreply, State};
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
+ {noreply, State#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed: ~p", [Reason]),
+ {stop, Reason, State}
+ end;
handle_info({rfcomm, ConnRef, closed},
@@ -339,13 +371,32 @@ handle_info({rfcomm, ConnRef, error},
bt_connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
+handle_info({tcp, Sock, Data}, #st{remote_addr = IP,
+ channel = Port,
+ rfcomm_ref = Sock,
+ packet_mod = PMod,
+ packet_st = PSt} = St) ->
+ ?debug("handle_info(data): From: ~p:~p", [IP, Port]),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, St)
+ end, PSt) of
+ {ok, PSt1} ->
+ inet:setopts(Sock, [{active, once}]),
+ {noreply, St#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, St}
+ end;
handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod,
func = Fun,
args = Arg} = St) ->
?debug("~p:handle_info(~p)", [?MODULE, Msg]),
+ inet_db:register_socket(Sock, inet_tcp),
+ inet:setopts(Sock, [{active, once}]),
{ok, {BTAddr, Channel}} = inet:peername(Sock),
Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
- {noreply, St};
+ {noreply, St#st{rfcomm_ref = Sock,
+ remote_addr = BTAddr}};
handle_info(_Info, State) ->
?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]),
@@ -380,3 +431,14 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
+
+handle_elements(Elements, #st{remote_addr = BTAddr,
+ channel = Channel,
+ mod = Mod,
+ func = Fun,
+ args = Arg}) ->
+ ?debug("data complete; processed: ~p",
+ [authorize_keys:abbrev(Elements)]),
+ Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg).
diff --git a/components/dlink_bt/src/bt_connection_manager.erl b/components/dlink_bt/src/bt_connection_manager.erl
index 1b1e049..e86dda3 100644
--- a/components/dlink_bt/src/bt_connection_manager.erl
+++ b/components/dlink_bt/src/bt_connection_manager.erl
@@ -67,6 +67,7 @@ find_connection_by_address(BTAddr, Channel) ->
%% @end
%%--------------------------------------------------------------------
start_link() ->
+ ?debug("start_link()", []),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl
index 3711652..a1f1a49 100644
--- a/components/dlink_bt/src/bt_listener.erl
+++ b/components/dlink_bt/src/bt_listener.erl
@@ -14,13 +14,16 @@
-include_lib("rvi_common/include/rvi_common.hrl").
-export([start_link/1,
add_listener/1,
- remove_listener/1]).
+ remove_listener/1,
+ accept_ack/4]).
+-export([sock_opts/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2]).
-record(st, {listeners = [],
acceptors = [],
+ mode = bt,
cs = #component_spec{}
}).
@@ -34,41 +37,33 @@ add_listener(Channel) ->
remove_listener(Channel) ->
gen_server:call(?MODULE, {remove_listener, Channel}).
+accept_ack(Result, LRef, Addr, Chan) ->
+ ?MODULE ! {accept, self(), LRef, Addr, Chan, Result},
+ ok.
+
+sock_opts() ->
+ [binary, {active, once}, {packet, 0}].
+
init(Mode) ->
{ok, #st {
listeners = [],
acceptors = [],
+ mode = Mode,
cs = rvi_common:set_value(bt_mode, Mode, rvi_common:get_component_specification())
}
}.
-handle_call({add_listener, Channel}, _From, #st{cs = CS} = St) ->
+handle_call({add_listener, Channel}, _From, #st{mode = Mode,
+ listeners = Ls} = St) ->
?info("bt_listener:add_listener(): Setting up listener on channel ~p", [ Channel]),
- Mode = rvi_common:get_value(bt_mode, bt, CS),
case listen(Mode, Channel) of
{ok, ListenRef} ->
?info("bt_listener:add_listener(): ListenRef: ~p", [ ListenRef]),
- {ok, ConnPid} = bt_connection:accept(Channel,
- ListenRef,
- Mode,
- dlink_bt_rpc,
- handle_socket,
- CS),
-
-
-
- %%{ noreply, NSt} = handle_info({accept, ListenRef, Channel, ok}, St),
-
- { reply,
- ok,
- St#st {
- acceptors = [ { Channel, ConnPid } | St#st.acceptors ],
- listeners = [ { ListenRef, Channel } | St#st.listeners ]
- }
- };
+ St1 = St#st{listeners = [{ListenRef, Channel}|Ls]},
+ {reply, ok, start_acceptor(ListenRef, Channel, St1)};
Err ->
?info("bt_listener:add_listener(): Failed: ~p", [ Err]),
@@ -86,12 +81,19 @@ handle_call(_Msg, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) ->
+handle_info({accept, Pid, ListenRef, BTAddr, Channel, ok},
+ #st{listeners = Ls,
+ acceptors = As} = St) ->
%% Fire up a new process to handle the
%% future incoming connection.
?info("bt_listener:accept(): ListenRef: ~p", [ ListenRef]),
?info("bt_listener:accept(): Remote: ~p-~p", [BTAddr, Channel ]),
+ case lists:keyfind(ListenRef, 1, Ls) of
+ {_, Channel} ->
+ As1 = lists:keydelete(Pid, 2, As),
+ {noreply, start_acceptor(ListenRef, Channel,
+ St#st{acceptors = As1})};
%% Must fix multiple acceptors in bt_linux_drv.c
%% {ok, ConnPid} = bt_connection:accept(Channel,
%% ListenRef,
@@ -101,7 +103,9 @@ handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) ->
%% {noreply, St#st {acceptors = [ { Channel, ConnPid } | St#st.acceptors ]}};
- {noreply, St };
+ _ ->
+ {noreply, St }
+ end;
handle_info(_Msg, State) ->
?info("bt_listener:handle_info(): Unknown: ~p", [ _Msg]),
@@ -115,4 +119,16 @@ terminate(_Reason, _State) ->
listen(bt, Channel) ->
rfcomm:listen(Channel);
listen(tcp, Port) ->
- exo_socket:listen(Port).
+ gen_tcp:listen(Port, sock_opts()).
+
+start_acceptor(ListenRef, Channel, #st{mode = Mode,
+ acceptors = As,
+ cs = CS} = St) ->
+ ?debug("start acceptor(~p, ~p, St)", [ListenRef, Channel]),
+ {ok, ConnPid} = bt_connection:accept(Channel,
+ ListenRef,
+ Mode,
+ dlink_bt_rpc,
+ handle_socket,
+ CS),
+ St#st{acceptors = [{Channel, ConnPid}|As]}.
diff --git a/components/dlink_bt/src/dlink_bt.app.src b/components/dlink_bt/src/dlink_bt.app.src
index e63d1ac..ba62655 100644
--- a/components/dlink_bt/src/dlink_bt.app.src
+++ b/components/dlink_bt/src/dlink_bt.app.src
@@ -17,7 +17,7 @@
{applications, [
kernel,
stdlib,
- rvi_common,
+ dlink,
bt
]},
{mod, { dlink_bt_app, []}},
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index ab5f8ac..542fd00 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -34,7 +34,7 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--include_lib("rvi_common/include/rvi_dlink.hrl").
+-include_lib("rvi_common/include/rvi_dlink_bin.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BT_CHANNEL, 1).
@@ -60,6 +60,7 @@
}).
-record(st, {
+ mode = bt, %% tcp | bt
cs = #component_spec{}
}).
@@ -93,9 +94,11 @@ init([]) ->
{ keypos, #connection_entry.connection }]),
CS = rvi_common:get_component_specification(),
+ Mode = get_mode(CS),
service_discovery_rpc:subscribe(CS, ?MODULE),
{ok, #st {
+ mode = Mode,
cs = CS
}
}.
@@ -105,15 +108,19 @@ start_json_server() ->
start_connection_manager() ->
+ ?debug("start_connection_manager()", []),
CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts,
- [],
- CompSpec),
+ ServerOpts = get_server_opts(CompSpec),
+ start_connection_manager(ServerOpts, CompSpec).
+
+start_connection_manager([], _) ->
+ ?debug("No BT server options set; start only the connection manager", []),
+ bt_connection_manager:start_link();
+start_connection_manager(ServerOpts, CompSpec) ->
%% Retrieve the channel we should use
- Mode = get_mode(BertOpts),
- Channel = get_channel(Mode, BertOpts),
+ ?debug("ServerOpts = ~p", [ServerOpts]),
+ Mode = get_mode(ServerOpts),
+ Channel = get_channel(Mode, ServerOpts),
?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]),
@@ -124,6 +131,7 @@ start_connection_manager() ->
bt:start(),
bt:debug(debug);
tcp ->
+ ?debug("Mode == tcp; not starting bt driver", []),
ok
end,
bt_listener:start_link(Mode),
@@ -146,11 +154,21 @@ start_connection_manager() ->
[],
CompSpec),
- setup_persistent_connections_(PersistentConnections, CompSpec),
+ setup_persistent_connections_(PersistentConnections, Mode, CompSpec),
ok.
-get_mode(Opts) ->
+get_server_opts(CS) when element(1, CS) == component_spec ->
+ {ok, ServerOpts } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ server_opts,
+ [],
+ CS),
+ ServerOpts.
+
+get_mode(CS) when element(1, CS) == component_spec ->
+ get_mode(get_server_opts(CS));
+get_mode(Opts) when is_list(Opts) ->
proplists:get_value(test_mode, Opts, bt).
get_channel(tcp, Opts) ->
@@ -159,15 +177,15 @@ get_channel(bt, Opts) ->
proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL).
-setup_persistent_connections_([ ], _CompSpec) ->
+setup_persistent_connections_([ ], _, _CompSpec) ->
ok;
-setup_persistent_connections_([ BTAddress | T], CompSpec) ->
+setup_persistent_connections_([ BTAddress | T], Mode, CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]),
- [ BTAddr, Channel] = string:tokens(BTAddress, "-"),
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
- setup_persistent_connections_(T, CompSpec),
+ [ BTAddr, Channel] = string:tokens(BTAddress, "-:"), %% Addr-Chan | IP:Port
+ connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec),
+ setup_persistent_connections_(T, Mode, CompSpec),
ok.
@@ -213,7 +231,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) ->
%%
%% Connect to a remote RVI node.
%%
-connect_remote(BTAddr, Channel, CompSpec) ->
+connect_remote(BTAddr, Channel, Mode, CompSpec) ->
case bt_connection_manager:find_connection_by_address(BTAddr, Channel) of
{ ok, _Pid } ->
already_connected;
@@ -225,7 +243,7 @@ connect_remote(BTAddr, Channel, CompSpec) ->
%%FIXME
%% Setup a genserver around the new connection.
- case bt_connection:connect(BTAddr, Channel,
+ case bt_connection:connect(BTAddr, Channel, Mode,
?MODULE, handle_socket, CompSpec ) of
{ ok, Pid } ->
?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p",
@@ -239,45 +257,33 @@ connect_remote(BTAddr, Channel, CompSpec) ->
end
end.
-
-connect_and_retry_remote( BTAddr, Channel, CompSpec) ->
+connect_and_retry_remote( BTAddr, Channel, Mode, CompSpec) ->
?info("dlink_bt:connect_and_retry_remote(): ~p:~p",
[ BTAddr, Channel]),
- case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of
- ok -> ok;
-
+ CS = start_log(<<"conn">>, "connect ~s:~s", [BTAddr, Channel], CompSpec),
+ case connect_remote(BTAddr, list_to_integer(Channel), Mode, CS) of
+ ok ->
+ ok;
Err -> %% Failed to connect. Sleep and try again
?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p",
- [BTAddr, Channel, Err]),
-
- ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
- [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
-
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec),
-
+ [BTAddr, Channel, Err]),
+ ?notice("dlink_bt:connect_and_retry_remote(~p:~p):"
+ " Will try again in ~p sec",
+ [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
+ setup_reconnect_timer(
+ ?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CS),
not_available
end.
-
-
announce_local_service_(_CompSpec, [], _Service, _Availability) ->
ok;
announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = bt_connection:send(ConnPid,
- term_to_json(
- { struct,
- [
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_TRANSACTION_ID, 3},
- { ?DLINK_ARG_SIGNATURE, JWT }
- ]
- })),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = bt_connection:send(ConnPid, Msg),
?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -299,24 +305,23 @@ announce_local_service_(CompSpec, Service, Availability) ->
process_data(_FromPid, RemoteBTAddr, RemoteChannel, ProtocolMod, Data, CompSpec) ->
?debug("dlink_bt:receive_data(): SetupAddress: {~p, ~p}", [ RemoteBTAddr, RemoteChannel ]),
?debug("dlink_bt:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
- Proto = list_to_atom(ProtocolMod),
- Proto:receive_message(CompSpec, base64:decode_to_string(Data)),
+ Proto = list_to_existing_atom(ProtocolMod),
+ Proto:receive_message(CompSpec, {RemoteBTAddr, RemoteChannel}, Data),
ok.
-availability_msg(Availability, Services) ->
- {struct, [{?DLINK_ARG_STATUS, status_string(Availability)},
- {?DLINK_ARG_SERVICES, {array, Services}}]}.
+availability_msg(Availability, Services, CompSpec) ->
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE},
+ {?DLINK_ARG_STATUS, status_string(Availability)},
+ {?DLINK_ARG_SERVICES, Services}
+ | log_id_tail(CompSpec)].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
- {ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg),
- {ok, Svcs} = rvi_common:get_json_element([?DLINK_ARG_SERVICES], Msg),
+process_announce(Avail, Svcs, FromPid, Addr, Channel, CompSpec) ->
?debug("dlink_bt_rpc:service_announce(~p): Address: ~p:~p", [Avail, Addr, Channel]),
- ?debug("dlink_bt_rpc:service_announce(~p): TransactionID: ~p", [Avail, TID]),
?debug("dlink_bt_rpc:service_announce(~p): Services: ~p", [Avail, Svcs]),
case Avail of
?DLINK_ARG_AVAILABLE ->
@@ -327,23 +332,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
end.
-process_authorize(FromPid,
- PeerBTAddr,
- PeerBTChannel,
- TransactionID,
- RemoteAddress,
- RemoteChannel,
- Protocol,
- Certificates,
- Signature,
- CompSpec) ->
-
+process_authorize(FromPid, PeerBTAddr, PeerBTChannel,
+ RemoteAddress, RemoteChannel, Protocol,
+ Credentials, CompSpec) ->
?info("dlink_bt:authorize(): Peer Address: ~p:~p", [PeerBTAddr, PeerBTChannel ]),
?info("dlink_bt:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemoteChannel ]),
- ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]),
- ?debug("dlink_bt:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_bt:authorize(): Certificates: ~p", [ Certificates ]),
- ?debug("dlink_bt:authorize(): Signature: ~p", [ Signature ]),
+ ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]),
+ ?debug("dlink_bt:authorize(): Credentials: ~p", [ Credentials ]),
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the conneciton manager, this is an incoming connection
@@ -351,59 +346,42 @@ process_authorize(FromPid,
%% a service announce
Conn = {RemoteAddress, RemoteChannel},
- case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [RemoteAddress, RemoteChannel], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
- Payload, CompSpec) ->
+ Elems, CompSpec) ->
- {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)),
?debug("dlink_bt:data(): Got ~p", [ Elems ]),
+ CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
+
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemoteChannel,
RVIProtocol,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
- Certificates =
- case CertificatesTmp of
- { array, C} -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerBTAddr, RemoteChannel,
- TransactionID, RemoteAddress, RemoteChannel,
- RVIProtocol, Certificates, Signature, CompSpec);
-
-
+ RemoteAddress, RemoteChannel,
+ RVIProtocol, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
- Conn = {PeerBTAddr, PeerChannel},
- [ TransactionID, Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- process_availability(
- Msg, FromPid, PeerBTAddr, PeerChannel, TransactionID, CompSpec);
- _ ->
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ log("sa from ~s:~w", [PeerBTAddr, PeerChannel], CS),
+ process_announce(Status, Services, FromPid, PeerBTAddr,
+ PeerChannel, CS);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -414,14 +392,14 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
?DLINK_ARG_DATA],
Elems, undefined),
process_data(FromPid, PeerBTAddr, PeerChannel,
- ProtocolMod, Data, CompSpec);
+ ProtocolMod, Data, CS);
?DLINK_CMD_PING ->
?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]),
ok;
undefined ->
- ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
+ ?error("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
ok
end.
@@ -567,7 +545,7 @@ handle_cast(Other, St) ->
{noreply, St}.
-handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
+handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, #st{mode = Mode} = St) ->
%% Do we already have a connection that supchannel service?
case get_connections_by_service(Service) of
[] -> %% Nope
@@ -580,7 +558,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
Addr ->
[ Address, Channel] = string:tokens(Addr, "-"),
- case connect_remote(Address, list_to_integer(Channel), St#st.cs) of
+ case connect_remote(Address, list_to_integer(Channel), Mode, St#st.cs) of
ok ->
{ reply, [ok, 2000], St }; %% 2 second timeout
@@ -613,27 +591,20 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]),
- Res = bt_connection:send(ConnPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ]})),
-
+ Res = bt_connection:send(
+ ConnPid,
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
-
-
handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) ->
%% Create a timer to handle periodic pings.
- {ok, ServerOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts, [],
- St#st.cs),
+ ServerOpts = get_server_opts(St#st.cs),
Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec",
@@ -656,12 +627,7 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
case bt_connection:is_connection_up(Pid) of
true ->
?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]),
- bt_connection:send(Pid, term_to_json(
- { struct,
- [ { ?DLINK_ARG_CMD,
- ?DLINK_CMD_PING
- }]})),
-
+ bt_connection:send(Pid, [ { ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Channel, Timeout });
@@ -671,8 +637,9 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
{noreply, St};
%% Setup static nodes
-handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, St) ->
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
+handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec },
+ #st{mode = Mode} = St) ->
+ connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec),
{ noreply, St };
handle_info(Info, St) ->
@@ -688,17 +655,15 @@ code_change(_OldVsn, St, _Extra) ->
send_authorize(Pid, SetupChannel, CompSpec) ->
{ok,[{address, Address }]} = bt_drv:local_info([address]),
bt_connection:send(Pid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
- { ?DLINK_ARG_PORT, SetupChannel },
- { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
- { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
+ { ?DLINK_ARG_PORT, SetupChannel },
+ { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec)]).
connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) ->
+ log("authorized: ~s:~p", [RemoteAddress, RemoteChannel], CompSpec),
case bt_connection_manager:find_connection_by_pid(FromPid) of
not_found ->
?info("dlink_bt:authorize(): New connection!"),
@@ -721,15 +686,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
?info("dlink_bt:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteAddress, RemoteChannel]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
- bt_connection:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT } ]})),
-
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
+ log("sending sa: ~s:~w", [RemoteAddress, RemoteChannel], CompSpec),
+ bt_connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemoteChannel, FromPid }),
ok.
@@ -828,10 +787,6 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-term_to_json(Term) ->
- binary_to_list(iolist_to_binary(exo_json:encode(Term))).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -841,28 +796,25 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
[ opt(K, Elems, Def) || K <- Keys].
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
+start_log(Pfx, Fmt, Args, CS) ->
+ LogId = rvi_log:new_id(Pfx),
+ rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogId, CS).
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
+log(Fmt, Args, CS) ->
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).
diff --git a/components/dlink_sms/src/dlink_sms.app.src b/components/dlink_sms/src/dlink_sms.app.src
index a394888..d0f86e4 100644
--- a/components/dlink_sms/src/dlink_sms.app.src
+++ b/components/dlink_sms/src/dlink_sms.app.src
@@ -1,8 +1,9 @@
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
%%
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -16,7 +17,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_sms_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 77300d9..b24215c 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -36,6 +36,7 @@
-define(SERVER, ?MODULE).
+-define(PACKET_MOD, dlink_data_json).
-record(st, {
ip = {0,0,0,0},
@@ -44,7 +45,9 @@
mod = undefined,
func = undefined,
args = undefined,
- pst = undefined %% Payload state
+ packet_mod = ?PACKET_MOD,
+ packet_st = [],
+ cs
}).
%%%===================================================================
@@ -52,8 +55,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, Arg) ->
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of
+setup(IP, Port, Sock, Mod, Fun, CS) ->
+ ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
+ case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -120,7 +124,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, Arg}) ->
+init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> connection_manager:add_connection(IP, Port, self())
@@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- ?debug("connection:init(): Arg: ~p", [Arg]),
- %% Grab socket control
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
+ PktSt = PktMod:init(CompSpec),
{ok, #st{
ip = IP,
port = Port,
sock = Sock,
mod = Mod,
func = Fun,
- args = Arg,
- pst = undefined
+ packet_mod = PktMod,
+ packet_st = PktSt,
+ cs = CompSpec
}}.
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
%%--------------------------------------------------------------------
%% @private
@@ -182,13 +189,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({send, Data}, St) ->
+handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
+ {ok, Encoded, PSt1} = PMod:encode(Data, PSt),
+ ?debug("Encoded = ~p", [Encoded]),
+ gen_tcp:send(St#st.sock, Encoded),
- gen_tcp:send(St#st.sock, Data),
-
- {noreply, St};
+ {noreply, St#st{packet_st = PSt1}};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data},
handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
- mod = Mod,
- func = Fun,
- args = Arg,
- pst = PST} = State) ->
+ packet_mod = PMod,
+ packet_st = PSt} = State) ->
?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
-
- case jsx_decode_stream(Data, PST) of
- { [], NPST } ->
- ?debug("handle_info(data incomplete)", []),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st { pst = NPST} };
-
- { JSONElements, NPST } ->
- ?debug("data complete: Processed: ~p",
- [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]),
- FromPid = self(),
- [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg)
- || SingleElem <- JSONElements],
- inet:setopts(Sock, [ { active, once } ]),
- {noreply, State#st { pst = NPST} }
+ {noreply, State#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, State}
end;
-
-
handle_info({tcp_closed, Sock},
#st { ip = IP,
port = Port,
@@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock},
func = Fun,
args = Arg } = State) ->
?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]),
- Mod:Fun(self(), IP, Port,closed, Arg),
+ Mod:Fun(self(), IP, Port, closed, Arg),
gen_tcp:close(Sock),
connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-jsx_decode_stream(Data, St) ->
- jsx_decode_stream(Data, St, []).
-
-jsx_decode_stream(Data, undefined, Acc) ->
- case jsx:decode(Data, [stream, return_tail]) of
- {incomplete, Cont} ->
- {lists:reverse(Acc), Cont};
- {with_tail, Elems, <<>>} ->
- {lists:reverse([Elems|Acc]), undefined};
- {with_tail, Elems, Rest} ->
- jsx_decode_stream(Rest, undefined, [Elems|Acc])
- end.
+%% jsx_decode_stream(Data, St) ->
+%% jsx_decode_stream(Data, St, []).
+
+%% jsx_decode_stream(Data, undefined, Acc) ->
+%% case jsx:decode(Data, [stream, return_tail]) of
+%% {incomplete, Cont} ->
+%% {lists:reverse(Acc), Cont};
+%% {with_tail, Elems, <<>>} ->
+%% {lists:reverse([Elems|Acc]), undefined};
+%% {with_tail, Elems, Rest} ->
+%% jsx_decode_stream(Rest, undefined, [Elems|Acc])
+%% end.
+
+%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) ->
+%% case PMod:decode(Data, PSt) of
+%% {ok, Elements, PSt1} ->
+%% ?debug("data complete: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% {ok, PSt1};
+%% {more, Elements, Rest, PSt1} ->
+%% ?debug("data complete with Rest: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS);
+%% {more, PSt1} ->
+%% {ok, PSt1};
+%% { ->
+
+handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
+ ip = IP, port = Port}) ->
+ ?debug("data complete: Processed: ~p",
+ [authorize_keys:abbrev(Elements)]),
+ Mod:Fun(self(), IP, Port, data, Elements, CS).
diff --git a/components/dlink_tcp/src/dlink_tcp.app.src b/components/dlink_tcp/src/dlink_tcp.app.src
index 53a32b8..5ba7760 100644
--- a/components/dlink_tcp/src/dlink_tcp.app.src
+++ b/components/dlink_tcp/src/dlink_tcp.app.src
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -16,7 +16,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_tcp_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index b3ec80d..83e0a24 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.0").
+-define(DLINK_TCP_VERSION, "1.1").
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) ->
%% Setup a genserver around the new connection.
{ok, Pid } = connection:setup(IP, Port, Sock,
- ?MODULE, handle_socket, [CompSpec] ),
+ ?MODULE, handle_socket, CompSpec ),
%% Send authorize
- { LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- connection:send(
- Pid,
- term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES,
- get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ send_authorize(Pid, CompSpec),
ok;
{error, Err } ->
@@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
case connect_remote(IP, list_to_integer(Port), CS) of
ok ->
- log("connected", [], CS),
+ log(result, "connected", [], CS),
ok;
Err -> %% Failed to connect. Sleep and try again
@@ -266,16 +253,8 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = connection:send(
- ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = connection:send(ConnPid, Msg),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) ->
handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg);
-handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
-handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
+handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]),
@@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemotePort,
ProtoVersion,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
- C when is_list(C) -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CS);
+ RemoteAddress, RemotePort,
+ ProtoVersion, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- ProtoVersion,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- Conn = {PeerIP, PeerPort},
log("sa from ~s:~w", [PeerIP, PeerPort], CS),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- ?debug("Service Announce~nMsg = ~p~n", [Msg]),
- process_announce(Msg, FromPid, PeerIP, PeerPort,
- TransactionID, ProtoVersion, CompSpec);
- _ ->
- log("sa INVALID", [], CS),
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) ->
{noreply, St};
handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) ->
+ ?debug("handle_socket, Arg (CS) = ~p", [Arg]),
try handle_socket_(FromPid, IP, Port, Event, Arg)
catch
C:E ->
?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]),
+ error("Caught ~p:~p", [C, E]),
ok
end,
{noreply, St};
@@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
Res = connection:send(ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ])),
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
@@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) ->
}) || SvcName <- SvcNameList ],
ok.
-availability_msg(Availability, Services) ->
- [{ ?DLINK_ARG_STATUS, status_string(Availability) },
- { ?DLINK_ARG_SERVICES, Services }].
+availability_msg(Availability, Services, CompSpec) ->
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
+ { ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, Services }
+ | log_id_tail(CompSpec) ].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
- RemotePort, ProtoVersion, Signature, Certificates, CompSpec) ->
+process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
- ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]),
- ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
-
+ ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
{NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
- log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
- case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec) ])).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
+ { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec) ]).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
@@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteIP, RemotePort]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
- connection:send(FromPid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)])),
-
+ connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
ok.
@@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]),
?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
Proto = list_to_existing_atom(ProtocolMod),
- Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
- base64:decode_to_string(Data)).
+ Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data).
-process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
- [ Avail,
- Svcs ] =
- opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined),
+process_announce(Avail, Services, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
- ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]),
- ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]),
+ ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]),
case Avail of
?DLINK_ARG_AVAILABLE ->
- add_services(Svcs, FromPid),
- service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE);
+ add_services(Services, FromPid),
+ service_discovery_rpc:register_services(CompSpec, Services, ?MODULE);
?DLINK_ARG_UNAVAILABLE ->
- delete_services(FromPid, Svcs),
- service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
+ delete_services(FromPid, Services),
+ service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE)
end,
ok.
@@ -837,36 +775,15 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
-
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
-
-term_to_json(Term) ->
- rvi_common:term_to_json(Term).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) ->
rvi_common:set_value(rvi_log_id, LogId, CS).
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index 45c0691..4512a59 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -63,7 +63,7 @@ terminate(_Reason, _State) ->
ok.
sock_opts() ->
- [binary, {active, once}, {packet, 4}].
+ [binary, {active, once}, {packet, 0}].
new_connection(IP, Port, Sock, State) ->
?debug("listener:new_connection(): Peer IP: ~p (ignored)", [IP]),
@@ -75,5 +75,5 @@ new_connection(IP, Port, Sock, State) ->
%% Provide component spec as extra arg.
{ok, _P} = connection:setup(undefined, 0, Sock,
dlink_tcp_rpc,
- handle_socket, [gen_nb_server:get_cb_state(State)]),
+ handle_socket, gen_nb_server:get_cb_state(State)),
{ok, State}.
diff --git a/components/dlink_tls/src/dlink_tls.app.src b/components/dlink_tls/src/dlink_tls.app.src
index 7b53a18..3698775 100644
--- a/components/dlink_tls/src/dlink_tls.app.src
+++ b/components/dlink_tls/src/dlink_tls.app.src
@@ -15,7 +15,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_tls_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index c125ca3..583009c 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-include_lib("lager/include/log.hrl").
-
+-include_lib("public_key/include/public_key.hrl").
%% API
@@ -145,7 +145,6 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- %% Grab socket control
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
PktSt = PktMod:init(CompSpec),
{ok, #st{
@@ -194,7 +193,11 @@ handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) ->
?debug("upgrade to TLS succcessful~n", []),
ssl:setopts(NewS, [{active, Last}]),
{ok, {IP, Port}} = ssl:peername(NewS),
- NewCS = rvi_common:set_value(dlink_tls_role, client, CompSpec),
+ {ok, PeerCert} = ssl:peercert(NewS),
+ ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]),
+ NewCS = rvi_common:set_value(
+ dlink_tls_role, Role,
+ rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)),
{reply, ok, St#st{sock = NewS, mode = tls, role = Role,
ip = inet_parse:ntoa(IP), port = Port,
cs = NewCS}};
@@ -219,7 +222,7 @@ handle_call(_Request, _From, State) ->
%%--------------------------------------------------------------------
handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_call(send): Sending: ~p",
- [ ?MODULE, Data]),
+ [ ?MODULE, abbrev(Data)]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
case St#st.mode of
tcp -> gen_tcp:send(St#st.sock, Encoded);
@@ -253,52 +256,38 @@ handle_info({tcp, Sock, Data},
#st { ip = undefined } = St) ->
{ok, {IP, Port}} = inet:peername(Sock),
NSt = St#st { ip = inet_parse:ntoa(IP), port = Port },
- ?warning("YESSSS"),
handle_info({tcp, Sock, Data}, NSt);
-handle_info({ssl, Sock, Data},
- #st{ip = IP, port = Port,
- mod = Mod, func = Fun, cs = CS,
- packet_mod = PMod, packet_st = PSt} = State) ->
- ?debug("handle_info(data): ~p", [Data]),
- case PMod:decode(Data, PSt) of
- {ok, Elements, PSt1} ->
- ?debug("~p:handle_info(data complete): Processed: ~p",
- [?MODULE, Elements]),
- Mod:Fun(self(), IP, Port, data, Elements, CS),
+handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port,
+ packet_mod = PMod, packet_st = PSt} = State) ->
+ ?debug("handle_info(data): Data: ~p", [abbrev(Data)]),
+ ?debug("handle_info(data): From: ~p:~p ", [ IP, Port]),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elems(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
ssl:setopts(Sock, [{active, once}]),
{noreply, State#st{packet_st = PSt1}};
{error, Reason} ->
- {stop, Reason, State};
- {more, PSt1} ->
- ?debug("~p:handle_info(data incomplete)", [ ?MODULE]),
- ssl:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}}
+ {stop, Reason, State}
end;
handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
- mod = Mod,
- func = Fun,
- cs = CS,
packet_mod = PMod,
packet_st = PSt} = State) ->
- ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
- ?debug("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, IP, Port]),
-
- case PMod:decode(Data, PSt) of
- {ok, Elements, PSt1} ->
- ?debug("~p:handle_info(data complete): Processed: ~p",
- [?MODULE, Elements]),
- Mod:Fun(self(), IP, Port, data, Elements, CS),
+ ?debug("handle_info(data): Data: ~p", [Data]),
+ ?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
+
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elems(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
inet:setopts(Sock, [{active, once}]),
{noreply, State#st{packet_st = PSt1}};
{error, Reason} ->
- {stop, Reason, State};
- {more, PSt1} ->
- ?debug("~p:handle_info(data incomplete)", [ ?MODULE]),
- inet:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}}
+ ?debug("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, State}
end;
handle_info({tcp_closed, Sock},
@@ -361,23 +350,87 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
do_upgrade(Sock, client, CompSpec) ->
- ssl:connect(Sock, tls_opts(client, CompSpec));
+ Opts = tls_opts(client, CompSpec),
+ ?debug("TLS Opts = ~p", [Opts]),
+ ssl:connect(Sock, Opts);
do_upgrade(Sock, server, CompSpec) ->
- ssl:ssl_accept(Sock, tls_opts(server, CompSpec)).
+ Opts = tls_opts(client, CompSpec),
+ ?debug("TLS Opts = ~p", [Opts]),
+ ssl:ssl_accept(Sock, Opts).
%% FIXME: For now, use the example certs delivered with the OTP SSL appl.
-tls_opts(Role, CompSpec) ->
- Dir = tls_dir(Role),
- {ok, CertFile} = get_config(certfile, filename:join(Dir, "cert.pem"), CompSpec),
- {ok, KeyFile} = get_config(keyfile, filename:join(Dir, "key.pem"), CompSpec),
- [{certfile, CertFile},
- {keyfile, KeyFile}].
-
-tls_dir(Role) when Role==client;
- Role==server ->
- filename:join([code:lib_dir(ssl), "examples", "certs", "etc",
- atom_to_list(Role)]).
-
-get_config(Key, Default, CompSpec) ->
- rvi_common:get_module_config(
- dlink_tls, dlink_tls_rpc, Key, Default, CompSpec).
+tls_opts(Role, _CompSpec) ->
+ {ok, DevCert} = setup:get_env(rvi_core, device_cert),
+ {ok, DevKey} = setup:get_env(rvi_core, device_key),
+ {ok, CACert} = setup:get_env(rvi_core, root_cert),
+ [
+ {verify, verify_peer},
+ {certfile, DevCert},
+ {keyfile, DevKey},
+ {cacertfile, CACert},
+ {verify_fun, {fun verify_fun/3, public_root_key()}},
+ {partial_chain, fun(X) ->
+ partial_chain(Role, X)
+ end}
+ ].
+
+public_root_key() ->
+ authorize_keys:provisioning_key().
+
+verify_fun(Cert, What, St) ->
+ ?debug("verify_fun(~p, ~p, ~p)", [abbrev(Cert), What, abbrev(St)]),
+ verify_fun_(Cert, What, St).
+
+verify_fun_(Cert, {bad_cert, selfsigned_peer}, PubKey) ->
+ ?debug("Verify self-signed cert: ~p", [abbrev(Cert)]),
+ try verify_cert_sig(Cert, PubKey) of
+ true ->
+ ?debug("verified!", []),
+ {valid, PubKey};
+ false ->
+ ?debug("verification FAILED", []),
+ {bad_cert, invalid_signature}
+ catch
+ error:Error ->
+ ?debug("Caught error:~p~n~p", [Error, erlang:get_stacktrace()]),
+ {fail, PubKey}
+ end;
+verify_fun_(_, {bad_cert, Reason}, St) ->
+ ?debug("Bad cert: ~p", [Reason]),
+ {fail, St};
+verify_fun_(_, {extension, _}, St) ->
+ {unknown, St};
+verify_fun_(_, valid, St) ->
+ {valid, St};
+verify_fun_(_, valid_peer, St) ->
+ {valid_peer, St}.
+
+partial_chain(_, Certs) ->
+ ?debug("partial_chain() invoked, length(Certs) = ~w", [length(Certs)]),
+ Decoded = (catch [public_key:der_decode('Certificate', C)
+ || C <- Certs]),
+ ?debug("partial_chain: ~p", [[lager:pr(Dec) || Dec <- Decoded]]),
+ {trusted_ca, hd(Certs)}.
+
+handle_elems(Elements, #st{mod = Mod, func = Fun, cs = CS,
+ ip = IP, port = Port}) ->
+ ?debug("handle_info(data complete): Processed: ~p", [abbrev(Elements)]),
+ Mod:Fun(self(), IP, Port, data, Elements, CS),
+ ok.
+
+verify_cert_sig(#'OTPCertificate'{tbsCertificate = TBS,
+ signature = Sig}, PubKey) ->
+ DER = public_key:pkix_encode('OTPTBSCertificate', TBS, otp),
+ {SignType, _} = signature_algorithm(TBS),
+ public_key:verify(DER, SignType, Sig, PubKey).
+
+signature_algorithm(#'OTPCertificate'{tbsCertificate = TBS}) ->
+ signature_algorithm(TBS);
+signature_algorithm(#'OTPTBSCertificate'{
+ signature = #'SignatureAlgorithm'{
+ algorithm = Algo}}) ->
+ public_key:pkix_sign_types(Algo).
+
+
+abbrev(T) ->
+ authorize_keys:abbrev(T).
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index e42a156..9df8f54 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -100,26 +100,14 @@ start_connection_manager() ->
[],
CompSpec),
?debug("TlsOpts = ~p", [TlsOpts]),
- IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
- Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
-
?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]),
%% Fire up listener
dlink_tls_connmgr:start_link(),
{ok,Pid} = dlink_tls_listener:start_link(),
- ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
- %% Add listener port.
- case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of
- ok ->
- ?notice("---- RVI Node External Address: ~s",
- [ application:get_env(rvi_core, node_address, undefined)]);
+ setup_initial_listeners(Pid, TlsOpts, CompSpec),
- Err ->
- ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
- ok
- end,
?info("dlink_tls:init_rvi_component(): Setting up persistent connections."),
{ok, PersistentConnections } = rvi_common:get_module_config(data_link,
@@ -130,6 +118,23 @@ start_connection_manager() ->
setup_persistent_connections_(PersistentConnections, CompSpec),
ok.
+setup_initial_listeners(Pid, [], CompSpec) ->
+ ?debug("no initial listeners", []);
+setup_initial_listeners(Pid, [_|_] = TlsOpts, CompSpec) ->
+ IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
+ Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
+ %% Add listener port.
+ ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
+ case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of
+ ok ->
+ ?notice("---- RVI Node External Address: ~s",
+ [ application:get_env(rvi_core, node_address, undefined)]);
+
+ Err ->
+ ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
+ ok
+ end.
+
setup_persistent_connections_([ ], _CompSpec) ->
ok;
@@ -324,7 +329,7 @@ handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) ->
handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]),
- ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]),
+ ?debug("data(): Elems ~p~nCS = ~p", [abbrev(Elems), abbrev(CompSpec)]),
CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
@@ -333,28 +338,28 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
[ RemoteAddress,
RemotePort,
- Signature ] =
+ Credentials ] =
opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
- Signature, CS);
-
- ?DLINK_CMD_CERT_EXCHANGE ->
- ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]),
- [ Certs ] =
- opts([?DLINK_ARG_CERTIFICATES], Elems, undefined),
- ?debug("Certs = ~p", [Certs]),
- log("certs from ~s:~w", [PeerIP, PeerPort], CS),
- authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}),
- case rvi_common:get_value(dlink_tls_role, client, CS) of
- client -> ok;
- server ->
- send_certs(FromPid, CompSpec)
- end,
- ok;
+ Credentials, CS);
+
+ %% ?DLINK_CMD_CRED_EXCHANGE ->
+ %% ?debug("got cred exch ~s:~w", [PeerIP, PeerPort]),
+ %% [ Creds ] =
+ %% opts([?DLINK_ARG_CREDENTIALS], Elems, undefined),
+ %% ?debug("Creds = ~p", [Creds]),
+ %% log("creds from ~s:~w", [PeerIP, PeerPort], CS),
+ %% authorize_rpc:store_creds(CS, Creds, {PeerIP, PeerPort}),
+ %% case rvi_common:get_value(dlink_tls_role, client, CS) of
+ %% client -> ok;
+ %% server ->
+ %% send_creds(FromPid, CompSpec)
+ %% end,
+ %% ok;
?DLINK_CMD_SERVICE_ANNOUNCE ->
?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
@@ -364,7 +369,6 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?DLINK_ARG_SERVICES],
Elems, undefined),
- Conn = {PeerIP, PeerPort},
log("sa from ~s:~w", [PeerIP, PeerPort], CS),
process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
@@ -644,12 +648,11 @@ status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
- RemotePort, Signature, CompSpec) ->
+ RemotePort, Credentials, CompSpec) ->
?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]),
?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]),
- ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
- { _NRemoteAddress, _NRemotePort} = Conn =
+ { NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -658,33 +661,30 @@ process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
{ PeerIP, PeerPort };
_ -> { RemoteAddress, RemotePort}
end,
-
- case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
-
-send_certs(Pid, CompSpec) ->
- ?debug("send_certs (Pid = ~p)", [Pid]),
- {LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
- [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE},
- {?DLINK_ARG_ADDRESS, LocalIP},
- {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
- {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)).
+ log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
+ PeerCert = rvi_common:get_value(dlink_tls_peer_cert, not_found, CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn, PeerCert),
+ connection_authorized(FromPid, Conn, CompSpec).
+
+%% send_creds(Pid, CompSpec) ->
+%% ?debug("send_creds (Pid = ~p)", [Pid]),
+%% {LocalIP, LocalPort} = rvi_common:node_address_tuple(),
+%% dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
+%% [{?DLINK_ARG_CMD, ?DLINK_CMD_CRED_EXCHANGE},
+%% {?DLINK_ARG_ADDRESS, LocalIP},
+%% {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
+%% {?DLINK_ARG_CREDENTIALS, [get_credentials(CompSpec)]}], CompSpec)).
send_authorize(Pid, CompSpec) ->
- ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]),
+ ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, abbrev(CompSpec)]),
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- JWT = get_authorize_jwt(CompSpec),
+ Creds = get_credentials(CompSpec),
dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
[{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE},
+ {?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION},
{?DLINK_ARG_ADDRESS, LocalIP},
{?DLINK_ARG_PORT, integer_to_list(LocalPort)},
- {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)).
+ {?DLINK_ARG_CREDENTIALS, Creds}], CompSpec)).
%% dlink_tls_conn:send(Pid,
%% term_to_json(
%% {struct,
@@ -708,18 +708,8 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid),
?debug("dlink_tls:authorize(): Sending authorize."),
_Res = send_authorize(FromPid, CompSpec),
- case rvi_common:get_value(dlink_tls_role, server, CompSpec) of
- server -> ok;
- client ->
- send_certs(FromPid, CompSpec)
- end,
- %% ?debug("dlink_tls:upgrade connection", []),
- %% UgRes = dlink_tls_conn:upgrade(FromPid, server),
- %% ?debug("upgrade result: ~p", [UgRes]),
ok;
_ ->
- %% UgRes = dlink_tls_conn:upgrade(FromPid, client),
- %% ?debug("upgrade result: ~p", [UgRes]),
ok
end,
@@ -797,31 +787,31 @@ get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
- [not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
- end.
+%% get_authorize_jwt(CompSpec) ->
+%% case authorize_rpc:get_authorize_jwt(CompSpec) of
+%% [ok, JWT] ->
+%% JWT;
+%% [not_found] ->
+%% ?error("No authorize JWT~n", []),
+%% error(cannot_authorize)
+%% end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-validate_auth_jwt(JWT, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
+%% validate_auth_jwt(JWT, Conn, CompSpec) ->
+%% case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
+%% [ok] ->
+%% true;
+%% [not_found] ->
+%% false
+%% end.
term_to_json(Term) ->
binary_to_list(iolist_to_binary(exo_json:encode(Term))).
@@ -834,7 +824,7 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
Res = [ opt(K, Elems, Def) || K <- Keys],
- ?debug("opts(~p) -> ~p", [Keys, Elems]),
+ ?debug("opts(~p) -> ~p", [Keys, abbrev(Elems)]),
Res.
@@ -848,3 +838,6 @@ start_log(Pfx, Fmt, Args, CS) ->
log(Fmt, Args, CS) ->
rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
+
+abbrev(Data) ->
+ authorize_keys:abbrev(Data).
diff --git a/components/rvi_common/include/rvi_dlink_bin.hrl b/components/rvi_common/include/rvi_dlink_bin.hrl
index f4f4cb8..6ccabd2 100644
--- a/components/rvi_common/include/rvi_dlink_bin.hrl
+++ b/components/rvi_common/include/rvi_dlink_bin.hrl
@@ -8,24 +8,24 @@
%% Commonly used protocol identifiers across dlink implementations
%%
--define(DLINK_CMD_AUTHORIZE, <<"au">>).
--define(DLINK_CMD_CERT_EXCHANGE, <<"crt">>).
--define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>).
--define(DLINK_CMD_RECEIVE, <<"rcv">>).
--define(DLINK_CMD_FRAG, <<"frg">>).
--define(DLINK_CMD_PING, <<"ping">>).
+-define(DLINK_CMD_AUTHORIZE, <<"au">>).
+-define(DLINK_CMD_CRED_EXCHANGE, <<"cre">>).
+-define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>).
+-define(DLINK_CMD_RECEIVE, <<"rcv">>).
+-define(DLINK_CMD_FRAG, <<"frg">>).
+-define(DLINK_CMD_PING, <<"ping">>).
--define(DLINK_ARG_CMD, <<"cmd">>).
--define(DLINK_ARG_TRANSACTION_ID, <<"tid">>).
--define(DLINK_ARG_ADDRESS, <<"addr">>).
--define(DLINK_ARG_PORT, <<"port">>).
--define(DLINK_ARG_VERSION, <<"ver">>).
--define(DLINK_ARG_CERTIFICATE, <<"cert">>).
--define(DLINK_ARG_CERTIFICATES, <<"certs">>).
--define(DLINK_ARG_SIGNATURE, <<"sign">>).
--define(DLINK_ARG_SERVICES, <<"svcs">>).
--define(DLINK_ARG_MODULE, <<"mod">>).
--define(DLINK_ARG_AVAILABLE, <<"av">>).
--define(DLINK_ARG_UNAVAILABLE, <<"un">>).
--define(DLINK_ARG_STATUS, <<"stat">>).
--define(DLINK_ARG_DATA, <<"data">>).
+-define(DLINK_ARG_CMD, <<"cmd">>).
+-define(DLINK_ARG_TRANSACTION_ID, <<"tid">>).
+-define(DLINK_ARG_ADDRESS, <<"addr">>).
+-define(DLINK_ARG_PORT, <<"port">>).
+-define(DLINK_ARG_VERSION, <<"ver">>).
+-define(DLINK_ARG_CREDENTIAL, <<"cred">>).
+-define(DLINK_ARG_CREDENTIALS, <<"creds">>).
+-define(DLINK_ARG_SIGNATURE, <<"sign">>).
+-define(DLINK_ARG_SERVICES, <<"svcs">>).
+-define(DLINK_ARG_MODULE, <<"mod">>).
+-define(DLINK_ARG_AVAILABLE, <<"av">>).
+-define(DLINK_ARG_UNAVAILABLE, <<"un">>).
+-define(DLINK_ARG_STATUS, <<"stat">>).
+-define(DLINK_ARG_DATA, <<"data">>).
diff --git a/components/rvi_common/src/exoport_exo_http.erl b/components/rvi_common/src/exoport_exo_http.erl
index 31700ca..cc3e7b9 100644
--- a/components/rvi_common/src/exoport_exo_http.erl
+++ b/components/rvi_common/src/exoport_exo_http.erl
@@ -31,6 +31,13 @@ instance(SupMod, AppMod, Opts) ->
handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == 'POST' ->
+ ensure_ready(fun handle_post/4, Socket, Request, Body, AppMod);
+
+handle_body(Socket, _Request, _Body, _AppMod) ->
+ exo_http_server:response(Socket, undefined, 404, "Not Found",
+ "Object not found. Try using POST method.").
+
+handle_post(Socket, Request, Body, AppMod) ->
case Request#http_request.headers of
#http_chdr{content_type = "application/json" ++ _ = T}
when T=="application/json";
@@ -39,11 +46,7 @@ handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == '
handle_post_json(Socket, Request, Body, AppMod);
#http_chdr{content_type = "multipart/" ++ _} ->
handle_post_multipart(Socket, Request, Body, AppMod)
- end;
-
-handle_body(Socket, _Request, _Body, _AppMod) ->
- exo_http_server:response(Socket, undefined, 404, "Not Found",
- "Object not found. Try using POST method.").
+ end.
handle_post_json(Socket, _Request, Body, AppMod) ->
try decode_json(Body) of
@@ -211,3 +214,15 @@ opt(K, L, Def) ->
{_, V} -> V;
false -> Def
end.
+
+
+ensure_ready(F, Socket, Request, Body, AppMod) ->
+ try rvi_server:ensure_ready(_Timeout = 10000),
+ F(Socket, Request, Body, AppMod)
+ catch
+ error:timeout ->
+ ?error("~p timeout waiting for rvi_core", [?MODULE]),
+ exo_http_server:response(Socket, undefined, 501,
+ "Internal Error",
+ "Internal Error")
+ end.
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index c9bbabb..0b54d53 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -191,7 +191,6 @@ notification(Component,
?debug(" Module : ~p", [Module]),
?debug(" Function : ~p", [Function]),
?debug(" InArgPropList : ~p", [InArgPropList]),
- ?debug(" CompSpec : ~p", [CompSpec]),
case get_module_type(Component, Module, CompSpec) of
%% We have a gen_server
@@ -288,6 +287,8 @@ unstruct(X) ->
%% If Path is just a single element, convert to list and try again.
+get_json_element(_, []) ->
+ {error, undefined};
get_json_element(ElemPath, JSON) when is_atom(ElemPath) ->
get_json_element([ElemPath], JSON);
@@ -299,7 +300,7 @@ get_json_element(ElemPath, JSON) when is_tuple(JSON) ->
get_json_element(ElemPath, [T|_] = JSON) when is_tuple(T) ->
get_json_element_(ElemPath, JSON);
-get_json_element(ElemPath, JSON) when is_list(JSON) ->
+get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) ->
case exo_json:decode_string(JSON) of
{ok, Data } ->
get_json_element_(ElemPath, Data);
@@ -311,7 +312,7 @@ get_json_element(ElemPath, JSON) when is_list(JSON) ->
get_json_element(P, J) ->
?warning("get_json_element(): Unknown call structure; Path: ~p | JSON: ~p",
[P, J]),
- {error, call, {P, J}}.
+ {error, {call, {P, J}}}.
get_json_element_(_, undefined) ->
{ error, undefined };
@@ -508,7 +509,7 @@ get_component_config_(Component, Default, CompList) ->
get_component_specification() ->
CS = get_component_specification_(),
- lager:debug("CompSpec = ~p", [CS]),
+ %% lager:debug("CompSpec = ~p", [CS]),
CS.
get_component_specification_() ->
@@ -580,8 +581,8 @@ get_component_modules(_, _) ->
get_module_specification(Component, Module, CompSpec) ->
case get_component_modules(Component, CompSpec) of
undefined ->
- ?debug("get_module_specification(): Missing: rvi_core:component: ~p: ~p",
- [Component, CompSpec]),
+ ?debug("get_module_specification(): Missing: rvi_core:component: ~p",
+ [Component]),
undefined;
Modules ->
@@ -614,7 +615,7 @@ get_module_config(Component, Module, Key, CompSpec) ->
case proplists:get_value(Key, ModConf, undefined ) of
undefined ->
?debug("get_module_config(): Missing component spec: "
- "rvi_core:component:~p:~p:~p{...}: ~p",
+ "~p:~p:~p{...}: ~p",
[Component, Module, Key, ModConf]),
{error, {not_found, Component, Module, Key}};
@@ -764,9 +765,9 @@ start_json_rpc_server(Component, Module, Supervisor, XOpts) ->
Module,
ExoHttpOpts);
Err ->
- ?info("rvi_common:start_json_rpc_server(~p:~p): "
- "No JSON-RPC address setup. skip",
- [ Component, Module ]),
+ ?debug("start_json_rpc_server(~p:~p): "
+ "No JSON-RPC address setup. skip",
+ [ Component, Module ]),
Err
end.
diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl
index 5d0fa8e..962252c 100644
--- a/components/rvi_common/src/rvi_log.erl
+++ b/components/rvi_common/src/rvi_log.erl
@@ -2,15 +2,22 @@
-behaviour(gen_server).
+-compile(export_all).
+
-export([start_link/0,
log/3,
- flog/4,
+ flog/4, flog/5,
new_id/1,
fetch/1,
timestamp/0,
format/2
]).
+-export([entry/3,
+ exit_good/3,
+ exit_warn/3,
+ exit_fail/3]).
+
-export([start_json_server/0,
handle_rpc/2,
handle_notification/2]).
@@ -28,6 +35,7 @@
seq = 1,
node_tag}).
-record(evt, {id,
+ level,
component,
event}).
-define(IDS, rvi_log_ids).
@@ -35,13 +43,35 @@
-define(MAX_LENGTH, 60).
+%% levels
+-define(INFO, 0).
+-define(GOOD, 1).
+-define(WARN, 2).
+-define(FAIL, 3).
+
start_link() ->
create_tabs(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+entry(TID, Component, Event) -> log(TID, ?INFO, Component, Event).
+exit_good(TID, Component, Event) -> log(TID, ?GOOD, Component, Event).
+exit_warn(TID, Component, Event) -> log(TID, ?WARN, Component, Event).
+exit_fail(TID, Component, Event) -> log(TID, ?FAIL, Component, Event).
+
log(TID, Component, Event) when is_binary(TID), is_binary(Component) ->
- gen_server:cast(?MODULE, {log, TID, timestamp(), Component, bin(Event)}).
+ log(TID, ?INFO, Component, Event).
+
+log(TID, Level, Component, Event) when is_binary(TID), is_binary(Component) ->
+ gen_server:cast(?MODULE, {log, TID, level_num(Level), timestamp(), Component, bin(Event)}).
+
+
+level_num(L) when is_integer(L), L >= ?INFO, L =< ?FAIL ->
+ L;
+level_num(info ) -> 0;
+level_num(result ) -> 1;
+level_num(warning) -> 2;
+level_num(error ) -> 3.
new_id(Prefix) ->
gen_server:call(?MODULE, {new_id, bin(Prefix)}).
@@ -63,18 +93,156 @@ trunc_msg(Bin) ->
flog(Fmt, Args, Component, CS) ->
+ flog(0, Fmt, Args, Component, CS).
+
+flog(Level, Fmt, Args, Component, CS) ->
LogTID = rvi_common:get_log_id(CS),
- log(LogTID, Component, format(Fmt, Args)).
+ log(LogTID, Level, Component, format(Fmt, Args)).
timestamp() ->
os:timestamp().
+%% shorthand for the select pattern
+-define(ELEM(A), {element, #evt.A, '$_'}).
+-define(PROD, {{'$1', ?ELEM(level), ?ELEM(component), ?ELEM(event)}}).
+
fetch(Tids) ->
+ fetch(Tids, []).
+
+fetch(Tids, Args) ->
TidSet = select_ids(Tids),
- [{Tid, ets:select(?EVENTS, [{#evt{id = {Tid,'$1'},
- component = '$2',
- event = '$3'}, [], [{{'$1', '$2', '$3'}}] }])}
- || Tid <- TidSet].
+ lists:foldr(
+ fun(Tid, Acc) ->
+ case match_events(
+ Args,
+ ets:select(?EVENTS, [{#evt{id = {Tid,'$1'},
+ level = '$2',
+ component = '$3',
+ event = '$4'}, [], [?PROD] }])) of
+ [] -> Acc;
+ Events ->
+ [{Tid, Events}|Acc]
+ end
+ end, [], TidSet).
+
+match_events(_, []) ->
+ [];
+match_events(Args, Events) ->
+ lists:foldl(
+ fun({<<"level">>, Str}, Acc) ->
+ filter_by_level(Acc, parse_level_expr(Str));
+ (_, Acc) ->
+ Acc
+ end, Events, Args).
+
+%% The level comparison expressions use the following syntax:
+%% Expr :: Int
+%% | BinOp Expr
+%% | UnaryOp Expr
+%% | '(' Expr ')'
+%%
+%% BinOp :: '==' | '>=' | '=>' | '<=' | '=<' | '!=' | '|' | '&'
+%% UnaryOp :: '!' | 'not'
+%%
+%% The token scanning pass translates the tokens to standard Erlang tokens
+%% The parsing pass first infills variable references where needed, e.g.
+%% "> 1" would be translated to "L > 1" (where L is a variable bound to the
+%% current Event Level at evaluation time), and
+%% "1" would be translated to "L == 1".
+%%
+%% The infill pass is needed in order to produce a legal Erlang grammar. It's followed
+%% by a call to the erlang parser. The short forms are expanded in the parsed form.
+%%
+%% When parsing and evaluating the expressions any exception is translated into a failed
+%% comparison.
+%%
+parse_level_expr(Str) ->
+ try level_grammar(scan_level_expr(Str, []))
+ catch
+ error:_ ->
+ [{atom, 1, false}]
+ end.
+
+scan_level_expr(<<"!=", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=/=',1}|Acc]);
+scan_level_expr(<<"==", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=:=',1}|Acc]);
+scan_level_expr(<<"=>", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=<',1}|Acc]);
+scan_level_expr(<<"<=", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'>=',1}|Acc]);
+scan_level_expr(<<">=", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'>=',1}|Acc]);
+scan_level_expr(<<"=<", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=<',1}|Acc]);
+scan_level_expr(<<">", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'>',1}|Acc]);
+scan_level_expr(<<"<", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'<',1}|Acc]);
+scan_level_expr(<<"|", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'orelse',1}|Acc]);
+scan_level_expr(<<"&", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'andalso',1}|Acc]);
+scan_level_expr(<<I, Rest/binary>>, Acc) when I >= $0, I =< $3 ->
+ scan_level_expr(Rest, [{integer, 1, I-$0} | Acc]);
+scan_level_expr(<<"(", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'(',1}|Acc]);
+scan_level_expr(<<")", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{')',1}|Acc]);
+scan_level_expr(<<"not", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'not',1}|Acc]);
+scan_level_expr(<<"!", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'not',1}|Acc]);
+scan_level_expr(<<"\s", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc);
+scan_level_expr(<<"\t", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc);
+scan_level_expr(<<"\n", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc);
+scan_level_expr(<<>>, Acc) ->
+ lists:reverse(Acc).
+
+level_grammar(Toks) ->
+ case erl_parse:parse_exprs(infill_vars(Toks)) of
+ {ok, Exprs} ->
+ expand_shorthand(Exprs);
+ Error ->
+ error(Error)
+ end.
+
+-define(is_op(H), H=='=:='; H=='=!='; H=='>'; H=='<'; H=='>='; H=='=<').
+
+infill_vars([{O,_}=H|T]) when ?is_op(O) ->
+ [{var,1,'L'},H|infill_vars(T)];
+infill_vars([H|T]) ->
+ [H|infill_vars(T)];
+infill_vars([]) ->
+ [{dot,1}].
+
+expand_shorthand([Expr|Exprs]) ->
+ [expand_shorthand_(Expr)|expand_shorthand(Exprs)];
+expand_shorthand([]) ->
+ [].
+
+expand_shorthand_({integer,_,_} = I) ->
+ {op, 1, '=:=', {var,1,'L'}, I};
+expand_shorthand_({op,Ln,Op,A,B}) when Op=='andalso';
+ Op=='orelse' ->
+ {op,Ln,Op,expand_shorthand_(A), expand_shorthand_(B)};
+expand_shorthand_({op,Ln,'not',E}) ->
+ {op,Ln,'not',expand_shorthand_(E)};
+expand_shorthand_(Expr) ->
+ Expr.
+
+filter_by_level([H|T], Expr) ->
+ Res = try erl_eval:exprs(Expr, [{'L', element(2,H)}]) of
+ {value, true, _} -> true;
+ _ -> false
+ catch
+ _:_ -> false
+ end,
+ if Res ->
+ [H|filter_by_level(T, Expr)];
+ true ->
+ filter_by_level(T, Expr)
+ end.
init(_) ->
{ok, Tag} = rvi_common:get_module_config(
@@ -117,11 +285,11 @@ handle_notification(Other, _Args) ->
ok.
handle_cast(log_start, #st{n = N, node_tag = Tag} = St) ->
- do_log(tid_(<<"rvi_log">>, 0, Tag), timestamp(), <<"rvi_common">>,
+ do_log(tid_(<<"rvi_log">>, 0, Tag), ?INFO, timestamp(), <<"rvi_common">>,
format("Started - Tag = ~s", [Tag]), N),
{noreply, St};
-handle_cast({log, TID, TS, Component, Event}, #st{n = N} = St) ->
- do_log(TID, TS, Component, Event, N),
+handle_cast({log, TID, Level, TS, Component, Event}, #st{n = N} = St) ->
+ do_log(TID, Level, TS, Component, Event, N),
{noreply, St};
handle_cast(_, St) ->
{noreply, St}.
@@ -161,10 +329,10 @@ maybe_new(T, Opts) ->
tid_(Prefix, Seq, Tag) ->
<<Prefix/binary, ":", (integer_to_binary(Seq))/binary, "-", Tag/binary>>.
-do_log(Tid, TS, Component, Event, N) ->
+do_log(Tid, Level, TS, Component, Event, N) ->
case ets:member(?IDS, Tid) of
true ->
- store_event(Tid, TS, Component, Event);
+ store_event(Tid, Level, TS, Component, Event);
false ->
case ets:info(?IDS, size) of
Sz when Sz >= N ->
@@ -173,13 +341,14 @@ do_log(Tid, TS, Component, Event, N) ->
ok
end,
ets:insert(?IDS, {Tid}),
- store_event(Tid, TS, Component, Event)
+ store_event(Tid, Level, TS, Component, Event)
end.
-store_event(Tid, TS, Component, Event) ->
- rvi_log_log:info("~-20s ~-12s ~s", [Tid, Component, Event]),
- ?info("RVI_LOG: ~p/~p/~p", [Tid, Component, Event]),
+store_event(Tid, Level, TS, Component, Event) ->
+ rvi_log_log:info("~-20s ~-2w ~-12s ~s", [Tid, Level, Component, Event]),
+ ?info("RVI_LOG: ~p/~w/~p/~p", [Tid, Level, Component, Event]),
ets:insert(?EVENTS, #evt{id = {Tid, TS},
+ level = Level,
component = Component,
event = Event}).
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index b4f14ab..bd7ab54 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -92,7 +92,7 @@ get_modules_by_service(CompSpec, Service) ->
register_services(CompSpec, Services, DataLinkModule) ->
?debug("~p:register_services()", [?MODULE]),
- ?debug(" CompSpec : ~p", [CompSpec]),
+ ?debug(" CompSpec : ~p", [authorize_keys:abbrev(CompSpec)]),
?debug(" Services : ~p", [Services]),
?debug(" DataLinkMod : ~p", [DataLinkModule]),
rvi_common:notification(service_discovery, ?MODULE, register_services,
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index a7c75e0..576dc13 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -236,7 +236,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Params),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Params),
-
+ LogId = log_id_json_tail(Params ++ Parameters),
?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]),
?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]),
?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]),
@@ -244,7 +244,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
case gen_server:call(
?SERVER,
{rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}) of
+ [ SvcName, Timeout, Parameters | LogId ]}) of
[not_found] ->
{ok, [{status, rvi_common:json(not_found)}]};
[Res, TID] ->
@@ -318,9 +318,9 @@ handle_rpc(<<"message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
+ LogId = log_id_json_tail(Args ++ Parameters),
[ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}),
-
+ [ SvcName, Timeout, Parameters | LogId]}),
{ok, [ { status, rvi_common:json_rpc_status(Res) },
{ transaction_id, TID },
{ method, <<"message">>}
@@ -396,12 +396,12 @@ handle_notification(Other, _Args) ->
%% the only calls invoked by other components, and not the locally
%% connected services that uses the same HTTP port to transmit their
%% register_service, and message calls.
-handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
+handle_call({ rvi, register_local_service, [SvcName, URL | T] }, _From, St) ->
?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]),
?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
FullSvcName = rvi_common:local_service_to_string(SvcName),
- CS = start_log(<<"svc">>, "reg local service: ~s", [FullSvcName], St#st.cs),
+ CS = start_log(T, "reg local service: ~s", [FullSvcName], St#st.cs),
?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]),
ets:insert(?SERVICE_TABLE, #service_entry {
@@ -416,7 +416,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
%% Return ok.
{ reply, [ ok, FullSvcName ], St };
-handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
+handle_call({ rvi, unregister_local_service, [SvcName | T] }, _From, St) ->
?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [SvcName]),
@@ -424,7 +424,7 @@ handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
%% Register with service discovery, will trigger callback to service_available()
%% that forwards the registration to other connected services.
- CS = start_log(<<"svc">>, "unreg local service: ~s", [SvcName], St#st.cs),
+ CS = start_log(T, "unreg local service: ~s", [SvcName], St#st.cs),
service_discovery_rpc:unregister_services(CS, [SvcName], local),
%% Return ok.
@@ -444,11 +444,11 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
handle_call({ rvi, handle_local_message,
- [SvcName, TimeoutArg, Parameters] }, _From, St) ->
+ [SvcName, TimeoutArg, Parameters | Tail] }, _From, St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
- CS = start_log(<<"recv">>, "local_message: ~s", [SvcName], St#st.cs),
+ CS = start_log(Tail, "local_message: ~s", [SvcName], St#st.cs),
%%
%% Authorize local message and retrieve a certificate / signature
%% that will be accepted by the receiving node that will deliver
@@ -468,7 +468,7 @@ handle_call({ rvi, handle_local_message,
%% If the timeout is more than 24 hrs old when parsed as unix time,
%% then we are looking at a relative msec timeout. Convert accordingly
%%
- { Mega, Sec, _Micro } = now(),
+ { Mega, Sec, _Micro } = os:timestamp(),
Now = Mega * 1000000 + Sec,
Timeout =
@@ -746,11 +746,34 @@ announce_service_availability(Available, SvcName) ->
end
end, BlockURLs, ?SERVICE_TABLE).
+start_log(Info, Fmt, Args, CS) ->
+ ?debug("start_log(~p,~p,~p,~p)", [Info,Fmt,Args,CS]),
+ case rvi_common:get_json_element([<<"rvi_log_id">>], Info) of
+ {ok, ID} ->
+ start_log_(ID, Fmt, Args, CS);
+ {error, _} ->
+ start_log(Fmt, Args, CS)
+ end.
+
+start_log(Fmt, Args, CS) ->
+ start_log_(rvi_log:new_id(pfx()), Fmt, Args, CS).
+
+start_log_(ID, Fmt, Args, CS) ->
+ CS1 = rvi_common:set_value(rvi_log_id, ID, CS),
+ log(Fmt, Args, CS1),
+ CS1.
-start_log(Pfx, Fmt, Args, CS) ->
- LogTID = rvi_log:new_id(Pfx),
- rvi_log:log(LogTID, <<"svc_edge">>, rvi_log:format(Fmt, Args)),
- rvi_common:set_value(rvi_log_id, LogTID, CS).
+pfx() ->
+ <<"svc_edge">>.
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"svc_edge">>, CS).
+ rvi_log:flog(Fmt, Args, pfx(), CS).
+
+log_id_json_tail(Args) ->
+ ?debug("Args = ~p", [Args]),
+ case rvi_common:get_json_element([<<"rvi_log_id">>], Args) of
+ {ok, ID} ->
+ [{<<"rvi_log_id">>, ID}];
+ {error, _} ->
+ []
+ end.