summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--components/authorize/src/authorize_keys.erl343
-rw-r--r--components/authorize/src/authorize_rpc.erl146
-rw-r--r--components/dlink/src/dlink.app.src6
-rw-r--r--components/dlink/src/dlink_data_json.erl60
-rw-r--r--components/dlink/src/dlink_data_msgpack.erl15
-rw-r--r--components/dlink/src/dlink_data_rvi.erl123
-rw-r--r--components/dlink_bt/src/bt_connection.erl144
-rw-r--r--components/dlink_bt/src/bt_connection_manager.erl1
-rw-r--r--components/dlink_bt/src/bt_listener.erl64
-rw-r--r--components/dlink_bt/src/dlink_bt.app.src2
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl296
-rw-r--r--components/dlink_sms/src/dlink_sms.app.src6
-rw-r--r--components/dlink_tcp/src/connection.erl110
-rw-r--r--components/dlink_tcp/src/dlink_tcp.app.src5
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl207
-rw-r--r--components/dlink_tcp/src/listener.erl4
-rw-r--r--components/dlink_tls/src/dlink_tls.app.src3
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl159
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl167
-rw-r--r--components/rvi_common/include/rvi_dlink_bin.hrl40
-rw-r--r--components/rvi_common/src/exoport_exo_http.erl25
-rw-r--r--components/rvi_common/src/rvi_common.erl21
-rw-r--r--components/rvi_common/src/rvi_log.erl201
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl2
-rw-r--r--components/service_edge/src/service_edge_rpc.erl55
-rw-r--r--deps/exo/src/exo_http_server.erl4
-rw-r--r--deps/exo/src/exo_socket.erl81
-rw-r--r--deps/exo/src/exo_socket_server.erl45
-rw-r--r--deps/exo/src/exo_socket_session.erl122
-rw-r--r--doc/rvi_protocol.md34
-rw-r--r--priv/config/rvi_backend.config6
-rw-r--r--priv/config/rvi_common.config4
-rw-r--r--priv/config/rvi_sample.config14
-rw-r--r--priv/test_config/backend.config12
-rw-r--r--priv/test_config/bt_backend.config2
-rw-r--r--priv/test_config/bt_sample.config2
-rw-r--r--priv/test_config/sample.config12
-rw-r--r--src/rvi_server.erl42
-rw-r--r--test/rvi_core_SUITE.erl136
39 files changed, 1513 insertions, 1208 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index 70bb021..a362c7b 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -2,24 +2,25 @@
-behaviour(gen_server).
-export([get_key_pair/0,
+ get_device_key/0,
get_key_pair_from_pem/2,
get_pub_key/1,
- authorize_jwt/0,
provisioning_key/0,
signed_public_key/2,
save_keys/2,
- save_cert/4]).
--export([get_certificates/0,
- get_certificates/1]).
+ save_cred/5]).
+-export([get_credentials/0,
+ get_credentials/1]).
-export([validate_message/2,
validate_service_call/2]).
-export([filter_by_service/2,
- find_cert_by_service/1]).
+ find_cred_by_service/1]).
-export([public_key_to_json/1,
json_to_public_key/1]).
-export([self_signed_public_key/0]). % just temporary
-export([pp_key/1,
+ abbrev/1,
abbrev_bin/1,
abbrev_payload/1,
abbrev_jwt/1]).
@@ -36,20 +37,28 @@
-include_lib("public_key/include/public_key.hrl").
-record(st, {provisioning_key,
- cert_dir,
- authorize_jwt}).
-
--record(cert, {id,
- register = [],
- invoke = [],
+ dev_cert,
+ cred_dir}).
+
+%% -record(cert, {id,
+%% register = [],
+%% invoke = [],
+%% validity = [],
+%% jwt,
+%% cert}).
+
+-record(cred, {id,
+ right_to_register = [],
+ right_to_invoke = [],
validity = [],
+ device_cert,
jwt,
- cert}).
+ cred}).
-record(key, {id,
key}).
--define(CERTS, authorize_certs).
+-define(CREDS, authorize_creds).
-define(KEYS, authorize_keys).
public_key_to_json(#'RSAPublicKey'{modulus = N, publicExponent = E}) ->
@@ -101,8 +110,15 @@ get_key_pair() ->
get_key_pair_from_pem(openssl, Pem)
end.
-authorize_jwt() ->
- gen_server:call(?MODULE, authorize_jwt).
+get_device_key() ->
+ case get_env(device_key) of
+ undefined ->
+ {undefined, undefined};
+ [_|_] = File ->
+ get_key_pair_from_pem(openssl, File);
+ {openssl_pem, Pem} ->
+ get_key_pair_from_pem(openssl, Pem)
+ end.
validate_message(JWT, Conn) ->
gen_server:call(?MODULE, {validate_message, JWT, Conn}).
@@ -110,17 +126,17 @@ validate_message(JWT, Conn) ->
validate_service_call(Service, Conn) ->
gen_server:call(?MODULE, {validate_service_call, Service, Conn}).
-get_certificates() ->
- get_certificates(local).
+get_credentials() ->
+ get_credentials(local).
-get_certificates(Conn) ->
- gen_server:call(?MODULE, {get_certificates, Conn}).
+get_credentials(Conn) ->
+ gen_server:call(?MODULE, {get_credentials, Conn}).
filter_by_service(Services, Conn) ->
gen_server:call(?MODULE, {filter_by_service, Services, Conn}).
-find_cert_by_service(Service) ->
- gen_server:call(?MODULE, {find_cert_by_service, Service}).
+find_cred_by_service(Service) ->
+ gen_server:call(?MODULE, {find_cred_by_service, Service}).
provisioning_key() ->
gen_server:call(?MODULE, provisioning_key).
@@ -128,8 +144,8 @@ provisioning_key() ->
save_keys(Keys, Conn) ->
gen_server:call(?MODULE, {save_keys, Keys, Conn}).
-save_cert(Cert, JWT, Conn, LogId) ->
- gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn, LogId}).
+save_cred(Cred, JWT, Conn, PeerCert, LogId) ->
+ gen_server:call(?MODULE, {save_cred, Cred, JWT, Conn, PeerCert, LogId}).
%% Gen_server functions
@@ -137,7 +153,7 @@ start_link() ->
create_ets(),
case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of
{ok, Pid} = Ok ->
- ets:give_away(?CERTS, Pid, undefined),
+ ets:give_away(?CREDS, Pid, undefined),
ets:give_away(?KEYS, Pid, undefined),
Ok;
Other ->
@@ -145,18 +161,19 @@ start_link() ->
end.
init([]) ->
- ProvisioningKey = get_pub_key(get_env(provisioning_key)),
+ ProvisioningKey = get_pub_key_from_cert(get_env(root_cert)),
?debug("ProvisioningKey = ~s~n", [pp_key(ProvisioningKey)]),
- CertDir = setup:verify_dir(get_env(cert_dir)),
- {ok, AuthJwt0} = file:read_file(get_env(authorize_jwt)),
- AuthJwt = strip_nl(AuthJwt0),
- ?debug("CertDir = ~p~n", [CertDir]),
- Certs = scan_certs(CertDir, ProvisioningKey),
- ?debug("scan_certs found ~p certificates~n", [length(Certs)]),
- [ets:insert(?CERTS, {{local, C#cert.id}, C}) || C <- Certs],
+ {ok, DevCertBin} = file:read_file(get_env(device_cert)),
+ DevCert = strip_pem_begin_end(DevCertBin),
+ ?debug("Own DevCert = ~w", [abbrev_bin(DevCert)]),
+ CredDir = setup:verify_dir(get_env(cred_dir)),
+ ?debug("CredDir = ~p~n", [CredDir]),
+ Creds = scan_creds(CredDir, ProvisioningKey, DevCert),
+ ?debug("scan_creds found ~p credentials~n", [length(Creds)]),
+ [ets:insert(?CREDS, {{local, C#cred.id}, C}) || C <- Creds],
{ok, #st{provisioning_key = ProvisioningKey,
- cert_dir = CertDir,
- authorize_jwt = AuthJwt}}.
+ dev_cert = DevCert,
+ cred_dir = CredDir}}.
handle_call(Req, From, S) ->
try handle_call_(Req, From, S)
@@ -167,13 +184,11 @@ handle_call(Req, From, S) ->
{reply, error, S}
end.
-handle_call_(authorize_jwt, _, S) ->
- {reply, S#st.authorize_jwt, S};
handle_call_(provisioning_key, _, S) ->
{reply, S#st.provisioning_key, S};
-handle_call_({get_certificates, Conn}, _, S) ->
- Certs = certs_by_conn(Conn),
- {reply, Certs, S};
+handle_call_({get_credentials, Conn}, _, S) ->
+ Creds = creds_by_conn(Conn),
+ {reply, Creds, S};
handle_call_({save_keys, Keys, Conn}, _, S) ->
?debug("save_keys: Keys=~p, Conn=~p~n", [abbrev_k(Keys), Conn]),
save_keys_(Keys, Conn),
@@ -182,14 +197,14 @@ handle_call_({validate_message, JWT, Conn}, _, S) ->
{reply, validate_message_(JWT, Conn), S};
handle_call_({validate_service_call, Svc, Conn}, _, S) ->
{reply, validate_service_call_(Svc, Conn), S};
-handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) ->
- case process_cert_struct(Cert, JWT) of
+handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn, PeerCert, LogId}, _, S) ->
+ case process_cred_struct(Cred, JWT, PeerCert) of
invalid ->
- log(LogId, "cert INVALID Conn=~s:~w", [IP, Port]),
+ log(LogId, warning, "cred INVALID Conn=~s:~w", [IP, Port]),
{reply, {error, invalid}, S};
- #cert{} = C ->
- ets:insert(?CERTS, {{Conn, C#cert.id}, C}),
- log(LogId, "cert stored ~s Conn=~s:~w", [abbrev_bin(C#cert.id), IP, Port]),
+ #cred{} = C ->
+ ets:insert(?CREDS, {{Conn, C#cred.id}, C}),
+ log(LogId, result, "cred stored ~s Conn=~s:~w", [abbrev_bin(C#cred.id), IP, Port]),
{reply, ok, S}
end;
handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
@@ -197,9 +212,9 @@ handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
Filtered = filter_by_service_(Services, Conn),
?debug("Filtered = ~p~n", [Filtered]),
{reply, Filtered, State};
-handle_call_({find_cert_by_service, Service} = R, _From, State) ->
+handle_call_({find_cred_by_service, Service} = R, _From, State) ->
?debug("authorize_keys:handle_call(~p,...)~n", [R]),
- Res = find_cert_by_service_(Service),
+ Res = find_cred_by_service_(Service),
?debug("Res = ~p~n", [case Res of {ok,{A,B}} -> {ok,{A,abbrev_bin(B)}}; _ -> Res end]),
{reply, Res, State};
handle_call_(_, _, S) ->
@@ -219,28 +234,28 @@ code_change(_FromVsn, S, _Extra) ->
%% Local functions
-certs_by_conn(Conn) ->
- ?debug("certs_by_conn(~p)~n", [Conn]),
+creds_by_conn(Conn) ->
+ ?debug("creds_by_conn(~p)~n", [Conn]),
UTC = rvi_common:utc_timestamp(),
- Certs = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{jwt = '$1',
+ Creds = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{jwt = '$1',
validity = '$2',
_='_'}},
[], [{{'$1', '$2'}}] }]),
- ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]),
- [C || {C,V} <- Certs, check_validity(V, UTC)].
+ ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Creds]]),
+ [C || {C,V} <- Creds, check_validity(V, UTC)].
-cert_recs_by_conn(Conn) ->
- ?debug("cert_recs_by_conn(~p)~n", [Conn]),
+cred_recs_by_conn(Conn) ->
+ ?debug("cred_recs_by_conn(~p)~n", [Conn]),
UTC = rvi_common:utc_timestamp(),
- Certs = ets:select(?CERTS, [{ {{Conn,'_'}, '$1'},
+ Creds = ets:select(?CREDS, [{ {{Conn,'_'}, '$1'},
[], ['$1'] }]),
- ?debug("rough selection: ~p~n", [[abbrev_bin(C#cert.id) || C <- Certs]]),
- [C || C <- Certs, check_validity(C#cert.validity, UTC)].
+ ?debug("rough selection: ~p~n", [[abbrev_bin(C#cred.id) || C <- Creds]]),
+ [C || C <- Creds, check_validity(C#cred.validity, UTC)].
filter_by_service_(Services, Conn) ->
- ?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]),
- Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1',
- _ = '_'}},
+ ?debug("Filter: creds = ~p", [[{K,abbrev_payload(V)} || {K,V} <- ets:tab2list(?CREDS)]]),
+ Invoke = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{right_to_invoke = '$1',
+ _ = '_'}},
[], ['$1'] }]),
?debug("Services by conn (~p) -> ~p~n", [Conn, Invoke]),
filter_svcs_(Services, Invoke).
@@ -260,25 +275,25 @@ filter_svcs_([S|Svcs], Invoke) ->
filter_svcs_([], _) ->
[].
-find_cert_by_service_(Service) ->
+find_cred_by_service_(Service) ->
SvcParts = split_path(strip_prot(Service)),
- LocalCerts = ets:select(?CERTS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]),
- ?debug("find_cert_by_service(~p~nLocalCerts = ~p~n",
- [Service, [{Id,Reg,Inv} || #cert{id = Id,
- invoke = Inv,
- register = Reg} <- LocalCerts]]),
+ LocalCreds = ets:select(?CREDS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]),
+ ?debug("find_creds_by_service(~p~nLocalCreds = ~p~n",
+ [Service, [{Id,Reg,Inv} || #cred{id = Id,
+ right_to_invoke = Inv,
+ right_to_register = Reg} <- LocalCreds]]),
case lists:foldl(
- fun(#cert{register = Register} = C, {Max, _} = Acc) ->
+ fun(#cred{right_to_register = Register} = C, {Max, _} = Acc) ->
case match_length(Register, SvcParts) of
L when L > Max ->
{L, C};
_ ->
Acc
end
- end, {0, none}, LocalCerts) of
+ end, {0, none}, LocalCreds) of
{0, none} ->
{error, not_found};
- {_, #cert{id = Id, jwt = JWT}} ->
+ {_, #cred{id = Id, jwt = JWT}} ->
{ok, {Id, JWT}}
end.
@@ -329,7 +344,7 @@ match_svc_(_, _) ->
false.
get_env(K) ->
- Res = case application:get_env(rvi_core, K) of
+ Res = case setup:get_env(rvi_core, K) of
{ok, V} -> V;
_ -> undefined
end,
@@ -382,8 +397,32 @@ get_openssl_pub_key(File) ->
undefined
end.
+get_pub_key_from_cert(File) ->
+ case file:read_file(File) of
+ {ok, Bin} ->
+ get_pub_key_from_cert_rec(
+ public_key:pem_entry_decode(hd(public_key:pem_decode(Bin))));
+ Error ->
+ error({cannot_read_cert, [Error, {file, File}]})
+ end.
+
+get_pub_key_from_cert_rec(#'Certificate'{
+ tbsCertificate =
+ #'TBSCertificate'{
+ subjectPublicKeyInfo =
+ #'SubjectPublicKeyInfo'{
+ algorithm =
+ #'AlgorithmIdentifier'{
+ algorithm = Algo},
+ subjectPublicKey = Key}}}) ->
+ 'RSAPublicKey' = KeyAlg = pubkey_cert_records:supportedPublicKeyAlgorithms(Algo),
+ public_key:der_decode(KeyAlg, Key).
+
+
+
+
create_ets() ->
- create_ets(?CERTS, #cert.id),
+ create_ets(?CREDS, #cred.id),
create_ets(?KEYS, #key.id).
create_ets(Tab, KeyPos) ->
@@ -396,42 +435,46 @@ create_ets(Tab, KeyPos) ->
true
end.
-scan_certs(Dir, Key) ->
- UTC = rvi_common:utc_timestamp(),
- case file:list_dir(Dir) of
- {ok, Fs} ->
- lists:foldl(
+scan_creds(Dir, Key, Cert) ->
+ case filelib:is_dir(Dir) of
+ true ->
+ UTC = rvi_common:utc_timestamp(),
+ filelib:fold_files(
+ Dir,
+ _FileRegexp = "\\.jwt$",
+ _Recursive = false,
fun(F, Acc) ->
- process_cert(filename:join(Dir, F), Key, UTC, Acc)
- end, [], Fs);
- Error ->
- ?warning("Cannot read certs (~p): ~p~n", [Dir, Error]),
- ok
+ process_cred(F, Key, Cert, UTC, Acc)
+ end,
+ []);
+ false ->
+ ?warning("Cannot read creads: ~p not a directory", [Dir]),
+ []
end.
-process_cert(F, Key, UTC, Acc) ->
+process_cred(F, Key, Cert, UTC, Acc) ->
case file:read_file(F) of
{ok, Bin} ->
try authorize_sig:decode_jwt(strip_nl(Bin), Key) of
- {_, Cert} ->
- ?info("Unpacked Cert ~p:~n~p~n", [F, abbrev_payload(Cert)]),
- case process_cert_struct(Cert, Bin, UTC) of
+ {_, Cred} ->
+ ?info("Unpacked Cred ~p:~n~p~n", [F, abbrev_payload(Cred)]),
+ case process_cred_struct(Cred, Bin, UTC, Cert) of
invalid ->
Acc;
- #cert{} = C ->
+ #cred{} = C ->
[C|Acc]
end;
invalid ->
- ?warning("Invalid cert: ~p~n", [F]),
+ ?warning("Invalid cred: ~p~n", [F]),
Acc
catch
error:Exception ->
- ?warning("Cert validation failure (~p): ~p~n",
+ ?warning("Cred validation failure (~p): ~p~n",
[F, Exception]),
Acc
end;
Error ->
- ?warning("Cannot read cert ~p: ~p~n", [F, Error]),
+ ?warning("Cannot read cred ~p: ~p~n", [F, Error]),
Acc
end.
@@ -441,52 +484,76 @@ strip_nl(Bin) ->
_ -> Bin
end.
-process_cert_struct(Cert, Bin) ->
- process_cert_struct(Cert, Bin, rvi_common:utc_timestamp()).
+strip_pem_begin_end(<<"-----BEGIN CERTIFICATE-----\n", Bin/binary>>) ->
+ re:replace(hd(re:split(Bin, <<"-----END CERTIFICATE-----">>, [])),
+ <<"\\v">>, <<>>, [global,{return,binary}]);
+strip_pem_begin_end(Bin) ->
+ Bin.
+
+
+process_cred_struct(Cred, Bin, Cert) ->
+ process_cred_struct(Cred, Bin, rvi_common:utc_timestamp(), to_pem(Cert)).
+
+to_pem(Cert) when is_binary(Cert) ->
+ %% The Peer cert is assumed to be DER encoded.
+ %% The cert in the cred is PEM-encoded
+ PEMCert = public_key:pem_encode([{'Certificate', Cert, not_encrypted}]),
+ strip_pem_begin_end(PEMCert);
+to_pem(Other) ->
+ Other.
+
-process_cert_struct(Cert, Bin, UTC) ->
- try process_cert_struct_(Cert, Bin, UTC)
+process_cred_struct(Cred, Bin, UTC, Cert) ->
+ try process_cred_struct_(Cred, Bin, UTC, Cert)
catch
error:Err ->
- ?warning("Failure processing Cert ~p~n~p",
- [Cert, {Err, erlang:get_stacktrace()}]),
+ ?warning("Failure processing Cred ~p~n~p",
+ [Cred, {Err, erlang:get_stacktrace()}]),
invalid
end.
-process_cert_struct_(Cert, Bin, UTC) ->
- ID = cert_id(Cert),
+process_cred_struct_(Cred, Bin, UTC, DevCert) ->
+ ID = cred_id(Cred),
{ok, Register} = rvi_common:get_json_element(
- [{'OR', ["sources", "register"]}], Cert),
+ [{'OR', ["right_to_register", "sources", "register"]}], Cred),
{ok, Invoke} = rvi_common:get_json_element(
- [{'OR', ["destinations", "invoke"]}], Cert),
+ [{'OR', ["right_to_invoke", "destinations", "invoke"]}], Cred),
{ok, Start} = rvi_common:get_json_element(
- ["validity", "start"], Cert),
+ ["validity", "start"], Cred),
{ok, Stop} = rvi_common:get_json_element(
- ["validity", "stop"], Cert),
+ ["validity", "stop"], Cred),
+ {ok, Cert} = rvi_common:get_json_element(["device_cert"], Cred),
+ case DevCert == undefined orelse Cert == DevCert of
+ false ->
+ ?warning("Wrong device_cert in cred~n~p~n~p", [Cert, DevCert]),
+ invalid;
+ true ->
+ ok
+ end,
?debug("Start = ~p; Stop = ~p~n", [Start, Stop]),
Validity = {Start, Stop},
case check_validity(Start, Stop, UTC) of
true ->
- #cert{id = ID,
- register = Register,
- invoke = Invoke,
+ #cred{id = ID,
+ right_to_register = Register,
+ right_to_invoke = Invoke,
validity = Validity,
jwt = Bin,
- cert = Cert};
+ cred = Cred};
false ->
- %% Cert outdated
- ?warning("Outdated cert: Validity = ~p; UTC = ~p~n",
+ %% Cred outdated
+ ?warning("Outdated cred: Validity = ~p; UTC = ~p~n",
[Validity, UTC]),
invalid
end.
-cert_id(Cert) ->
- case rvi_common:get_json_element(["id"], Cert) of
+cred_id(Cred) ->
+ case rvi_common:get_json_element(["id"], Cred) of
{ok, Id} ->
Id;
{error, undefined} ->
- ?warning("Cert has no ID: ~p~n", [Cert]),
- erlang:now()
+ ?warning("Cred has no ID: ~p~n", [Cred]),
+ erlang:unique_integer([positive,monotonic])
end.
check_validity({Start, Stop}, UTC) ->
@@ -539,14 +606,14 @@ validate_message_1([], _) ->
error(invalid).
validate_service_call_(Svc, Conn) ->
- case lists:filter(fun(C) -> can_invoke(Svc, C) end, cert_recs_by_conn(Conn)) of
+ case lists:filter(fun(C) -> can_invoke(Svc, C) end, cred_recs_by_conn(Conn)) of
[] ->
invalid;
- [#cert{id = ID}|_] ->
+ [#cred{id = ID}|_] ->
{ok, ID}
end.
-can_invoke(Svc, #cert{invoke = In}) ->
+can_invoke(Svc, #cred{right_to_invoke = In}) ->
lists:any(fun(I) -> match_svc(I, Svc) end, In).
pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) ->
@@ -560,6 +627,11 @@ pp_key(#'RSAPublicKey'{modulus = Mod, publicExponent = Pub}) ->
<<"#{'RSAPublicKey'{modulus = ", (abbrev_bin(M))/binary,
", publicExponent = ", P/binary, ", _ = ...}">>.
+abbrev(B) when is_binary(B) ->
+ abbrev_bin(B);
+abbrev(X) ->
+ abbrev_payload(X).
+
abbrev_bin(B) ->
abbrev_bin(B, 20, 6).
@@ -574,29 +646,58 @@ abbrev_bin(B, Len, Part) ->
end
catch error:_ -> B end.
-abbrev_payload(Payload) ->
+abbrev_payload(Payload) when is_list(Payload) ->
try lists:map(fun abbrev_pl/1, Payload)
- catch error:_ -> Payload end.
+ catch error:_ -> Payload end;
+abbrev_payload(PL) ->
+ try abbrev_pl(PL)
+ catch error:_ -> PL end.
abbrev_jwt({Hdr, Body} = X) ->
try {Hdr, abbrev_payload(Body)}
catch error:_ -> X end.
+abbrev_pl(#cred{} = Payload) ->
+ list_to_tuple(lists:map(fun(B) when is_binary(B) -> abbrev_bin(B);
+ ([{_,_}|_]=L) -> abbrev_payload(L);
+ (A) -> A
+ end, tuple_to_list(Payload)));
+abbrev_pl(#'RSAPublicKey'{} = Key) ->
+ pp_key(Key);
+abbrev_pl(#'RSAPrivateKey'{} = Key) ->
+ pp_key(Key);
+abbrev_pl(#'OTPCertificate'{} = Cert) ->
+ abbrev_deep_tuple(Cert);
+abbrev_pl(#'OTPTBSCertificate'{} = Cert) ->
+ abbrev_deep_tuple(Cert);
abbrev_pl({<<"keys">> = K, Ks}) ->
{K, [abbrev_k(Ky) || Ky <- Ks]};
-abbrev_pl({<<"certs">> = C, Cs}) ->
- {C, [abbrev_bin(Cert) || Cert <- Cs]};
+abbrev_pl({<<"creds">> = C, Cs}) ->
+ {C, [abbrev_bin(Cred) || Cred <- Cs]};
abbrev_pl({<<"certificate">> = K, C}) ->
{K, abbrev_bin(C)};
+abbrev_pl({<<"device_cert">> = K, C}) ->
+ {K, abbrev_bin(C)};
+abbrev_pl({<<"credential">> = K, C}) ->
+ {K, abbrev_bin(C)};
abbrev_pl({<<"sign">> = K, S}) when is_binary(S) ->
{K, abbrev_bin(S)};
+abbrev_pl({K, B}) when is_atom(K), is_binary(B), byte_size(B) > 50 ->
+ {K, abbrev_bin(B)};
+abbrev_pl({K,[{_,_}|_]=L}) ->
+ {K, abbrev_pl(L)};
abbrev_pl(B) when is_binary(B) ->
abbrev_bin(B, 40, 10);
abbrev_pl(L) when is_list(L) ->
abbrev_payload(L);
+abbrev_pl(T) when is_tuple(T), tuple_size(T) > 2 ->
+ abbrev_deep_tuple(T);
abbrev_pl(X) ->
X.
+abbrev_deep_tuple(T) when is_tuple(T) ->
+ list_to_tuple([abbrev_pl(E) || E <- tuple_to_list(T)]).
+
abbrev_k(K) ->
try lists:map(fun abbrev_elem/1, K)
catch error:_ -> K end.
@@ -606,7 +707,7 @@ abbrev_elem({<<"n">>, Bin}) ->
abbrev_elem(X) ->
X.
-log([ID], Fmt, Args) ->
- rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
-log(_, _, _) ->
+log([ID], L, Fmt, Args) ->
+ rvi_log:log(ID, L, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _, _) ->
ok.
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl
index c91b216..2291d21 100644
--- a/components/authorize/src/authorize_rpc.erl
+++ b/components/authorize/src/authorize_rpc.erl
@@ -18,19 +18,18 @@
terminate/2, code_change/3]).
-export([start_json_server/0]).
--export([get_authorize_jwt/1,
- get_certificates/1,
+-export([get_credentials/1,
sign_message/2,
validate_message/3,
validate_authorization/3,
validate_authorization/4,
- store_certs/3,
+ store_creds/3,
+ store_creds/4,
authorize_local_message/3,
authorize_remote_message/3]).
-export([filter_by_service/3]).
%% for testing & development
--export([sign/1]).
-export([public_key/0, public_key_json/0,
private_key/0]).
@@ -51,7 +50,7 @@ start_link() ->
init([]) ->
?debug("authorize_rpc:init(): called."),
- {Priv, Pub} = authorize_keys:get_key_pair(),
+ {Priv, Pub} = authorize_keys:get_device_key(),
?debug("KeyPair = {~s, ~s}~n", [authorize_keys:pp_key(Priv),
authorize_keys:pp_key(Pub)]),
{ok, #st { cs = rvi_common:get_component_specification(),
@@ -74,15 +73,10 @@ validate_message(CompSpec, JWT, Conn) ->
[{jwt, JWT},
{conn, Conn}], [status, message], CompSpec).
-get_authorize_jwt(CompSpec) ->
- ?debug("authorize_rpc:get_authorize_jwt()~n", []),
- rvi_common:request(authorize, ?MODULE, get_authorize_jwt,
- [], [status, jwt], CompSpec).
-
-get_certificates(CompSpec) ->
- ?debug("authorize_rpc:get_certificates()~n", []),
- rvi_common:request(authorize, ?MODULE, get_certificates,
- [], [status, certs], CompSpec).
+get_credentials(CompSpec) ->
+ ?debug("authorize_rpc:get_credentials()~n", []),
+ rvi_common:request(authorize, ?MODULE, get_credentials,
+ [], [status, creds], CompSpec).
validate_authorization(CompSpec, JWT, Conn) ->
?debug("authorize_rpc:validate_authorization():"
@@ -92,19 +86,24 @@ validate_authorization(CompSpec, JWT, Conn) ->
{conn, Conn}],
[status], CompSpec).
-validate_authorization(CompSpec, JWT, Certs, Conn) ->
+validate_authorization(CompSpec, JWT, Creds, Conn) ->
?debug("authorize_rpc:validate_authorization():"
" Conn = ~p~n", [Conn]),
rvi_common:request(authorize, ?MODULE, validate_authorization,
[{jwt, JWT},
- {certs, Certs},
+ {creds, Creds},
{conn, Conn}],
[status], CompSpec).
-store_certs(CompSpec, Certs, Conn) ->
- rvi_common:request(authorize, ?MODULE, store_certs,
- [{certs, Certs},
- {conn, Conn}],
+store_creds(CompSpec, Creds, Conn) ->
+ store_creds(CompSpec, Creds, Conn, undefined).
+
+store_creds(CompSpec, Creds, Conn, PeerCert) ->
+ ?debug("API: store_creds(), PeerCert = ~p", [authorize_keys:abbrev(PeerCert)]),
+ rvi_common:request(authorize, ?MODULE, store_creds,
+ [{creds, Creds},
+ {conn, Conn},
+ {peer_cert, PeerCert}],
[status], CompSpec).
authorize_local_message(CompSpec, Service, Params) ->
@@ -130,11 +129,6 @@ filter_by_service(CompSpec, Services, Conn) ->
{ conn, Conn }],
[status, services], CompSpec).
-%% For testing while developing cert functionality
-sign(Term) ->
- %% Use private key of authorize_rpc to make a JWT token
- gen_server:call(?SERVER, {sign, Term}).
-
public_key() ->
gen_server:call(?SERVER, public_key).
@@ -163,34 +157,30 @@ handle_rpc("validate_message", Args) ->
gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn, LogId] }),
{ok, [ {status, rvi_common:json_rpc_status(Status)},
{message, Msg} ]};
-handle_rpc("get_authorize_jwt", Args) ->
+handle_rpc("get_credentials", Args) ->
LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, get_authorize_jwt, [LogId] }),
- {ok, [ rvi_common:json_rpc_status(Status) | Rem ] };
-handle_rpc("get_certificates", Args) ->
- LogId = rvi_common:get_json_log_id(Args),
- [ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, get_certificates, [LogId] }),
+ gen_server:call(?SERVER, { rvi, get_credentials, [LogId] }),
{ok, [ rvi_common:json_rpc_status(Status) | Rem ] };
handle_rpc("validate_authorization", Args) ->
{ok, JWT} = rvi_common:get_json_element(["jwt"], Args),
{ok, Conn} = rvi_common:get_json_element(["connection"], Args),
LogId = rvi_common:get_json_log_id(Args),
CmdArgs =
- case rvi_common:get_json_element(["certs"], Args) of
- {ok, Certs} -> [JWT, Certs, Conn, LogId];
+ case rvi_common:get_json_element(["creds"], Args) of
+ {ok, Creds} -> [JWT, Creds, Conn, LogId];
{error, _} -> [JWT, Conn, LogId]
end,
[ Status | Rem ] =
gen_server:call(?SERVER, {rvi, validate_authorization, CmdArgs}),
{ok, [ rvi_common:json_rpc_status(Status) | Rem] };
-handle_rpc("store_certs", Args) ->
- {ok, Certs} = rvi_common:get_json_element(["certs"], Args),
+handle_rpc("store_creds", Args) ->
+ {ok, Creds} = rvi_common:get_json_element(["creds"], Args),
{ok, Conn} = rvi_common:get_json_element(["conn"], Args),
+ {ok, PeerCert} = rvi_common:get_json_element(["peer_cert"], Args),
LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
- gen_server:call(?SERVER, {rvi, store_certs, [Certs, Conn, LogId]}),
+ gen_server:call(?SERVER, {rvi, store_creds, [Creds, Conn, PeerCert, LogId]}),
{ok, [ rvi_common:json_rpc_status(Status) | Rem]};
handle_rpc("authorize_local_message", Args) ->
{ok, Service} = rvi_common:get_json_element(["service"], Args),
@@ -236,38 +226,35 @@ handle_notification(Other, _Args) ->
%%
handle_call({rvi, sign_message, [Msg | LogId]}, _, #st{private_key = Key} = State) ->
Sign = authorize_sig:encode_jwt(Msg, Key),
- log(LogId, "signed", []),
+ log(LogId, result, "signed", []),
{reply, [ ok, Sign ], State};
handle_call({rvi, validate_message, [JWT, Conn | LogId]}, _, State) ->
try begin Res = authorize_keys:validate_message(JWT, Conn),
- log(LogId, "validated", []),
+ log(LogId, result, "validated", []),
{reply, [ok, Res], State}
end
catch
error:_Err ->
- log(LogId, "validation FAILED", []),
+ log(LogId, error, "validation FAILED", []),
{reply, [not_found], State}
end;
-handle_call({rvi, get_authorize_jwt, [_LogId]}, _From, State) ->
- {reply, [ ok, authorize_keys:authorize_jwt() ], State};
-
-handle_call({rvi, get_certificates, [_LogId]}, _From, State) ->
- {reply, [ ok, authorize_keys:get_certificates() ], State};
+handle_call({rvi, get_credentials, [_LogId]}, _From, State) ->
+ {reply, [ ok, authorize_keys:get_credentials() ], State};
handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, State) ->
- %% The authorize JWT contains the public key used to sign the cert
+ %% The authorize JWT contains the public key used to sign the cred
?debug(
"authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n",
[]),
try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of
{_Header, Keys} ->
- log(LogId, "auth jwt validated", []),
+ log(LogId, result, "auth jwt validated", []),
KeyStructs = get_json_element(["keys"], Keys, []),
authorize_keys:save_keys(KeyStructs, Conn),
{reply, [ok], State};
invalid ->
?warning("Invalid auth JWT from ~p~n", [Conn]),
- log(LogId, "auth jwt INVALID", []),
+ log(LogId, error, "auth jwt INVALID", []),
{reply, [not_found], State}
catch
error:_Err ->
@@ -275,22 +262,22 @@ handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, Sta
{reply, [not_found], State}
end;
-handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _From, State) ->
- %% The authorize JWT contains the public key used to sign the cert
+handle_call({rvi, validate_authorization, [JWT, Creds, Conn | [_] = LogId] }, _From, State) ->
+ %% The authorize JWT contains the public key used to sign the cred
?debug(
"authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n",
[]),
try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of
{_Header, Keys} ->
- log(LogId, "auth jwt validated", []),
+ log(LogId, result, "auth jwt validated", []),
KeyStructs = get_json_element(["keys"], Keys, []),
?debug("KeyStructs = ~p~n", [KeyStructs]),
authorize_keys:save_keys(KeyStructs, Conn),
- do_store_certs(Certs, Conn, LogId),
+ do_store_creds(Creds, Conn, undefined, LogId),
{reply, [ok], State};
invalid ->
?warning("Invalid auth JWT from ~p~n", [Conn]),
- log(LogId, "auth jwt INVALID", []),
+ log(LogId, error, "auth jwt INVALID", []),
{reply, [not_found], State}
catch
error:_Err ->
@@ -298,21 +285,17 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _F
{reply, [not_found], State}
end;
-handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) ->
- do_store_certs(Certs, Conn, LogId),
+handle_call({rvi, store_creds, [Creds, Conn, PeerCert | LogId]}, _From, State) ->
+ do_store_creds(Creds, Conn, PeerCert, LogId),
{reply, [ok], State};
handle_call({rvi, authorize_local_message, [Service, _Params | LogId] } = R, _From, State) ->
?debug("authorize_rpc:handle_call(~p)~n", [R]),
- case authorize_keys:find_cert_by_service(Service) of
- {ok, {ID, _Cert}} ->
- %% Msg = Params ++ [{<<"certificate">>, Cert}],
- %% ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n",
- %% [authorize_keys:abbrev_payload(Msg)]),
- %% Sig = authorize_sig:encode_jwt(Msg, Key),
- log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]),
+ case authorize_keys:find_cred_by_service(Service) of
+ {ok, {ID, _Cred}} ->
+ log(LogId, result, "auth msg: Cred=~s", [authorize_keys:abbrev_bin(ID)]),
{reply, [ok], State};
_ ->
- log(LogId, "NO CERTS for ~s", [Service]),
+ log(LogId, error, "NO CREDS for ~s", [Service]),
{reply, [ not_found ], State}
end;
@@ -330,11 +313,11 @@ handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]},
?debug("authorize_rpc:authorize_remote_message(): parameters: ~p~n", [Parameters]),
case authorize_keys:validate_service_call(SvcName, {IP, Port}) of
invalid ->
- log(LogId, "remote msg REJECTED", []),
+ log(LogId, error, "remote msg REJECTED", []),
{reply, [ not_found ], State};
- {ok, CertID} ->
- ?debug("validated Cert ID=~p", [CertID]),
- log(LogId, "remote msg allowed: Cert=~s", [CertID]),
+ {ok, CredID} ->
+ ?debug("validated Cred ID=~p", [CredID]),
+ log(LogId, result, "remote msg allowed: Cred=~s", [CredID]),
{reply, [ok], State}
end;
@@ -370,11 +353,12 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-do_store_certs(Certs, Conn, LogId) ->
- ?debug("Storing ~p certs for conn ~p~n", [length(Certs), Conn]),
- lists:foreach(fun(Cert) ->
- store_cert(Cert, Conn, LogId)
- end, Certs).
+do_store_creds(Creds, Conn, PeerCert, LogId) ->
+ ?debug("Storing ~p creds for conn ~p~nPeerCert = ~w",
+ [length(Creds), Conn, authorize_keys:abbrev(PeerCert)]),
+ lists:foreach(fun(Cred) ->
+ store_cred(Cred, Conn, PeerCert, LogId)
+ end, Creds).
get_json_element(Path, JSON, Default) ->
case rvi_common:get_json_element(Path, JSON) of
@@ -384,26 +368,26 @@ get_json_element(Path, JSON, Default) ->
Default
end.
-store_cert(Cert, Conn, LogId) ->
- case authorize_sig:decode_jwt(Cert, authorize_keys:provisioning_key()) of
- {_CHeader, CertStruct} ->
- case authorize_keys:save_cert(CertStruct, Cert, Conn, LogId) of
+store_cred(CredJWT, Conn, PeerCert, LogId) ->
+ case authorize_sig:decode_jwt(CredJWT, authorize_keys:provisioning_key()) of
+ {_CHeader, CredStruct} ->
+ case authorize_keys:save_cred(CredStruct, CredJWT, Conn, PeerCert, LogId) of
ok ->
ok;
{error, Reason} ->
?warning(
- "Couldn't store certificate from ~p: ~p~n",
+ "Couldn't store credential from ~p: ~p~n",
[Conn, Reason]),
ok
end;
invalid ->
- ?warning("Invalid certificate from ~p~n", [Conn]),
+ log(LogId, warning, "credential INVALID (~p)", [Conn]),
ok
end.
-log([ID], Fmt, Args) ->
- rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
-log(_, _, _) ->
+log([ID], Lvl, Fmt, Args) ->
+ rvi_log:log(ID, Lvl, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _, _) ->
ok.
%% check_msg(Checks, Params) ->
diff --git a/components/dlink/src/dlink.app.src b/components/dlink/src/dlink.app.src
index 6666333..7dca4d5 100644
--- a/components/dlink/src/dlink.app.src
+++ b/components/dlink/src/dlink.app.src
@@ -14,7 +14,11 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ authorize,
+ service_edge,
+ service_discovery,
+ schedule
]},
{mod, {dlink_app, []}},
{start_phases, [{announce, []}]},
diff --git a/components/dlink/src/dlink_data_json.erl b/components/dlink/src/dlink_data_json.erl
index 1ad82b2..6a68e48 100644
--- a/components/dlink/src/dlink_data_json.erl
+++ b/components/dlink/src/dlink_data_json.erl
@@ -1,6 +1,10 @@
-module(dlink_data_json).
--compile(export_all).
+-export([encode/2,
+ decode/3]).
+-export([init/1,
+ port_options/0]).
+
init(_Opts) ->
[].
@@ -8,41 +12,25 @@ init(_Opts) ->
port_options() ->
[list, {packet, 0}].
-decode(Msg, St) ->
- {Msg1, St1} = append(St, Msg),
- try exo_json:decode(St1, Msg1) of
- {done, {ok, {struct, Elems}}, Rest} ->
- {ok, [Elems], Rest};
- {done, {ok, {array, Structs}}, Rest} ->
- {ok, [Str || {struct, Str} <- Structs], Rest};
- {done, {error, Reason}, Rest} ->
- {error, Reason, Rest};
- {more, Cont} ->
- {more, Cont}
- catch
- error:Error ->
- {error, Error, St1};
- exit:Exit ->
- {error, Exit, St1}
- end.
+decode(Msg, F, St) when is_function(F, 1) ->
+ jsx_decode_stream(Msg, F, St).
+
+encode(Msg, St) ->
+ {ok, rvi_common:term_to_json(Msg), St}.
-encode({struct, _} = JSON, St) ->
- try {ok, exo_json:encode(JSON), St}
- catch exit:Error -> erlang:error(Error)
- end;
-encode({array, Structs} = JSON, St) ->
- case lists:all(fun({struct,_}) -> true;
- (_) -> false
- end, Structs) of
- true ->
- {ok, exo_json:encode(JSON), St};
- false ->
- erlang:error(invalid_json_structure)
+jsx_decode_stream(Data, F, St) ->
+ case jsx_decode(Data, St) of
+ {incomplete, Cont} ->
+ {ok, Cont};
+ {with_tail, Elems, <<>>} ->
+ F(Elems),
+ {ok, []};
+ {with_tail, Elems, Rest} ->
+ F(Elems),
+ jsx_decode_stream(Rest, F, [])
end.
-append([], Msg) ->
- {Msg, []};
-append([_|_] = St, Msg) ->
- {St ++ Msg, []};
-append(Cont, Msg) when is_tuple(Cont) ->
- {Msg, Cont}.
+jsx_decode(Data, []) ->
+ jsx:decode(Data, [stream, return_tail]);
+jsx_decode(Data, Cont) when is_function(Cont, 1) ->
+ Cont(Data).
diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl
index 9510a53..253da55 100644
--- a/components/dlink/src/dlink_data_msgpack.erl
+++ b/components/dlink/src/dlink_data_msgpack.erl
@@ -1,6 +1,10 @@
-module(dlink_data_msgpack).
--compile(export_all).
+-export([init/1,
+ decode/3,
+ encode/2]).
+
+-export([port_options/0]).
-record(st, {opts = [{allow_atom, pack},
{enable_str, true},
@@ -13,15 +17,16 @@ port_options() ->
init(_CS) ->
#st{}.
-decode(Msg0, #st{buf = Prev, opts = Opts} = St) ->
+decode(Msg0, F, #st{buf = Prev, opts = Opts} = St) when is_function(F, 1) ->
Msg = append(Prev, Msg0),
case msgpack:unpack_stream(Msg, Opts) of
{error, incomplete} ->
- {more, St#st{buf = Msg}};
+ {ok, St#st{buf = Msg}};
{error, E} ->
- {error, E, St};
+ {error, E};
{Decoded, Rest} when is_binary(Rest) ->
- {ok, Decoded, St#st{buf = Rest}}
+ F(Decoded),
+ decode(Rest, F, St#st{buf = <<>>})
end.
encode({struct, Elems}, #st{opts = Opts} = St) ->
diff --git a/components/dlink/src/dlink_data_rvi.erl b/components/dlink/src/dlink_data_rvi.erl
deleted file mode 100644
index 01131be..0000000
--- a/components/dlink/src/dlink_data_rvi.erl
+++ /dev/null
@@ -1,123 +0,0 @@
--module(dlink_data_rvi).
-
--compile(export_all).
-
--record(dlink_data_rvi, {need, buf}).
-
--define(MAX_LINE, 79).
-
-init(_Opts) ->
- undefined.
-
-port_options() ->
- [].
-
-encode(Elems, St) ->
- Bin = encode_(Elems, <<>>),
- Sz = byte_size(Bin),
- {ok, <<"&RVI|",
- (integer_to_binary(Sz, 16))/binary, "\n",
- Bin/binary>>, St}.
-
-encode_([{Key, Val}|T], Acc) ->
- {Type, ValBin} = encode_val(Val),
- Bin = encode_elem(to_bin(Key), Type, ValBin),
- encode_(T, <<Acc/binary, Bin/binary>>);
-encode_([], Acc) ->
- Acc.
-
-encode_val(V) when is_binary(V) -> {$B, V};
-encode_val(V) when is_integer(V) -> {$i, integer_to_binary(V,16)};
-encode_val(V) when is_atom(V) -> {$a, atom_to_binary(V, latin1)};
-encode_val(V) when is_float(V) ->
- Bin = <<V/float>>,
- {$f, Bin};
-encode_val({T,_} = J) when T==array; T==struct ->
- JSON = exo_json:encode(J),
- {$J, iolist_to_binary(JSON)};
-encode_val([T|_] = L) when is_tuple(T) ->
- {$L, encode_(L, <<>>)}.
-
-decode_value($B, Bin) -> Bin;
-decode_value($i, Bin) -> binary_to_integer(Bin, 16);
-decode_value($f, <<F/float>>) -> F;
-decode_value($a, Bin) -> binary_to_existing_atom(Bin, latin1);
-decode_value($J, Bin) ->
- {ok, Obj} = exo_json:decode_string(binary_to_list(Bin)),
- Obj;
-decode_value($L, Bin) ->
- decode_packet(Bin).
-
-encode_elem(Key, Type, Bin) ->
- BSz = byte_size(Bin),
- case byte_size(Key) + BSz of
- Sz when Sz =< 78, Type >= $a, Type =< $z ->
- <<Key/binary, "|", Type, ":", Bin/binary, "\n">>;
- _ ->
- <<Key/binary, "|", Type, "|",
- (integer_to_binary(BSz+1,16))/binary, "\n",
- Bin/binary, "\n">>
- end.
-
-decode(<<"&RVI|", Rest/binary>>, undefined) ->
- case erlang:decode_packet(line, Rest, [{line_length, 79}]) of
- {more, _} ->
- {more, Rest};
- {ok, Ln, Rest1} ->
- LSz = byte_size(Ln),
- LSz1 = LSz-1,
- <<Size:LSz1/binary, "\n">> = Ln,
- Bytes = binary_to_integer(Size, 16),
- case Rest1 of
- <<Pkt:Bytes/binary, Tail/binary>> ->
- {ok, decode_packet(Pkt), Tail};
- _ ->
- {more, #dlink_data_rvi{need = Bytes, buf = Rest1}}
- end
- end;
-decode(Data, #dlink_data_rvi{need = Bytes, buf = Buf} = St) ->
- case <<Buf/binary, Data/binary>> of
- <<Pkt:Bytes/binary, Tail/binary>> ->
- {ok, decode_packet(Pkt), Tail};
- Buf1 ->
- {more, St#dlink_data_rvi{buf = Buf1}}
- end;
-decode(_, _St) ->
- {error, unknown, undefined}.
-
-decode_packet(<<>>) ->
- [];
-decode_packet(P) ->
- {ok, L, Rest} = erlang:decode_packet(line, P, [{line_length, ?MAX_LINE}]),
- case split_line(L) of
- {Key, Type, simple, Data} ->
- [{Key, decode_value(Type, Data)}|decode_packet(Rest)];
- {Key, Type, Size} ->
- Size1 = Size-1,
- <<VBin:Size1/binary, "\n", Rest1/binary>> = Rest,
- [{Key, decode_value(Type, VBin)}|decode_packet(Rest1)]
- end.
-
-to_bin(V) when is_atom(V) -> atom_to_binary(V, latin1);
-to_bin(V) when is_binary(V) -> V;
-to_bin(V) when is_list(V) -> iolist_to_binary(V).
-
-split_line(L) ->
- split_line(L, <<>>).
-
-split_line(<<"\\", $|, Rest/binary>>, Acc) ->
- split_line(Rest, <<Acc/binary, $|>>);
-split_line(<<"|", T, ":", Rest/binary>>, Acc) ->
- {Acc, T, simple, remove_nl(Rest)};
-split_line(<<"|", T, "|", Rest/binary>>, Acc) ->
- SzBin = remove_nl(Rest),
- {Acc, T, binary_to_integer(SzBin, 16)};
-split_line(<<H, T/binary>>, Acc) ->
- split_line(T, <<Acc/binary, H>>).
-
-
-remove_nl(B) ->
- Sz = byte_size(B),
- Sz1 = Sz-1,
- <<V:Sz1/binary, "\n">> = B,
- V.
diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl
index 85a8663..6b3a64e 100644
--- a/components/dlink_bt/src/bt_connection.erl
+++ b/components/dlink_bt/src/bt_connection.erl
@@ -36,16 +36,20 @@
-define(SERVER, ?MODULE).
+-define(PACKET_MOD, dlink_data_json).
-record(st, {
remote_addr = "00:00:00:00:00:00",
channel = 0,
- rfcomm_ref = undefined,
- listen_ref = undefined,
+ rfcomm_ref,
+ listen_ref,
+ accept_ref,
mode = bt,
- mod = undefined,
- func = undefined,
- args = undefined
+ packet_mod = ?PACKET_MOD,
+ packet_st = [],
+ mod,
+ func,
+ args
}).
%%%===================================================================
@@ -124,11 +128,15 @@ is_connection_up(Addr, Channel) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) ->
+init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) ->
%% connect will block on rfcomm:open, so cast to self
%% in order to let init return.
+ ?debug("init({connect, ~p, ~p, ~p, ~p, ~p, CS})",
+ [BTAddr, Channel, Mode, Mod, Fun]),
gen_server:cast(self(), connect),
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS),
+ PktSt = PktMod:init(CS),
{ok, #st{
remote_addr = BTAddr,
channel = Channel,
@@ -136,36 +144,47 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) ->
mode = Mode,
mod = Mod,
func = Fun,
- args = Arg
+ args = CS,
+ packet_mod = PktMod,
+ packet_st = PktSt
}};
-init({accept, Channel, ListenRef, Mode, Mod, Fun, Arg}) ->
- ARef = case Mode of
- tcp ->
- {ok, R} = exo_socket:async_accept(ListenRef),
- R;
- bt ->
- {ok, R} = rfcomm:accept(ListenRef, infinity, self()),
- R
- end,
+init({accept, Channel, ListenRef, Mode, Mod, Fun, CS}) ->
+ ?debug("init({accept, ~p, ~p, ~p, ~p, ~p, CS})",
+ [Channel, ListenRef, Mode, Mod, Fun]),
?debug("bt_connection:init(accept): self(): ~p", [self()]),
?debug("bt_connection:init(accept): Channel: ~p", [Channel]),
?debug("bt_connection:init(accept): ListenRef: ~p", [ListenRef]),
- ?debug("bt_connection:init(accept): AcceptRef: ~p", [ARef]),
?debug("bt_connection:init(accept): Module: ~p", [Mod]),
?debug("bt_connection:init(accept): Function: ~p", [Fun]),
- ?debug("bt_connection:init(accept): Arg: ~p", [Arg]),
-
+ ?debug("bt_connection:init(accept): Arg: ~p", [CS]),
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS),
+ PktSt = PktMod:init(CS),
+ ARef = case Mode of
+ bt ->
+ %% Expected message is
+ %% {rfcomm, ARef, {accept, Addr, Chan}}
+ {ok, R} = rfcomm:accept(ListenRef, infinity, self()),
+ R;
+ tcp ->
+ %% -1 represents infinity
+ %% Expected message is
+ %% {inet_async,LSock,Ref,{ok,Socket}}
+ {ok, R} = prim_inet:async_accept(ListenRef, -1),
+ R
+ end,
{ok, #st{
channel = Channel,
- rfcomm_ref = ARef,
- listen_ref = ListenRef,
+ rfcomm_ref = undefined,
+ accept_ref = ARef,
mode = Mode,
mod = Mod,
func = Fun,
- args = Arg
+ args = CS,
+ packet_mod = PktMod,
+ packet_st = PktSt
}}.
@@ -219,7 +238,7 @@ handle_cast(connect, #st {
bt ->
rfcomm:open(BTAddr, Channel);
tcp ->
- exo_socket:connect(BTAddr, Channel)
+ gen_tcp:connect(BTAddr, Channel, bt_listener:sock_opts())
end,
case ConnRes of
{ok, ConnRef} ->
@@ -246,13 +265,18 @@ handle_cast(connect, #st {
{ stop, { connect_failed, Error}, St }
end;
-handle_cast({send, Data}, St) ->
- ?debug("~p:handle_call(send): Sending: ~p",
- [ ?MODULE, Data]),
-
- rfcomm:send(St#st.rfcomm_ref, Data),
-
- {noreply, St};
+handle_cast({send, Data}, #st{mode = Mode,
+ rfcomm_ref = Sock,
+ packet_mod = PMod, packet_st = PSt} = St) ->
+ ?debug("handle_cast(send): Sending: ~p", [Data]),
+ {ok, Encoded, PSt1} = PMod:encode(Data, PSt),
+ ?debug("Encoded = ~p", [Encoded]),
+ Res = case Mode of
+ bt -> rfcomm:send(Sock, Encoded);
+ tcp -> gen_tcp:send(Sock, Encoded)
+ end,
+ ?debug("send Res = ~p", [Res]),
+ {noreply, St#st{packet_st = PSt1}};
handle_cast(_Msg, State) ->
?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]),
@@ -271,35 +295,43 @@ handle_cast(_Msg, State) ->
%% An accept reference we've setup now has accetpted an
%% incoming connection.
-handle_info({rfcomm, _ARef, { accept, BTAddr, _ } },
+handle_info({rfcomm, ARef, { accept, BTAddr, _ } },
#st { mod = Mod,
func = Fun,
args = Arg,
+ listen_ref = LRef,
+ accept_ref = ARef,
channel = Channel } = St) ->
?info("~p:handle_info(): bt_connection from ~w:~w\n",
[?MODULE, BTAddr,Channel]),
-
+ bt_listener:accept_ack(ok, LRef, BTAddr, Channel),
Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
{ noreply, St#st { remote_addr = BTAddr,
- channel = Channel } };
+ channel = Channel,
+ accept_ref = undefined} };
handle_info({rfcomm, _ConnRef, {data, Data}},
#st { remote_addr = BTAddr,
channel = Channel,
+ packet_mod = PMod,
+ packet_st = PSt,
mod = Mod,
- func = Fun,
- args = Arg } = State) ->
+ func = Fun } = State) ->
?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]),
?info("~p:handle_info(data): ~p:~p -> ~p:~p",
[ ?MODULE, BTAddr, Channel, Mod, Fun]),
- Self = self(),
- spawn(fun() -> Mod:Fun(Self, BTAddr, Channel,
- data, Data, Arg) end),
-
- {noreply, State};
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
+ {noreply, State#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed: ~p", [Reason]),
+ {stop, Reason, State}
+ end;
handle_info({rfcomm, ConnRef, closed},
@@ -339,13 +371,32 @@ handle_info({rfcomm, ConnRef, error},
bt_connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
+handle_info({tcp, Sock, Data}, #st{remote_addr = IP,
+ channel = Port,
+ rfcomm_ref = Sock,
+ packet_mod = PMod,
+ packet_st = PSt} = St) ->
+ ?debug("handle_info(data): From: ~p:~p", [IP, Port]),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, St)
+ end, PSt) of
+ {ok, PSt1} ->
+ inet:setopts(Sock, [{active, once}]),
+ {noreply, St#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, St}
+ end;
handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod,
func = Fun,
args = Arg} = St) ->
?debug("~p:handle_info(~p)", [?MODULE, Msg]),
+ inet_db:register_socket(Sock, inet_tcp),
+ inet:setopts(Sock, [{active, once}]),
{ok, {BTAddr, Channel}} = inet:peername(Sock),
Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
- {noreply, St};
+ {noreply, St#st{rfcomm_ref = Sock,
+ remote_addr = BTAddr}};
handle_info(_Info, State) ->
?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]),
@@ -380,3 +431,14 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
+
+handle_elements(Elements, #st{remote_addr = BTAddr,
+ channel = Channel,
+ mod = Mod,
+ func = Fun,
+ args = Arg}) ->
+ ?debug("data complete; processed: ~p",
+ [authorize_keys:abbrev(Elements)]),
+ Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg).
diff --git a/components/dlink_bt/src/bt_connection_manager.erl b/components/dlink_bt/src/bt_connection_manager.erl
index 1b1e049..e86dda3 100644
--- a/components/dlink_bt/src/bt_connection_manager.erl
+++ b/components/dlink_bt/src/bt_connection_manager.erl
@@ -67,6 +67,7 @@ find_connection_by_address(BTAddr, Channel) ->
%% @end
%%--------------------------------------------------------------------
start_link() ->
+ ?debug("start_link()", []),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl
index 3711652..a1f1a49 100644
--- a/components/dlink_bt/src/bt_listener.erl
+++ b/components/dlink_bt/src/bt_listener.erl
@@ -14,13 +14,16 @@
-include_lib("rvi_common/include/rvi_common.hrl").
-export([start_link/1,
add_listener/1,
- remove_listener/1]).
+ remove_listener/1,
+ accept_ack/4]).
+-export([sock_opts/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2]).
-record(st, {listeners = [],
acceptors = [],
+ mode = bt,
cs = #component_spec{}
}).
@@ -34,41 +37,33 @@ add_listener(Channel) ->
remove_listener(Channel) ->
gen_server:call(?MODULE, {remove_listener, Channel}).
+accept_ack(Result, LRef, Addr, Chan) ->
+ ?MODULE ! {accept, self(), LRef, Addr, Chan, Result},
+ ok.
+
+sock_opts() ->
+ [binary, {active, once}, {packet, 0}].
+
init(Mode) ->
{ok, #st {
listeners = [],
acceptors = [],
+ mode = Mode,
cs = rvi_common:set_value(bt_mode, Mode, rvi_common:get_component_specification())
}
}.
-handle_call({add_listener, Channel}, _From, #st{cs = CS} = St) ->
+handle_call({add_listener, Channel}, _From, #st{mode = Mode,
+ listeners = Ls} = St) ->
?info("bt_listener:add_listener(): Setting up listener on channel ~p", [ Channel]),
- Mode = rvi_common:get_value(bt_mode, bt, CS),
case listen(Mode, Channel) of
{ok, ListenRef} ->
?info("bt_listener:add_listener(): ListenRef: ~p", [ ListenRef]),
- {ok, ConnPid} = bt_connection:accept(Channel,
- ListenRef,
- Mode,
- dlink_bt_rpc,
- handle_socket,
- CS),
-
-
-
- %%{ noreply, NSt} = handle_info({accept, ListenRef, Channel, ok}, St),
-
- { reply,
- ok,
- St#st {
- acceptors = [ { Channel, ConnPid } | St#st.acceptors ],
- listeners = [ { ListenRef, Channel } | St#st.listeners ]
- }
- };
+ St1 = St#st{listeners = [{ListenRef, Channel}|Ls]},
+ {reply, ok, start_acceptor(ListenRef, Channel, St1)};
Err ->
?info("bt_listener:add_listener(): Failed: ~p", [ Err]),
@@ -86,12 +81,19 @@ handle_call(_Msg, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) ->
+handle_info({accept, Pid, ListenRef, BTAddr, Channel, ok},
+ #st{listeners = Ls,
+ acceptors = As} = St) ->
%% Fire up a new process to handle the
%% future incoming connection.
?info("bt_listener:accept(): ListenRef: ~p", [ ListenRef]),
?info("bt_listener:accept(): Remote: ~p-~p", [BTAddr, Channel ]),
+ case lists:keyfind(ListenRef, 1, Ls) of
+ {_, Channel} ->
+ As1 = lists:keydelete(Pid, 2, As),
+ {noreply, start_acceptor(ListenRef, Channel,
+ St#st{acceptors = As1})};
%% Must fix multiple acceptors in bt_linux_drv.c
%% {ok, ConnPid} = bt_connection:accept(Channel,
%% ListenRef,
@@ -101,7 +103,9 @@ handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) ->
%% {noreply, St#st {acceptors = [ { Channel, ConnPid } | St#st.acceptors ]}};
- {noreply, St };
+ _ ->
+ {noreply, St }
+ end;
handle_info(_Msg, State) ->
?info("bt_listener:handle_info(): Unknown: ~p", [ _Msg]),
@@ -115,4 +119,16 @@ terminate(_Reason, _State) ->
listen(bt, Channel) ->
rfcomm:listen(Channel);
listen(tcp, Port) ->
- exo_socket:listen(Port).
+ gen_tcp:listen(Port, sock_opts()).
+
+start_acceptor(ListenRef, Channel, #st{mode = Mode,
+ acceptors = As,
+ cs = CS} = St) ->
+ ?debug("start acceptor(~p, ~p, St)", [ListenRef, Channel]),
+ {ok, ConnPid} = bt_connection:accept(Channel,
+ ListenRef,
+ Mode,
+ dlink_bt_rpc,
+ handle_socket,
+ CS),
+ St#st{acceptors = [{Channel, ConnPid}|As]}.
diff --git a/components/dlink_bt/src/dlink_bt.app.src b/components/dlink_bt/src/dlink_bt.app.src
index e63d1ac..ba62655 100644
--- a/components/dlink_bt/src/dlink_bt.app.src
+++ b/components/dlink_bt/src/dlink_bt.app.src
@@ -17,7 +17,7 @@
{applications, [
kernel,
stdlib,
- rvi_common,
+ dlink,
bt
]},
{mod, { dlink_bt_app, []}},
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index ab5f8ac..542fd00 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -34,7 +34,7 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--include_lib("rvi_common/include/rvi_dlink.hrl").
+-include_lib("rvi_common/include/rvi_dlink_bin.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BT_CHANNEL, 1).
@@ -60,6 +60,7 @@
}).
-record(st, {
+ mode = bt, %% tcp | bt
cs = #component_spec{}
}).
@@ -93,9 +94,11 @@ init([]) ->
{ keypos, #connection_entry.connection }]),
CS = rvi_common:get_component_specification(),
+ Mode = get_mode(CS),
service_discovery_rpc:subscribe(CS, ?MODULE),
{ok, #st {
+ mode = Mode,
cs = CS
}
}.
@@ -105,15 +108,19 @@ start_json_server() ->
start_connection_manager() ->
+ ?debug("start_connection_manager()", []),
CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts,
- [],
- CompSpec),
+ ServerOpts = get_server_opts(CompSpec),
+ start_connection_manager(ServerOpts, CompSpec).
+
+start_connection_manager([], _) ->
+ ?debug("No BT server options set; start only the connection manager", []),
+ bt_connection_manager:start_link();
+start_connection_manager(ServerOpts, CompSpec) ->
%% Retrieve the channel we should use
- Mode = get_mode(BertOpts),
- Channel = get_channel(Mode, BertOpts),
+ ?debug("ServerOpts = ~p", [ServerOpts]),
+ Mode = get_mode(ServerOpts),
+ Channel = get_channel(Mode, ServerOpts),
?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]),
@@ -124,6 +131,7 @@ start_connection_manager() ->
bt:start(),
bt:debug(debug);
tcp ->
+ ?debug("Mode == tcp; not starting bt driver", []),
ok
end,
bt_listener:start_link(Mode),
@@ -146,11 +154,21 @@ start_connection_manager() ->
[],
CompSpec),
- setup_persistent_connections_(PersistentConnections, CompSpec),
+ setup_persistent_connections_(PersistentConnections, Mode, CompSpec),
ok.
-get_mode(Opts) ->
+get_server_opts(CS) when element(1, CS) == component_spec ->
+ {ok, ServerOpts } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ server_opts,
+ [],
+ CS),
+ ServerOpts.
+
+get_mode(CS) when element(1, CS) == component_spec ->
+ get_mode(get_server_opts(CS));
+get_mode(Opts) when is_list(Opts) ->
proplists:get_value(test_mode, Opts, bt).
get_channel(tcp, Opts) ->
@@ -159,15 +177,15 @@ get_channel(bt, Opts) ->
proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL).
-setup_persistent_connections_([ ], _CompSpec) ->
+setup_persistent_connections_([ ], _, _CompSpec) ->
ok;
-setup_persistent_connections_([ BTAddress | T], CompSpec) ->
+setup_persistent_connections_([ BTAddress | T], Mode, CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]),
- [ BTAddr, Channel] = string:tokens(BTAddress, "-"),
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
- setup_persistent_connections_(T, CompSpec),
+ [ BTAddr, Channel] = string:tokens(BTAddress, "-:"), %% Addr-Chan | IP:Port
+ connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec),
+ setup_persistent_connections_(T, Mode, CompSpec),
ok.
@@ -213,7 +231,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) ->
%%
%% Connect to a remote RVI node.
%%
-connect_remote(BTAddr, Channel, CompSpec) ->
+connect_remote(BTAddr, Channel, Mode, CompSpec) ->
case bt_connection_manager:find_connection_by_address(BTAddr, Channel) of
{ ok, _Pid } ->
already_connected;
@@ -225,7 +243,7 @@ connect_remote(BTAddr, Channel, CompSpec) ->
%%FIXME
%% Setup a genserver around the new connection.
- case bt_connection:connect(BTAddr, Channel,
+ case bt_connection:connect(BTAddr, Channel, Mode,
?MODULE, handle_socket, CompSpec ) of
{ ok, Pid } ->
?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p",
@@ -239,45 +257,33 @@ connect_remote(BTAddr, Channel, CompSpec) ->
end
end.
-
-connect_and_retry_remote( BTAddr, Channel, CompSpec) ->
+connect_and_retry_remote( BTAddr, Channel, Mode, CompSpec) ->
?info("dlink_bt:connect_and_retry_remote(): ~p:~p",
[ BTAddr, Channel]),
- case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of
- ok -> ok;
-
+ CS = start_log(<<"conn">>, "connect ~s:~s", [BTAddr, Channel], CompSpec),
+ case connect_remote(BTAddr, list_to_integer(Channel), Mode, CS) of
+ ok ->
+ ok;
Err -> %% Failed to connect. Sleep and try again
?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p",
- [BTAddr, Channel, Err]),
-
- ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
- [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
-
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec),
-
+ [BTAddr, Channel, Err]),
+ ?notice("dlink_bt:connect_and_retry_remote(~p:~p):"
+ " Will try again in ~p sec",
+ [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
+ setup_reconnect_timer(
+ ?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CS),
not_available
end.
-
-
announce_local_service_(_CompSpec, [], _Service, _Availability) ->
ok;
announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = bt_connection:send(ConnPid,
- term_to_json(
- { struct,
- [
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_TRANSACTION_ID, 3},
- { ?DLINK_ARG_SIGNATURE, JWT }
- ]
- })),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = bt_connection:send(ConnPid, Msg),
?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -299,24 +305,23 @@ announce_local_service_(CompSpec, Service, Availability) ->
process_data(_FromPid, RemoteBTAddr, RemoteChannel, ProtocolMod, Data, CompSpec) ->
?debug("dlink_bt:receive_data(): SetupAddress: {~p, ~p}", [ RemoteBTAddr, RemoteChannel ]),
?debug("dlink_bt:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
- Proto = list_to_atom(ProtocolMod),
- Proto:receive_message(CompSpec, base64:decode_to_string(Data)),
+ Proto = list_to_existing_atom(ProtocolMod),
+ Proto:receive_message(CompSpec, {RemoteBTAddr, RemoteChannel}, Data),
ok.
-availability_msg(Availability, Services) ->
- {struct, [{?DLINK_ARG_STATUS, status_string(Availability)},
- {?DLINK_ARG_SERVICES, {array, Services}}]}.
+availability_msg(Availability, Services, CompSpec) ->
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE},
+ {?DLINK_ARG_STATUS, status_string(Availability)},
+ {?DLINK_ARG_SERVICES, Services}
+ | log_id_tail(CompSpec)].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
- {ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg),
- {ok, Svcs} = rvi_common:get_json_element([?DLINK_ARG_SERVICES], Msg),
+process_announce(Avail, Svcs, FromPid, Addr, Channel, CompSpec) ->
?debug("dlink_bt_rpc:service_announce(~p): Address: ~p:~p", [Avail, Addr, Channel]),
- ?debug("dlink_bt_rpc:service_announce(~p): TransactionID: ~p", [Avail, TID]),
?debug("dlink_bt_rpc:service_announce(~p): Services: ~p", [Avail, Svcs]),
case Avail of
?DLINK_ARG_AVAILABLE ->
@@ -327,23 +332,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
end.
-process_authorize(FromPid,
- PeerBTAddr,
- PeerBTChannel,
- TransactionID,
- RemoteAddress,
- RemoteChannel,
- Protocol,
- Certificates,
- Signature,
- CompSpec) ->
-
+process_authorize(FromPid, PeerBTAddr, PeerBTChannel,
+ RemoteAddress, RemoteChannel, Protocol,
+ Credentials, CompSpec) ->
?info("dlink_bt:authorize(): Peer Address: ~p:~p", [PeerBTAddr, PeerBTChannel ]),
?info("dlink_bt:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemoteChannel ]),
- ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]),
- ?debug("dlink_bt:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_bt:authorize(): Certificates: ~p", [ Certificates ]),
- ?debug("dlink_bt:authorize(): Signature: ~p", [ Signature ]),
+ ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]),
+ ?debug("dlink_bt:authorize(): Credentials: ~p", [ Credentials ]),
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the conneciton manager, this is an incoming connection
@@ -351,59 +346,42 @@ process_authorize(FromPid,
%% a service announce
Conn = {RemoteAddress, RemoteChannel},
- case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [RemoteAddress, RemoteChannel], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
- Payload, CompSpec) ->
+ Elems, CompSpec) ->
- {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)),
?debug("dlink_bt:data(): Got ~p", [ Elems ]),
+ CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
+
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemoteChannel,
RVIProtocol,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
- Certificates =
- case CertificatesTmp of
- { array, C} -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerBTAddr, RemoteChannel,
- TransactionID, RemoteAddress, RemoteChannel,
- RVIProtocol, Certificates, Signature, CompSpec);
-
-
+ RemoteAddress, RemoteChannel,
+ RVIProtocol, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
- Conn = {PeerBTAddr, PeerChannel},
- [ TransactionID, Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- process_availability(
- Msg, FromPid, PeerBTAddr, PeerChannel, TransactionID, CompSpec);
- _ ->
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ log("sa from ~s:~w", [PeerBTAddr, PeerChannel], CS),
+ process_announce(Status, Services, FromPid, PeerBTAddr,
+ PeerChannel, CS);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -414,14 +392,14 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
?DLINK_ARG_DATA],
Elems, undefined),
process_data(FromPid, PeerBTAddr, PeerChannel,
- ProtocolMod, Data, CompSpec);
+ ProtocolMod, Data, CS);
?DLINK_CMD_PING ->
?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]),
ok;
undefined ->
- ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
+ ?error("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
ok
end.
@@ -567,7 +545,7 @@ handle_cast(Other, St) ->
{noreply, St}.
-handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
+handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, #st{mode = Mode} = St) ->
%% Do we already have a connection that supchannel service?
case get_connections_by_service(Service) of
[] -> %% Nope
@@ -580,7 +558,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
Addr ->
[ Address, Channel] = string:tokens(Addr, "-"),
- case connect_remote(Address, list_to_integer(Channel), St#st.cs) of
+ case connect_remote(Address, list_to_integer(Channel), Mode, St#st.cs) of
ok ->
{ reply, [ok, 2000], St }; %% 2 second timeout
@@ -613,27 +591,20 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]),
- Res = bt_connection:send(ConnPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ]})),
-
+ Res = bt_connection:send(
+ ConnPid,
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
-
-
handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) ->
%% Create a timer to handle periodic pings.
- {ok, ServerOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts, [],
- St#st.cs),
+ ServerOpts = get_server_opts(St#st.cs),
Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec",
@@ -656,12 +627,7 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
case bt_connection:is_connection_up(Pid) of
true ->
?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]),
- bt_connection:send(Pid, term_to_json(
- { struct,
- [ { ?DLINK_ARG_CMD,
- ?DLINK_CMD_PING
- }]})),
-
+ bt_connection:send(Pid, [ { ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Channel, Timeout });
@@ -671,8 +637,9 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
{noreply, St};
%% Setup static nodes
-handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, St) ->
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
+handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec },
+ #st{mode = Mode} = St) ->
+ connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec),
{ noreply, St };
handle_info(Info, St) ->
@@ -688,17 +655,15 @@ code_change(_OldVsn, St, _Extra) ->
send_authorize(Pid, SetupChannel, CompSpec) ->
{ok,[{address, Address }]} = bt_drv:local_info([address]),
bt_connection:send(Pid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
- { ?DLINK_ARG_PORT, SetupChannel },
- { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
- { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
+ { ?DLINK_ARG_PORT, SetupChannel },
+ { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec)]).
connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) ->
+ log("authorized: ~s:~p", [RemoteAddress, RemoteChannel], CompSpec),
case bt_connection_manager:find_connection_by_pid(FromPid) of
not_found ->
?info("dlink_bt:authorize(): New connection!"),
@@ -721,15 +686,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
?info("dlink_bt:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteAddress, RemoteChannel]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
- bt_connection:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT } ]})),
-
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
+ log("sending sa: ~s:~w", [RemoteAddress, RemoteChannel], CompSpec),
+ bt_connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemoteChannel, FromPid }),
ok.
@@ -828,10 +787,6 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-term_to_json(Term) ->
- binary_to_list(iolist_to_binary(exo_json:encode(Term))).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -841,28 +796,25 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
[ opt(K, Elems, Def) || K <- Keys].
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
+start_log(Pfx, Fmt, Args, CS) ->
+ LogId = rvi_log:new_id(Pfx),
+ rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogId, CS).
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
+log(Fmt, Args, CS) ->
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).
diff --git a/components/dlink_sms/src/dlink_sms.app.src b/components/dlink_sms/src/dlink_sms.app.src
index a394888..d0f86e4 100644
--- a/components/dlink_sms/src/dlink_sms.app.src
+++ b/components/dlink_sms/src/dlink_sms.app.src
@@ -1,8 +1,9 @@
+%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
%%
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -16,7 +17,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_sms_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 77300d9..b24215c 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -36,6 +36,7 @@
-define(SERVER, ?MODULE).
+-define(PACKET_MOD, dlink_data_json).
-record(st, {
ip = {0,0,0,0},
@@ -44,7 +45,9 @@
mod = undefined,
func = undefined,
args = undefined,
- pst = undefined %% Payload state
+ packet_mod = ?PACKET_MOD,
+ packet_st = [],
+ cs
}).
%%%===================================================================
@@ -52,8 +55,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, Arg) ->
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of
+setup(IP, Port, Sock, Mod, Fun, CS) ->
+ ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
+ case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -120,7 +124,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, Arg}) ->
+init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> connection_manager:add_connection(IP, Port, self())
@@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- ?debug("connection:init(): Arg: ~p", [Arg]),
- %% Grab socket control
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
+ PktSt = PktMod:init(CompSpec),
{ok, #st{
ip = IP,
port = Port,
sock = Sock,
mod = Mod,
func = Fun,
- args = Arg,
- pst = undefined
+ packet_mod = PktMod,
+ packet_st = PktSt,
+ cs = CompSpec
}}.
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
%%--------------------------------------------------------------------
%% @private
@@ -182,13 +189,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({send, Data}, St) ->
+handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
+ {ok, Encoded, PSt1} = PMod:encode(Data, PSt),
+ ?debug("Encoded = ~p", [Encoded]),
+ gen_tcp:send(St#st.sock, Encoded),
- gen_tcp:send(St#st.sock, Data),
-
- {noreply, St};
+ {noreply, St#st{packet_st = PSt1}};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data},
handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
- mod = Mod,
- func = Fun,
- args = Arg,
- pst = PST} = State) ->
+ packet_mod = PMod,
+ packet_st = PSt} = State) ->
?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
-
- case jsx_decode_stream(Data, PST) of
- { [], NPST } ->
- ?debug("handle_info(data incomplete)", []),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st { pst = NPST} };
-
- { JSONElements, NPST } ->
- ?debug("data complete: Processed: ~p",
- [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]),
- FromPid = self(),
- [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg)
- || SingleElem <- JSONElements],
- inet:setopts(Sock, [ { active, once } ]),
- {noreply, State#st { pst = NPST} }
+ {noreply, State#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, State}
end;
-
-
handle_info({tcp_closed, Sock},
#st { ip = IP,
port = Port,
@@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock},
func = Fun,
args = Arg } = State) ->
?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]),
- Mod:Fun(self(), IP, Port,closed, Arg),
+ Mod:Fun(self(), IP, Port, closed, Arg),
gen_tcp:close(Sock),
connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-jsx_decode_stream(Data, St) ->
- jsx_decode_stream(Data, St, []).
-
-jsx_decode_stream(Data, undefined, Acc) ->
- case jsx:decode(Data, [stream, return_tail]) of
- {incomplete, Cont} ->
- {lists:reverse(Acc), Cont};
- {with_tail, Elems, <<>>} ->
- {lists:reverse([Elems|Acc]), undefined};
- {with_tail, Elems, Rest} ->
- jsx_decode_stream(Rest, undefined, [Elems|Acc])
- end.
+%% jsx_decode_stream(Data, St) ->
+%% jsx_decode_stream(Data, St, []).
+
+%% jsx_decode_stream(Data, undefined, Acc) ->
+%% case jsx:decode(Data, [stream, return_tail]) of
+%% {incomplete, Cont} ->
+%% {lists:reverse(Acc), Cont};
+%% {with_tail, Elems, <<>>} ->
+%% {lists:reverse([Elems|Acc]), undefined};
+%% {with_tail, Elems, Rest} ->
+%% jsx_decode_stream(Rest, undefined, [Elems|Acc])
+%% end.
+
+%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) ->
+%% case PMod:decode(Data, PSt) of
+%% {ok, Elements, PSt1} ->
+%% ?debug("data complete: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% {ok, PSt1};
+%% {more, Elements, Rest, PSt1} ->
+%% ?debug("data complete with Rest: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS);
+%% {more, PSt1} ->
+%% {ok, PSt1};
+%% { ->
+
+handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
+ ip = IP, port = Port}) ->
+ ?debug("data complete: Processed: ~p",
+ [authorize_keys:abbrev(Elements)]),
+ Mod:Fun(self(), IP, Port, data, Elements, CS).
diff --git a/components/dlink_tcp/src/dlink_tcp.app.src b/components/dlink_tcp/src/dlink_tcp.app.src
index 53a32b8..5ba7760 100644
--- a/components/dlink_tcp/src/dlink_tcp.app.src
+++ b/components/dlink_tcp/src/dlink_tcp.app.src
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -16,7 +16,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_tcp_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index b3ec80d..83e0a24 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.0").
+-define(DLINK_TCP_VERSION, "1.1").
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) ->
%% Setup a genserver around the new connection.
{ok, Pid } = connection:setup(IP, Port, Sock,
- ?MODULE, handle_socket, [CompSpec] ),
+ ?MODULE, handle_socket, CompSpec ),
%% Send authorize
- { LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- connection:send(
- Pid,
- term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES,
- get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ send_authorize(Pid, CompSpec),
ok;
{error, Err } ->
@@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
case connect_remote(IP, list_to_integer(Port), CS) of
ok ->
- log("connected", [], CS),
+ log(result, "connected", [], CS),
ok;
Err -> %% Failed to connect. Sleep and try again
@@ -266,16 +253,8 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = connection:send(
- ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = connection:send(ConnPid, Msg),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) ->
handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg);
-handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
-handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
+handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]),
@@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemotePort,
ProtoVersion,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
- C when is_list(C) -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CS);
+ RemoteAddress, RemotePort,
+ ProtoVersion, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- ProtoVersion,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- Conn = {PeerIP, PeerPort},
log("sa from ~s:~w", [PeerIP, PeerPort], CS),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- ?debug("Service Announce~nMsg = ~p~n", [Msg]),
- process_announce(Msg, FromPid, PeerIP, PeerPort,
- TransactionID, ProtoVersion, CompSpec);
- _ ->
- log("sa INVALID", [], CS),
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) ->
{noreply, St};
handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) ->
+ ?debug("handle_socket, Arg (CS) = ~p", [Arg]),
try handle_socket_(FromPid, IP, Port, Event, Arg)
catch
C:E ->
?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]),
+ error("Caught ~p:~p", [C, E]),
ok
end,
{noreply, St};
@@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
Res = connection:send(ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ])),
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
@@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) ->
}) || SvcName <- SvcNameList ],
ok.
-availability_msg(Availability, Services) ->
- [{ ?DLINK_ARG_STATUS, status_string(Availability) },
- { ?DLINK_ARG_SERVICES, Services }].
+availability_msg(Availability, Services, CompSpec) ->
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
+ { ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, Services }
+ | log_id_tail(CompSpec) ].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
- RemotePort, ProtoVersion, Signature, Certificates, CompSpec) ->
+process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
- ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]),
- ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
-
+ ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
{NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
- log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
- case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec) ])).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
+ { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec) ]).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
@@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteIP, RemotePort]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
- connection:send(FromPid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)])),
-
+ connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
ok.
@@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]),
?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
Proto = list_to_existing_atom(ProtocolMod),
- Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
- base64:decode_to_string(Data)).
+ Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data).
-process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
- [ Avail,
- Svcs ] =
- opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined),
+process_announce(Avail, Services, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
- ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]),
- ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]),
+ ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]),
case Avail of
?DLINK_ARG_AVAILABLE ->
- add_services(Svcs, FromPid),
- service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE);
+ add_services(Services, FromPid),
+ service_discovery_rpc:register_services(CompSpec, Services, ?MODULE);
?DLINK_ARG_UNAVAILABLE ->
- delete_services(FromPid, Svcs),
- service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
+ delete_services(FromPid, Services),
+ service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE)
end,
ok.
@@ -837,36 +775,15 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
-
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
-
-term_to_json(Term) ->
- rvi_common:term_to_json(Term).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) ->
rvi_common:set_value(rvi_log_id, LogId, CS).
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index 45c0691..4512a59 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -63,7 +63,7 @@ terminate(_Reason, _State) ->
ok.
sock_opts() ->
- [binary, {active, once}, {packet, 4}].
+ [binary, {active, once}, {packet, 0}].
new_connection(IP, Port, Sock, State) ->
?debug("listener:new_connection(): Peer IP: ~p (ignored)", [IP]),
@@ -75,5 +75,5 @@ new_connection(IP, Port, Sock, State) ->
%% Provide component spec as extra arg.
{ok, _P} = connection:setup(undefined, 0, Sock,
dlink_tcp_rpc,
- handle_socket, [gen_nb_server:get_cb_state(State)]),
+ handle_socket, gen_nb_server:get_cb_state(State)),
{ok, State}.
diff --git a/components/dlink_tls/src/dlink_tls.app.src b/components/dlink_tls/src/dlink_tls.app.src
index 7b53a18..3698775 100644
--- a/components/dlink_tls/src/dlink_tls.app.src
+++ b/components/dlink_tls/src/dlink_tls.app.src
@@ -15,7 +15,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_tls_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index c125ca3..583009c 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-include_lib("lager/include/log.hrl").
-
+-include_lib("public_key/include/public_key.hrl").
%% API
@@ -145,7 +145,6 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- %% Grab socket control
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
PktSt = PktMod:init(CompSpec),
{ok, #st{
@@ -194,7 +193,11 @@ handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) ->
?debug("upgrade to TLS succcessful~n", []),
ssl:setopts(NewS, [{active, Last}]),
{ok, {IP, Port}} = ssl:peername(NewS),
- NewCS = rvi_common:set_value(dlink_tls_role, client, CompSpec),
+ {ok, PeerCert} = ssl:peercert(NewS),
+ ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]),
+ NewCS = rvi_common:set_value(
+ dlink_tls_role, Role,
+ rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)),
{reply, ok, St#st{sock = NewS, mode = tls, role = Role,
ip = inet_parse:ntoa(IP), port = Port,
cs = NewCS}};
@@ -219,7 +222,7 @@ handle_call(_Request, _From, State) ->
%%--------------------------------------------------------------------
handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_call(send): Sending: ~p",
- [ ?MODULE, Data]),
+ [ ?MODULE, abbrev(Data)]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
case St#st.mode of
tcp -> gen_tcp:send(St#st.sock, Encoded);
@@ -253,52 +256,38 @@ handle_info({tcp, Sock, Data},
#st { ip = undefined } = St) ->
{ok, {IP, Port}} = inet:peername(Sock),
NSt = St#st { ip = inet_parse:ntoa(IP), port = Port },
- ?warning("YESSSS"),
handle_info({tcp, Sock, Data}, NSt);
-handle_info({ssl, Sock, Data},
- #st{ip = IP, port = Port,
- mod = Mod, func = Fun, cs = CS,
- packet_mod = PMod, packet_st = PSt} = State) ->
- ?debug("handle_info(data): ~p", [Data]),
- case PMod:decode(Data, PSt) of
- {ok, Elements, PSt1} ->
- ?debug("~p:handle_info(data complete): Processed: ~p",
- [?MODULE, Elements]),
- Mod:Fun(self(), IP, Port, data, Elements, CS),
+handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port,
+ packet_mod = PMod, packet_st = PSt} = State) ->
+ ?debug("handle_info(data): Data: ~p", [abbrev(Data)]),
+ ?debug("handle_info(data): From: ~p:~p ", [ IP, Port]),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elems(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
ssl:setopts(Sock, [{active, once}]),
{noreply, State#st{packet_st = PSt1}};
{error, Reason} ->
- {stop, Reason, State};
- {more, PSt1} ->
- ?debug("~p:handle_info(data incomplete)", [ ?MODULE]),
- ssl:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}}
+ {stop, Reason, State}
end;
handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
- mod = Mod,
- func = Fun,
- cs = CS,
packet_mod = PMod,
packet_st = PSt} = State) ->
- ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
- ?debug("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, IP, Port]),
-
- case PMod:decode(Data, PSt) of
- {ok, Elements, PSt1} ->
- ?debug("~p:handle_info(data complete): Processed: ~p",
- [?MODULE, Elements]),
- Mod:Fun(self(), IP, Port, data, Elements, CS),
+ ?debug("handle_info(data): Data: ~p", [Data]),
+ ?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
+
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elems(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
inet:setopts(Sock, [{active, once}]),
{noreply, State#st{packet_st = PSt1}};
{error, Reason} ->
- {stop, Reason, State};
- {more, PSt1} ->
- ?debug("~p:handle_info(data incomplete)", [ ?MODULE]),
- inet:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}}
+ ?debug("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, State}
end;
handle_info({tcp_closed, Sock},
@@ -361,23 +350,87 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
do_upgrade(Sock, client, CompSpec) ->
- ssl:connect(Sock, tls_opts(client, CompSpec));
+ Opts = tls_opts(client, CompSpec),
+ ?debug("TLS Opts = ~p", [Opts]),
+ ssl:connect(Sock, Opts);
do_upgrade(Sock, server, CompSpec) ->
- ssl:ssl_accept(Sock, tls_opts(server, CompSpec)).
+ Opts = tls_opts(client, CompSpec),
+ ?debug("TLS Opts = ~p", [Opts]),
+ ssl:ssl_accept(Sock, Opts).
%% FIXME: For now, use the example certs delivered with the OTP SSL appl.
-tls_opts(Role, CompSpec) ->
- Dir = tls_dir(Role),
- {ok, CertFile} = get_config(certfile, filename:join(Dir, "cert.pem"), CompSpec),
- {ok, KeyFile} = get_config(keyfile, filename:join(Dir, "key.pem"), CompSpec),
- [{certfile, CertFile},
- {keyfile, KeyFile}].
-
-tls_dir(Role) when Role==client;
- Role==server ->
- filename:join([code:lib_dir(ssl), "examples", "certs", "etc",
- atom_to_list(Role)]).
-
-get_config(Key, Default, CompSpec) ->
- rvi_common:get_module_config(
- dlink_tls, dlink_tls_rpc, Key, Default, CompSpec).
+tls_opts(Role, _CompSpec) ->
+ {ok, DevCert} = setup:get_env(rvi_core, device_cert),
+ {ok, DevKey} = setup:get_env(rvi_core, device_key),
+ {ok, CACert} = setup:get_env(rvi_core, root_cert),
+ [
+ {verify, verify_peer},
+ {certfile, DevCert},
+ {keyfile, DevKey},
+ {cacertfile, CACert},
+ {verify_fun, {fun verify_fun/3, public_root_key()}},
+ {partial_chain, fun(X) ->
+ partial_chain(Role, X)
+ end}
+ ].
+
+public_root_key() ->
+ authorize_keys:provisioning_key().
+
+verify_fun(Cert, What, St) ->
+ ?debug("verify_fun(~p, ~p, ~p)", [abbrev(Cert), What, abbrev(St)]),
+ verify_fun_(Cert, What, St).
+
+verify_fun_(Cert, {bad_cert, selfsigned_peer}, PubKey) ->
+ ?debug("Verify self-signed cert: ~p", [abbrev(Cert)]),
+ try verify_cert_sig(Cert, PubKey) of
+ true ->
+ ?debug("verified!", []),
+ {valid, PubKey};
+ false ->
+ ?debug("verification FAILED", []),
+ {bad_cert, invalid_signature}
+ catch
+ error:Error ->
+ ?debug("Caught error:~p~n~p", [Error, erlang:get_stacktrace()]),
+ {fail, PubKey}
+ end;
+verify_fun_(_, {bad_cert, Reason}, St) ->
+ ?debug("Bad cert: ~p", [Reason]),
+ {fail, St};
+verify_fun_(_, {extension, _}, St) ->
+ {unknown, St};
+verify_fun_(_, valid, St) ->
+ {valid, St};
+verify_fun_(_, valid_peer, St) ->
+ {valid_peer, St}.
+
+partial_chain(_, Certs) ->
+ ?debug("partial_chain() invoked, length(Certs) = ~w", [length(Certs)]),
+ Decoded = (catch [public_key:der_decode('Certificate', C)
+ || C <- Certs]),
+ ?debug("partial_chain: ~p", [[lager:pr(Dec) || Dec <- Decoded]]),
+ {trusted_ca, hd(Certs)}.
+
+handle_elems(Elements, #st{mod = Mod, func = Fun, cs = CS,
+ ip = IP, port = Port}) ->
+ ?debug("handle_info(data complete): Processed: ~p", [abbrev(Elements)]),
+ Mod:Fun(self(), IP, Port, data, Elements, CS),
+ ok.
+
+verify_cert_sig(#'OTPCertificate'{tbsCertificate = TBS,
+ signature = Sig}, PubKey) ->
+ DER = public_key:pkix_encode('OTPTBSCertificate', TBS, otp),
+ {SignType, _} = signature_algorithm(TBS),
+ public_key:verify(DER, SignType, Sig, PubKey).
+
+signature_algorithm(#'OTPCertificate'{tbsCertificate = TBS}) ->
+ signature_algorithm(TBS);
+signature_algorithm(#'OTPTBSCertificate'{
+ signature = #'SignatureAlgorithm'{
+ algorithm = Algo}}) ->
+ public_key:pkix_sign_types(Algo).
+
+
+abbrev(T) ->
+ authorize_keys:abbrev(T).
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index e42a156..9df8f54 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -100,26 +100,14 @@ start_connection_manager() ->
[],
CompSpec),
?debug("TlsOpts = ~p", [TlsOpts]),
- IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
- Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
-
?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]),
%% Fire up listener
dlink_tls_connmgr:start_link(),
{ok,Pid} = dlink_tls_listener:start_link(),
- ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
- %% Add listener port.
- case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of
- ok ->
- ?notice("---- RVI Node External Address: ~s",
- [ application:get_env(rvi_core, node_address, undefined)]);
+ setup_initial_listeners(Pid, TlsOpts, CompSpec),
- Err ->
- ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
- ok
- end,
?info("dlink_tls:init_rvi_component(): Setting up persistent connections."),
{ok, PersistentConnections } = rvi_common:get_module_config(data_link,
@@ -130,6 +118,23 @@ start_connection_manager() ->
setup_persistent_connections_(PersistentConnections, CompSpec),
ok.
+setup_initial_listeners(Pid, [], CompSpec) ->
+ ?debug("no initial listeners", []);
+setup_initial_listeners(Pid, [_|_] = TlsOpts, CompSpec) ->
+ IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
+ Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
+ %% Add listener port.
+ ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
+ case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of
+ ok ->
+ ?notice("---- RVI Node External Address: ~s",
+ [ application:get_env(rvi_core, node_address, undefined)]);
+
+ Err ->
+ ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
+ ok
+ end.
+
setup_persistent_connections_([ ], _CompSpec) ->
ok;
@@ -324,7 +329,7 @@ handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) ->
handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]),
- ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]),
+ ?debug("data(): Elems ~p~nCS = ~p", [abbrev(Elems), abbrev(CompSpec)]),
CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
@@ -333,28 +338,28 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
[ RemoteAddress,
RemotePort,
- Signature ] =
+ Credentials ] =
opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
- Signature, CS);
-
- ?DLINK_CMD_CERT_EXCHANGE ->
- ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]),
- [ Certs ] =
- opts([?DLINK_ARG_CERTIFICATES], Elems, undefined),
- ?debug("Certs = ~p", [Certs]),
- log("certs from ~s:~w", [PeerIP, PeerPort], CS),
- authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}),
- case rvi_common:get_value(dlink_tls_role, client, CS) of
- client -> ok;
- server ->
- send_certs(FromPid, CompSpec)
- end,
- ok;
+ Credentials, CS);
+
+ %% ?DLINK_CMD_CRED_EXCHANGE ->
+ %% ?debug("got cred exch ~s:~w", [PeerIP, PeerPort]),
+ %% [ Creds ] =
+ %% opts([?DLINK_ARG_CREDENTIALS], Elems, undefined),
+ %% ?debug("Creds = ~p", [Creds]),
+ %% log("creds from ~s:~w", [PeerIP, PeerPort], CS),
+ %% authorize_rpc:store_creds(CS, Creds, {PeerIP, PeerPort}),
+ %% case rvi_common:get_value(dlink_tls_role, client, CS) of
+ %% client -> ok;
+ %% server ->
+ %% send_creds(FromPid, CompSpec)
+ %% end,
+ %% ok;
?DLINK_CMD_SERVICE_ANNOUNCE ->
?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
@@ -364,7 +369,6 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?DLINK_ARG_SERVICES],
Elems, undefined),
- Conn = {PeerIP, PeerPort},
log("sa from ~s:~w", [PeerIP, PeerPort], CS),
process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
@@ -644,12 +648,11 @@ status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
- RemotePort, Signature, CompSpec) ->
+ RemotePort, Credentials, CompSpec) ->
?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]),
?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]),
- ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
- { _NRemoteAddress, _NRemotePort} = Conn =
+ { NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -658,33 +661,30 @@ process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
{ PeerIP, PeerPort };
_ -> { RemoteAddress, RemotePort}
end,
-
- case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
-
-send_certs(Pid, CompSpec) ->
- ?debug("send_certs (Pid = ~p)", [Pid]),
- {LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
- [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE},
- {?DLINK_ARG_ADDRESS, LocalIP},
- {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
- {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)).
+ log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
+ PeerCert = rvi_common:get_value(dlink_tls_peer_cert, not_found, CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn, PeerCert),
+ connection_authorized(FromPid, Conn, CompSpec).
+
+%% send_creds(Pid, CompSpec) ->
+%% ?debug("send_creds (Pid = ~p)", [Pid]),
+%% {LocalIP, LocalPort} = rvi_common:node_address_tuple(),
+%% dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
+%% [{?DLINK_ARG_CMD, ?DLINK_CMD_CRED_EXCHANGE},
+%% {?DLINK_ARG_ADDRESS, LocalIP},
+%% {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
+%% {?DLINK_ARG_CREDENTIALS, [get_credentials(CompSpec)]}], CompSpec)).
send_authorize(Pid, CompSpec) ->
- ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]),
+ ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, abbrev(CompSpec)]),
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- JWT = get_authorize_jwt(CompSpec),
+ Creds = get_credentials(CompSpec),
dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
[{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE},
+ {?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION},
{?DLINK_ARG_ADDRESS, LocalIP},
{?DLINK_ARG_PORT, integer_to_list(LocalPort)},
- {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)).
+ {?DLINK_ARG_CREDENTIALS, Creds}], CompSpec)).
%% dlink_tls_conn:send(Pid,
%% term_to_json(
%% {struct,
@@ -708,18 +708,8 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid),
?debug("dlink_tls:authorize(): Sending authorize."),
_Res = send_authorize(FromPid, CompSpec),
- case rvi_common:get_value(dlink_tls_role, server, CompSpec) of
- server -> ok;
- client ->
- send_certs(FromPid, CompSpec)
- end,
- %% ?debug("dlink_tls:upgrade connection", []),
- %% UgRes = dlink_tls_conn:upgrade(FromPid, server),
- %% ?debug("upgrade result: ~p", [UgRes]),
ok;
_ ->
- %% UgRes = dlink_tls_conn:upgrade(FromPid, client),
- %% ?debug("upgrade result: ~p", [UgRes]),
ok
end,
@@ -797,31 +787,31 @@ get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
- [not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
- end.
+%% get_authorize_jwt(CompSpec) ->
+%% case authorize_rpc:get_authorize_jwt(CompSpec) of
+%% [ok, JWT] ->
+%% JWT;
+%% [not_found] ->
+%% ?error("No authorize JWT~n", []),
+%% error(cannot_authorize)
+%% end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-validate_auth_jwt(JWT, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
+%% validate_auth_jwt(JWT, Conn, CompSpec) ->
+%% case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
+%% [ok] ->
+%% true;
+%% [not_found] ->
+%% false
+%% end.
term_to_json(Term) ->
binary_to_list(iolist_to_binary(exo_json:encode(Term))).
@@ -834,7 +824,7 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
Res = [ opt(K, Elems, Def) || K <- Keys],
- ?debug("opts(~p) -> ~p", [Keys, Elems]),
+ ?debug("opts(~p) -> ~p", [Keys, abbrev(Elems)]),
Res.
@@ -848,3 +838,6 @@ start_log(Pfx, Fmt, Args, CS) ->
log(Fmt, Args, CS) ->
rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
+
+abbrev(Data) ->
+ authorize_keys:abbrev(Data).
diff --git a/components/rvi_common/include/rvi_dlink_bin.hrl b/components/rvi_common/include/rvi_dlink_bin.hrl
index f4f4cb8..6ccabd2 100644
--- a/components/rvi_common/include/rvi_dlink_bin.hrl
+++ b/components/rvi_common/include/rvi_dlink_bin.hrl
@@ -8,24 +8,24 @@
%% Commonly used protocol identifiers across dlink implementations
%%
--define(DLINK_CMD_AUTHORIZE, <<"au">>).
--define(DLINK_CMD_CERT_EXCHANGE, <<"crt">>).
--define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>).
--define(DLINK_CMD_RECEIVE, <<"rcv">>).
--define(DLINK_CMD_FRAG, <<"frg">>).
--define(DLINK_CMD_PING, <<"ping">>).
+-define(DLINK_CMD_AUTHORIZE, <<"au">>).
+-define(DLINK_CMD_CRED_EXCHANGE, <<"cre">>).
+-define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>).
+-define(DLINK_CMD_RECEIVE, <<"rcv">>).
+-define(DLINK_CMD_FRAG, <<"frg">>).
+-define(DLINK_CMD_PING, <<"ping">>).
--define(DLINK_ARG_CMD, <<"cmd">>).
--define(DLINK_ARG_TRANSACTION_ID, <<"tid">>).
--define(DLINK_ARG_ADDRESS, <<"addr">>).
--define(DLINK_ARG_PORT, <<"port">>).
--define(DLINK_ARG_VERSION, <<"ver">>).
--define(DLINK_ARG_CERTIFICATE, <<"cert">>).
--define(DLINK_ARG_CERTIFICATES, <<"certs">>).
--define(DLINK_ARG_SIGNATURE, <<"sign">>).
--define(DLINK_ARG_SERVICES, <<"svcs">>).
--define(DLINK_ARG_MODULE, <<"mod">>).
--define(DLINK_ARG_AVAILABLE, <<"av">>).
--define(DLINK_ARG_UNAVAILABLE, <<"un">>).
--define(DLINK_ARG_STATUS, <<"stat">>).
--define(DLINK_ARG_DATA, <<"data">>).
+-define(DLINK_ARG_CMD, <<"cmd">>).
+-define(DLINK_ARG_TRANSACTION_ID, <<"tid">>).
+-define(DLINK_ARG_ADDRESS, <<"addr">>).
+-define(DLINK_ARG_PORT, <<"port">>).
+-define(DLINK_ARG_VERSION, <<"ver">>).
+-define(DLINK_ARG_CREDENTIAL, <<"cred">>).
+-define(DLINK_ARG_CREDENTIALS, <<"creds">>).
+-define(DLINK_ARG_SIGNATURE, <<"sign">>).
+-define(DLINK_ARG_SERVICES, <<"svcs">>).
+-define(DLINK_ARG_MODULE, <<"mod">>).
+-define(DLINK_ARG_AVAILABLE, <<"av">>).
+-define(DLINK_ARG_UNAVAILABLE, <<"un">>).
+-define(DLINK_ARG_STATUS, <<"stat">>).
+-define(DLINK_ARG_DATA, <<"data">>).
diff --git a/components/rvi_common/src/exoport_exo_http.erl b/components/rvi_common/src/exoport_exo_http.erl
index 31700ca..cc3e7b9 100644
--- a/components/rvi_common/src/exoport_exo_http.erl
+++ b/components/rvi_common/src/exoport_exo_http.erl
@@ -31,6 +31,13 @@ instance(SupMod, AppMod, Opts) ->
handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == 'POST' ->
+ ensure_ready(fun handle_post/4, Socket, Request, Body, AppMod);
+
+handle_body(Socket, _Request, _Body, _AppMod) ->
+ exo_http_server:response(Socket, undefined, 404, "Not Found",
+ "Object not found. Try using POST method.").
+
+handle_post(Socket, Request, Body, AppMod) ->
case Request#http_request.headers of
#http_chdr{content_type = "application/json" ++ _ = T}
when T=="application/json";
@@ -39,11 +46,7 @@ handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == '
handle_post_json(Socket, Request, Body, AppMod);
#http_chdr{content_type = "multipart/" ++ _} ->
handle_post_multipart(Socket, Request, Body, AppMod)
- end;
-
-handle_body(Socket, _Request, _Body, _AppMod) ->
- exo_http_server:response(Socket, undefined, 404, "Not Found",
- "Object not found. Try using POST method.").
+ end.
handle_post_json(Socket, _Request, Body, AppMod) ->
try decode_json(Body) of
@@ -211,3 +214,15 @@ opt(K, L, Def) ->
{_, V} -> V;
false -> Def
end.
+
+
+ensure_ready(F, Socket, Request, Body, AppMod) ->
+ try rvi_server:ensure_ready(_Timeout = 10000),
+ F(Socket, Request, Body, AppMod)
+ catch
+ error:timeout ->
+ ?error("~p timeout waiting for rvi_core", [?MODULE]),
+ exo_http_server:response(Socket, undefined, 501,
+ "Internal Error",
+ "Internal Error")
+ end.
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index c9bbabb..0b54d53 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -191,7 +191,6 @@ notification(Component,
?debug(" Module : ~p", [Module]),
?debug(" Function : ~p", [Function]),
?debug(" InArgPropList : ~p", [InArgPropList]),
- ?debug(" CompSpec : ~p", [CompSpec]),
case get_module_type(Component, Module, CompSpec) of
%% We have a gen_server
@@ -288,6 +287,8 @@ unstruct(X) ->
%% If Path is just a single element, convert to list and try again.
+get_json_element(_, []) ->
+ {error, undefined};
get_json_element(ElemPath, JSON) when is_atom(ElemPath) ->
get_json_element([ElemPath], JSON);
@@ -299,7 +300,7 @@ get_json_element(ElemPath, JSON) when is_tuple(JSON) ->
get_json_element(ElemPath, [T|_] = JSON) when is_tuple(T) ->
get_json_element_(ElemPath, JSON);
-get_json_element(ElemPath, JSON) when is_list(JSON) ->
+get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) ->
case exo_json:decode_string(JSON) of
{ok, Data } ->
get_json_element_(ElemPath, Data);
@@ -311,7 +312,7 @@ get_json_element(ElemPath, JSON) when is_list(JSON) ->
get_json_element(P, J) ->
?warning("get_json_element(): Unknown call structure; Path: ~p | JSON: ~p",
[P, J]),
- {error, call, {P, J}}.
+ {error, {call, {P, J}}}.
get_json_element_(_, undefined) ->
{ error, undefined };
@@ -508,7 +509,7 @@ get_component_config_(Component, Default, CompList) ->
get_component_specification() ->
CS = get_component_specification_(),
- lager:debug("CompSpec = ~p", [CS]),
+ %% lager:debug("CompSpec = ~p", [CS]),
CS.
get_component_specification_() ->
@@ -580,8 +581,8 @@ get_component_modules(_, _) ->
get_module_specification(Component, Module, CompSpec) ->
case get_component_modules(Component, CompSpec) of
undefined ->
- ?debug("get_module_specification(): Missing: rvi_core:component: ~p: ~p",
- [Component, CompSpec]),
+ ?debug("get_module_specification(): Missing: rvi_core:component: ~p",
+ [Component]),
undefined;
Modules ->
@@ -614,7 +615,7 @@ get_module_config(Component, Module, Key, CompSpec) ->
case proplists:get_value(Key, ModConf, undefined ) of
undefined ->
?debug("get_module_config(): Missing component spec: "
- "rvi_core:component:~p:~p:~p{...}: ~p",
+ "~p:~p:~p{...}: ~p",
[Component, Module, Key, ModConf]),
{error, {not_found, Component, Module, Key}};
@@ -764,9 +765,9 @@ start_json_rpc_server(Component, Module, Supervisor, XOpts) ->
Module,
ExoHttpOpts);
Err ->
- ?info("rvi_common:start_json_rpc_server(~p:~p): "
- "No JSON-RPC address setup. skip",
- [ Component, Module ]),
+ ?debug("start_json_rpc_server(~p:~p): "
+ "No JSON-RPC address setup. skip",
+ [ Component, Module ]),
Err
end.
diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl
index 5d0fa8e..962252c 100644
--- a/components/rvi_common/src/rvi_log.erl
+++ b/components/rvi_common/src/rvi_log.erl
@@ -2,15 +2,22 @@
-behaviour(gen_server).
+-compile(export_all).
+
-export([start_link/0,
log/3,
- flog/4,
+ flog/4, flog/5,
new_id/1,
fetch/1,
timestamp/0,
format/2
]).
+-export([entry/3,
+ exit_good/3,
+ exit_warn/3,
+ exit_fail/3]).
+
-export([start_json_server/0,
handle_rpc/2,
handle_notification/2]).
@@ -28,6 +35,7 @@
seq = 1,
node_tag}).
-record(evt, {id,
+ level,
component,
event}).
-define(IDS, rvi_log_ids).
@@ -35,13 +43,35 @@
-define(MAX_LENGTH, 60).
+%% levels
+-define(INFO, 0).
+-define(GOOD, 1).
+-define(WARN, 2).
+-define(FAIL, 3).
+
start_link() ->
create_tabs(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+entry(TID, Component, Event) -> log(TID, ?INFO, Component, Event).
+exit_good(TID, Component, Event) -> log(TID, ?GOOD, Component, Event).
+exit_warn(TID, Component, Event) -> log(TID, ?WARN, Component, Event).
+exit_fail(TID, Component, Event) -> log(TID, ?FAIL, Component, Event).
+
log(TID, Component, Event) when is_binary(TID), is_binary(Component) ->
- gen_server:cast(?MODULE, {log, TID, timestamp(), Component, bin(Event)}).
+ log(TID, ?INFO, Component, Event).
+
+log(TID, Level, Component, Event) when is_binary(TID), is_binary(Component) ->
+ gen_server:cast(?MODULE, {log, TID, level_num(Level), timestamp(), Component, bin(Event)}).
+
+
+level_num(L) when is_integer(L), L >= ?INFO, L =< ?FAIL ->
+ L;
+level_num(info ) -> 0;
+level_num(result ) -> 1;
+level_num(warning) -> 2;
+level_num(error ) -> 3.
new_id(Prefix) ->
gen_server:call(?MODULE, {new_id, bin(Prefix)}).
@@ -63,18 +93,156 @@ trunc_msg(Bin) ->
flog(Fmt, Args, Component, CS) ->
+ flog(0, Fmt, Args, Component, CS).
+
+flog(Level, Fmt, Args, Component, CS) ->
LogTID = rvi_common:get_log_id(CS),
- log(LogTID, Component, format(Fmt, Args)).
+ log(LogTID, Level, Component, format(Fmt, Args)).
timestamp() ->
os:timestamp().
+%% shorthand for the select pattern
+-define(ELEM(A), {element, #evt.A, '$_'}).
+-define(PROD, {{'$1', ?ELEM(level), ?ELEM(component), ?ELEM(event)}}).
+
fetch(Tids) ->
+ fetch(Tids, []).
+
+fetch(Tids, Args) ->
TidSet = select_ids(Tids),
- [{Tid, ets:select(?EVENTS, [{#evt{id = {Tid,'$1'},
- component = '$2',
- event = '$3'}, [], [{{'$1', '$2', '$3'}}] }])}
- || Tid <- TidSet].
+ lists:foldr(
+ fun(Tid, Acc) ->
+ case match_events(
+ Args,
+ ets:select(?EVENTS, [{#evt{id = {Tid,'$1'},
+ level = '$2',
+ component = '$3',
+ event = '$4'}, [], [?PROD] }])) of
+ [] -> Acc;
+ Events ->
+ [{Tid, Events}|Acc]
+ end
+ end, [], TidSet).
+
+match_events(_, []) ->
+ [];
+match_events(Args, Events) ->
+ lists:foldl(
+ fun({<<"level">>, Str}, Acc) ->
+ filter_by_level(Acc, parse_level_expr(Str));
+ (_, Acc) ->
+ Acc
+ end, Events, Args).
+
+%% The level comparison expressions use the following syntax:
+%% Expr :: Int
+%% | BinOp Expr
+%% | UnaryOp Expr
+%% | '(' Expr ')'
+%%
+%% BinOp :: '==' | '>=' | '=>' | '<=' | '=<' | '!=' | '|' | '&'
+%% UnaryOp :: '!' | 'not'
+%%
+%% The token scanning pass translates the tokens to standard Erlang tokens
+%% The parsing pass first infills variable references where needed, e.g.
+%% "> 1" would be translated to "L > 1" (where L is a variable bound to the
+%% current Event Level at evaluation time), and
+%% "1" would be translated to "L == 1".
+%%
+%% The infill pass is needed in order to produce a legal Erlang grammar. It's followed
+%% by a call to the erlang parser. The short forms are expanded in the parsed form.
+%%
+%% When parsing and evaluating the expressions any exception is translated into a failed
+%% comparison.
+%%
+parse_level_expr(Str) ->
+ try level_grammar(scan_level_expr(Str, []))
+ catch
+ error:_ ->
+ [{atom, 1, false}]
+ end.
+
+scan_level_expr(<<"!=", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=/=',1}|Acc]);
+scan_level_expr(<<"==", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=:=',1}|Acc]);
+scan_level_expr(<<"=>", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=<',1}|Acc]);
+scan_level_expr(<<"<=", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'>=',1}|Acc]);
+scan_level_expr(<<">=", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'>=',1}|Acc]);
+scan_level_expr(<<"=<", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'=<',1}|Acc]);
+scan_level_expr(<<">", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'>',1}|Acc]);
+scan_level_expr(<<"<", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'<',1}|Acc]);
+scan_level_expr(<<"|", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'orelse',1}|Acc]);
+scan_level_expr(<<"&", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'andalso',1}|Acc]);
+scan_level_expr(<<I, Rest/binary>>, Acc) when I >= $0, I =< $3 ->
+ scan_level_expr(Rest, [{integer, 1, I-$0} | Acc]);
+scan_level_expr(<<"(", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'(',1}|Acc]);
+scan_level_expr(<<")", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{')',1}|Acc]);
+scan_level_expr(<<"not", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'not',1}|Acc]);
+scan_level_expr(<<"!", Rest/binary>>, Acc) ->
+ scan_level_expr(Rest, [{'not',1}|Acc]);
+scan_level_expr(<<"\s", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc);
+scan_level_expr(<<"\t", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc);
+scan_level_expr(<<"\n", Rest/binary>>, Acc) -> scan_level_expr(Rest, Acc);
+scan_level_expr(<<>>, Acc) ->
+ lists:reverse(Acc).
+
+level_grammar(Toks) ->
+ case erl_parse:parse_exprs(infill_vars(Toks)) of
+ {ok, Exprs} ->
+ expand_shorthand(Exprs);
+ Error ->
+ error(Error)
+ end.
+
+-define(is_op(H), H=='=:='; H=='=!='; H=='>'; H=='<'; H=='>='; H=='=<').
+
+infill_vars([{O,_}=H|T]) when ?is_op(O) ->
+ [{var,1,'L'},H|infill_vars(T)];
+infill_vars([H|T]) ->
+ [H|infill_vars(T)];
+infill_vars([]) ->
+ [{dot,1}].
+
+expand_shorthand([Expr|Exprs]) ->
+ [expand_shorthand_(Expr)|expand_shorthand(Exprs)];
+expand_shorthand([]) ->
+ [].
+
+expand_shorthand_({integer,_,_} = I) ->
+ {op, 1, '=:=', {var,1,'L'}, I};
+expand_shorthand_({op,Ln,Op,A,B}) when Op=='andalso';
+ Op=='orelse' ->
+ {op,Ln,Op,expand_shorthand_(A), expand_shorthand_(B)};
+expand_shorthand_({op,Ln,'not',E}) ->
+ {op,Ln,'not',expand_shorthand_(E)};
+expand_shorthand_(Expr) ->
+ Expr.
+
+filter_by_level([H|T], Expr) ->
+ Res = try erl_eval:exprs(Expr, [{'L', element(2,H)}]) of
+ {value, true, _} -> true;
+ _ -> false
+ catch
+ _:_ -> false
+ end,
+ if Res ->
+ [H|filter_by_level(T, Expr)];
+ true ->
+ filter_by_level(T, Expr)
+ end.
init(_) ->
{ok, Tag} = rvi_common:get_module_config(
@@ -117,11 +285,11 @@ handle_notification(Other, _Args) ->
ok.
handle_cast(log_start, #st{n = N, node_tag = Tag} = St) ->
- do_log(tid_(<<"rvi_log">>, 0, Tag), timestamp(), <<"rvi_common">>,
+ do_log(tid_(<<"rvi_log">>, 0, Tag), ?INFO, timestamp(), <<"rvi_common">>,
format("Started - Tag = ~s", [Tag]), N),
{noreply, St};
-handle_cast({log, TID, TS, Component, Event}, #st{n = N} = St) ->
- do_log(TID, TS, Component, Event, N),
+handle_cast({log, TID, Level, TS, Component, Event}, #st{n = N} = St) ->
+ do_log(TID, Level, TS, Component, Event, N),
{noreply, St};
handle_cast(_, St) ->
{noreply, St}.
@@ -161,10 +329,10 @@ maybe_new(T, Opts) ->
tid_(Prefix, Seq, Tag) ->
<<Prefix/binary, ":", (integer_to_binary(Seq))/binary, "-", Tag/binary>>.
-do_log(Tid, TS, Component, Event, N) ->
+do_log(Tid, Level, TS, Component, Event, N) ->
case ets:member(?IDS, Tid) of
true ->
- store_event(Tid, TS, Component, Event);
+ store_event(Tid, Level, TS, Component, Event);
false ->
case ets:info(?IDS, size) of
Sz when Sz >= N ->
@@ -173,13 +341,14 @@ do_log(Tid, TS, Component, Event, N) ->
ok
end,
ets:insert(?IDS, {Tid}),
- store_event(Tid, TS, Component, Event)
+ store_event(Tid, Level, TS, Component, Event)
end.
-store_event(Tid, TS, Component, Event) ->
- rvi_log_log:info("~-20s ~-12s ~s", [Tid, Component, Event]),
- ?info("RVI_LOG: ~p/~p/~p", [Tid, Component, Event]),
+store_event(Tid, Level, TS, Component, Event) ->
+ rvi_log_log:info("~-20s ~-2w ~-12s ~s", [Tid, Level, Component, Event]),
+ ?info("RVI_LOG: ~p/~w/~p/~p", [Tid, Level, Component, Event]),
ets:insert(?EVENTS, #evt{id = {Tid, TS},
+ level = Level,
component = Component,
event = Event}).
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index b4f14ab..bd7ab54 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -92,7 +92,7 @@ get_modules_by_service(CompSpec, Service) ->
register_services(CompSpec, Services, DataLinkModule) ->
?debug("~p:register_services()", [?MODULE]),
- ?debug(" CompSpec : ~p", [CompSpec]),
+ ?debug(" CompSpec : ~p", [authorize_keys:abbrev(CompSpec)]),
?debug(" Services : ~p", [Services]),
?debug(" DataLinkMod : ~p", [DataLinkModule]),
rvi_common:notification(service_discovery, ?MODULE, register_services,
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index a7c75e0..576dc13 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -236,7 +236,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Params),
{ ok, Parameters } = rvi_common:get_json_element(["parameters"], Params),
-
+ LogId = log_id_json_tail(Params ++ Parameters),
?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]),
?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]),
?debug("service_edge_rpc:handle_websocket(~p) parameters: ~p", [ WSock, Parameters ]),
@@ -244,7 +244,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
case gen_server:call(
?SERVER,
{rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}) of
+ [ SvcName, Timeout, Parameters | LogId ]}) of
[not_found] ->
{ok, [{status, rvi_common:json(not_found)}]};
[Res, TID] ->
@@ -318,9 +318,9 @@ handle_rpc(<<"message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
{ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
+ LogId = log_id_json_tail(Args ++ Parameters),
[ Res, TID ] = gen_server:call(?SERVER, { rvi, handle_local_message,
- [ SvcName, Timeout, Parameters]}),
-
+ [ SvcName, Timeout, Parameters | LogId]}),
{ok, [ { status, rvi_common:json_rpc_status(Res) },
{ transaction_id, TID },
{ method, <<"message">>}
@@ -396,12 +396,12 @@ handle_notification(Other, _Args) ->
%% the only calls invoked by other components, and not the locally
%% connected services that uses the same HTTP port to transmit their
%% register_service, and message calls.
-handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
+handle_call({ rvi, register_local_service, [SvcName, URL | T] }, _From, St) ->
?debug("service_edge_rpc:register_local_service(): service: ~p ", [SvcName]),
?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
FullSvcName = rvi_common:local_service_to_string(SvcName),
- CS = start_log(<<"svc">>, "reg local service: ~s", [FullSvcName], St#st.cs),
+ CS = start_log(T, "reg local service: ~s", [FullSvcName], St#st.cs),
?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]),
ets:insert(?SERVICE_TABLE, #service_entry {
@@ -416,7 +416,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
%% Return ok.
{ reply, [ ok, FullSvcName ], St };
-handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
+handle_call({ rvi, unregister_local_service, [SvcName | T] }, _From, St) ->
?debug("service_edge_rpc:unregister_local_service(): service: ~p ", [SvcName]),
@@ -424,7 +424,7 @@ handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
%% Register with service discovery, will trigger callback to service_available()
%% that forwards the registration to other connected services.
- CS = start_log(<<"svc">>, "unreg local service: ~s", [SvcName], St#st.cs),
+ CS = start_log(T, "unreg local service: ~s", [SvcName], St#st.cs),
service_discovery_rpc:unregister_services(CS, [SvcName], local),
%% Return ok.
@@ -444,11 +444,11 @@ handle_call({rvi, get_available_services, []}, _From, St) ->
%% 13:48:12.943 [debug] service_edge_rpc:local_msg: parameters: [{struct,[{"a","b"}]}]
handle_call({ rvi, handle_local_message,
- [SvcName, TimeoutArg, Parameters] }, _From, St) ->
+ [SvcName, TimeoutArg, Parameters | Tail] }, _From, St) ->
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
- CS = start_log(<<"recv">>, "local_message: ~s", [SvcName], St#st.cs),
+ CS = start_log(Tail, "local_message: ~s", [SvcName], St#st.cs),
%%
%% Authorize local message and retrieve a certificate / signature
%% that will be accepted by the receiving node that will deliver
@@ -468,7 +468,7 @@ handle_call({ rvi, handle_local_message,
%% If the timeout is more than 24 hrs old when parsed as unix time,
%% then we are looking at a relative msec timeout. Convert accordingly
%%
- { Mega, Sec, _Micro } = now(),
+ { Mega, Sec, _Micro } = os:timestamp(),
Now = Mega * 1000000 + Sec,
Timeout =
@@ -746,11 +746,34 @@ announce_service_availability(Available, SvcName) ->
end
end, BlockURLs, ?SERVICE_TABLE).
+start_log(Info, Fmt, Args, CS) ->
+ ?debug("start_log(~p,~p,~p,~p)", [Info,Fmt,Args,CS]),
+ case rvi_common:get_json_element([<<"rvi_log_id">>], Info) of
+ {ok, ID} ->
+ start_log_(ID, Fmt, Args, CS);
+ {error, _} ->
+ start_log(Fmt, Args, CS)
+ end.
+
+start_log(Fmt, Args, CS) ->
+ start_log_(rvi_log:new_id(pfx()), Fmt, Args, CS).
+
+start_log_(ID, Fmt, Args, CS) ->
+ CS1 = rvi_common:set_value(rvi_log_id, ID, CS),
+ log(Fmt, Args, CS1),
+ CS1.
-start_log(Pfx, Fmt, Args, CS) ->
- LogTID = rvi_log:new_id(Pfx),
- rvi_log:log(LogTID, <<"svc_edge">>, rvi_log:format(Fmt, Args)),
- rvi_common:set_value(rvi_log_id, LogTID, CS).
+pfx() ->
+ <<"svc_edge">>.
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"svc_edge">>, CS).
+ rvi_log:flog(Fmt, Args, pfx(), CS).
+
+log_id_json_tail(Args) ->
+ ?debug("Args = ~p", [Args]),
+ case rvi_common:get_json_element([<<"rvi_log_id">>], Args) of
+ {ok, ID} ->
+ [{<<"rvi_log_id">>, ID}];
+ {error, _} ->
+ []
+ end.
diff --git a/deps/exo/src/exo_http_server.erl b/deps/exo/src/exo_http_server.erl
index ae19faa..15c8d06 100644
--- a/deps/exo/src/exo_http_server.erl
+++ b/deps/exo/src/exo_http_server.erl
@@ -68,7 +68,9 @@ start(Port, ServerOptions) ->
[{active,once},{reuseaddr,true},
{verify, verify_none},
{keyfile, filename:join(Dir, "host.key")},
- {certfile, filename:join(Dir, "host.cert")}],
+ {certfile, filename:join(Dir, "host.cert")},
+ {upgrade_timeout, 5000},
+ {accept_timeout, 5000}],
?MODULE, ServerOptions).
%%-----------------------------------------------------------------------------
diff --git a/deps/exo/src/exo_socket.erl b/deps/exo/src/exo_socket.erl
index dc961ac..6967a40 100644
--- a/deps/exo/src/exo_socket.erl
+++ b/deps/exo/src/exo_socket.erl
@@ -9,7 +9,7 @@
%%%---- END COPYRIGHT ---------------------------------------------------------
%%% @author Tony Rogvall <tony@rogvall.se>
%%% @doc
-%%% EXO socket
+%%% EXO socket
%%% @end
%%% Created : 15 Dec 2011 by Tony Rogvall <tony@rogvall.se>
@@ -17,7 +17,7 @@
-export([listen/1, listen/2, listen/3]).
--export([accept/1, accept/2]).
+-export([accept/1, accept/2, accept/3]).
-export([async_accept/1, async_accept/2]).
-export([connect/2, connect/3, connect/4, connect/5]).
%% -export([async_connect/2, async_connect/3, async_connect/4]).
@@ -85,7 +85,7 @@ listen(Port, Protos=[tcp|_], Opts0) ->
Error
end.
-%%
+%%
%%
%%
connect(Host, Port) ->
@@ -108,7 +108,7 @@ connect(Host, Port, Protos=[tcp|_], Opts0, Timeout) ->
TcpConnectOpts = [{active,false},{packet,0},{mode,binary}|TcpOpts1],
case gen_tcp:connect(Host, Port, TcpConnectOpts, Timeout) of
{ok, S} ->
- X =
+ X =
#exo_socket { mdata = gen_tcp,
mctl = inet,
protocol = Protos,
@@ -214,11 +214,11 @@ connect_upgrade(X, Protos0, Timeout) ->
{SSLOpts0,Opts1} = split_options(ssl_connect_opts(),Opts),
{_,SSLOpts} = split_options([ssl_imp], SSLOpts0),
?dbg("SSL upgrade, options = ~w\n", [SSLOpts]),
- ?dbg("exo_socket: before ssl:connect opts=~w\n",
+ ?dbg("exo_socket: before ssl:connect opts=~w\n",
[getopts(X, [active,packet,mode])]),
case ssl_connect(X#exo_socket.socket, SSLOpts, Timeout) of
{ok,S1} ->
- ?dbg("exo_socket: ssl:connect opt=~w\n",
+ ?dbg("exo_socket: ssl:connect opt=~w\n",
[ssl:getopts(S1, [active,packet,mode])]),
X1 = X#exo_socket { socket=S1,
mdata = ssl,
@@ -227,25 +227,25 @@ connect_upgrade(X, Protos0, Timeout) ->
tags={ssl,ssl_closed,ssl_error}},
connect_upgrade(X1, Protos1, Timeout);
Error={error,_Reason} ->
- ?dbg("exo_socket: ssl:connect error=~w\n",
+ ?dbg("exo_socket: ssl:connect error=~w\n",
[_Reason]),
Error
end;
[http|Protos1] ->
{_, Close,Error} = X#exo_socket.tags,
- X1 = X#exo_socket { packet = http,
+ X1 = X#exo_socket { packet = http,
tags = {http, Close, Error }},
connect_upgrade(X1, Protos1, Timeout);
[] ->
setopts(X, [{mode,X#exo_socket.mode},
{packet,X#exo_socket.packet},
{active,X#exo_socket.active}]),
- ?dbg("exo_socket: after upgrade opts=~w\n",
+ ?dbg("exo_socket: after upgrade opts=~w\n",
[getopts(X, [active,packet,mode])]),
{ok,X}
end.
-
-ssl_connect(Socket, Options, Timeout) ->
+
+ssl_connect(Socket, Options, Timeout) ->
case ssl:connect(Socket, Options, Timeout) of
{error, ssl_not_started} ->
ssl:start(),
@@ -288,28 +288,39 @@ async_socket(Listen, Socket, AuthOpts)
ok ->
{ok,Mod} = inet_db:lookup_socket(Listen#exo_socket.socket),
inet_db:register_socket(Socket, Mod),
- X = Listen#exo_socket { transport=Socket, socket=Socket },
- maybe_auth(
- accept_upgrade(X, tl(X#exo_socket.protocol), infinity),
- server,
- X#exo_socket.opts ++ AuthOpts);
+ {ok, Listen#exo_socket { transport=Socket, socket=Socket,
+ opts = Listen#exo_socket.opts ++ AuthOpts }};
Error ->
prim_inet:close(Socket),
Error
end;
Error ->
+ ?debug("getopts() -> ~p", [Error]),
prim_inet:close(Socket),
Error
end.
-
accept(X) when is_record(X, exo_socket) ->
- accept_upgrade(X, X#exo_socket.protocol, infinity).
-
-accept(X, Timeout) when
- is_record(X, exo_socket),
+ Timeout = proplists:get_value(accept_timeout, X#exo_socket.opts, infinity),
+ maybe_auth(
+ accept_upgrade(X, X#exo_socket.protocol, Timeout),
+ server,
+ X#exo_socket.opts).
+
+accept(X, Timeout) when
+ is_record(X, exo_socket),
(Timeout =:= infnity orelse (is_integer(Timeout) andalso Timeout >= 0)) ->
- accept_upgrade(X, X#exo_socket.protocol, Timeout).
+ maybe_auth(
+ accept_upgrade(X, X#exo_socket.protocol, Timeout),
+ server,
+ X#exo_socket.opts).
+
+accept(X, Protos, Timeout) when
+ is_record(X, exo_socket) ->
+ maybe_auth(
+ accept_upgrade(X, Protos, Timeout),
+ server,
+ X#exo_socket.opts).
accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
?dbg("exo_socket: accept protos=~w\n", [Protos0]),
@@ -327,11 +338,11 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
{SSLOpts0,Opts1} = split_options(ssl_listen_opts(),Opts),
{_,SSLOpts} = split_options([ssl_imp], SSLOpts0),
?dbg("SSL upgrade, options = ~w\n", [SSLOpts]),
- ?dbg("exo_socket: before ssl_accept opt=~w\n",
+ ?dbg("exo_socket: before ssl_accept opt=~w\n",
[getopts(X, [active,packet,mode])]),
case ssl_accept(X#exo_socket.socket, SSLOpts, Timeout) of
{ok,S1} ->
- ?dbg("exo_socket: ssl_accept opt=~w\n",
+ ?dbg("exo_socket: ssl_accept opt=~w\n",
[ssl:getopts(S1, [active,packet,mode])]),
X1 = X#exo_socket{socket=S1,
mdata = ssl,
@@ -340,7 +351,7 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
tags={ssl,ssl_closed,ssl_error}},
accept_upgrade(X1, Protos1, Timeout);
Error={error,_Reason} ->
- ?dbg("exo_socket: ssl:ssl_accept error=~w\n",
+ ?dbg("exo_socket: ssl:ssl_accept error=~w\n",
[_Reason]),
Error
end;
@@ -348,14 +359,14 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
accept_probe_ssl(X,Protos1,Timeout);
[http|Protos1] ->
{_, Close,Error} = X#exo_socket.tags,
- X1 = X#exo_socket { packet = http,
+ X1 = X#exo_socket { packet = http,
tags = {http, Close, Error }},
accept_upgrade(X1,Protos1,Timeout);
[] ->
setopts(X, [{mode,X#exo_socket.mode},
{packet,X#exo_socket.packet},
{active,X#exo_socket.active}]),
- ?dbg("exo_socket: after upgrade opts=~w\n",
+ ?dbg("exo_socket: after upgrade opts=~w\n",
[getopts(X, [active,packet,mode])]),
{ok,X}
end.
@@ -393,7 +404,7 @@ accept_probe_ssl(X=#exo_socket { mdata=M, socket=S,
Error
end.
-ssl_accept(Socket, Options, Timeout) ->
+ssl_accept(Socket, Options, Timeout) ->
case ssl:ssl_accept(Socket, Options, Timeout) of
{error, ssl_not_started} ->
ssl:start(),
@@ -417,7 +428,7 @@ request_type(<<ContentType:8, _Version:16, _Length:16, _/binary>>) ->
end;
request_type(_) ->
undefined.
-
+
%%
%% exo_socket wrapper for socket operations
%%
@@ -426,7 +437,7 @@ close(#exo_socket { mdata = M, socket = S}) ->
shutdown(#exo_socket { mdata = M, socket = S}, How) ->
M:shutdown(S, How).
-
+
send(#exo_socket { mdata = M,socket = S, mauth = A,auth_state = Sa} = X, Data) ->
if A == undefined ->
M:send(S, Data);
@@ -520,11 +531,11 @@ tcp_connect_options() ->
ssl_listen_opts() ->
- [versions, verify, verify_fun,
+ [versions, verify, verify_fun,
fail_if_no_peer_cert, verify_client_once,
- depth, cert, certfile, key, keyfile,
+ depth, cert, certfile, key, keyfile,
password, cacerts, cacertfile, dh, dhfile, cihpers,
- %% deprecated soon
+ %% deprecated soon
ssl_imp, %% always new!
%% server
verify_client_once,
@@ -533,9 +544,9 @@ ssl_listen_opts() ->
debug, hibernate_after, erl_dist ].
ssl_connect_opts() ->
- [versions, verify, verify_fun,
+ [versions, verify, verify_fun,
fail_if_no_peer_cert,
- depth, cert, certfile, key, keyfile,
+ depth, cert, certfile, key, keyfile,
password, cacerts, cacertfile, dh, dhfile, cihpers,
debug].
diff --git a/deps/exo/src/exo_socket_server.erl b/deps/exo/src/exo_socket_server.erl
index 08a3e4c..db67c5a 100644
--- a/deps/exo/src/exo_socket_server.erl
+++ b/deps/exo/src/exo_socket_server.erl
@@ -20,7 +20,7 @@
%%
%% methods
-%% init(Socket, Args) ->
+%% init(Socket, Args) ->
%% {ok, State'}
%% {stop, Reason, State'}
%%
@@ -30,7 +30,7 @@
%%
%% close(Socket, State) ->
%% {ok, State'}
-%%
+%%
%% error(Socket, Error, State) ->
%% {ok, State'}
%% {stop, Reason, State'}
@@ -53,7 +53,7 @@
%% -define(debug(Fmt,Args), ok).
%% -define(error(Fmt,Args), error_logger:format(Fmt, Args)).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(state, {
listen, %% #exo_socket{}
@@ -102,15 +102,15 @@
-spec behaviour_info(callbacks) -> list().
behaviour_info(callbacks) ->
[
- {init, 2}, %% init(Socket::socket(), Args::[term()]
+ {init, 2}, %% init(Socket::socket(), Args::[term()]
%% -> {ok,state()} | {stop,reason(),state()}
- {data, 3}, %% data(Socket::socket(), Data::io_list(), State::state())
+ {data, 3}, %% data(Socket::socket(), Data::io_list(), State::state())
%% -> {ok,state()}|{close,state()}|{stop,reason(),state()}
{close, 2}, %% close(Socket::socket(), State::state())
%% -> {ok,state()}
{error, 3}, %% error(Socket::socket(),Error::error(), State:state())
%% -> {ok,state()} | {stop,reason(),state()}
- {control, 4} %% control(Socket::socket(), Request::term(),
+ {control, 4} %% control(Socket::socket(), Request::term(),
%% From::term(), State:state())
%% -> {reply, Reply::term(),state()} | {noreply, state()} |
%% {ignore, state()} | {send, Bin::binary(), state()} |
@@ -175,15 +175,15 @@ init([Port,Protos,Options,Module,Args] = _X) ->
{ok,Listen} ->
case exo_socket:async_accept(Listen) of
{ok, Ref} ->
- {ok, #state{ listen = Listen,
- active = Active,
+ {ok, #state{ listen = Listen,
+ active = Active,
socket_reuse = Reuse,
ref=Ref,
- module=Module,
+ module=Module,
args=Args
}};
{error, Reason} ->
- {stop,Reason}
+ {stop,Reason}
end;
{error,Reason} ->
{stop,Reason}
@@ -196,8 +196,8 @@ init([Port,Protos,Options,Module,Args] = _X) ->
%%
%% @end
%%--------------------------------------------------------------------
--spec handle_call(Request::term(),
- From::{pid(), Tag::term()},
+-spec handle_call(Request::term(),
+ From::{pid(), Tag::term()},
State::#state{}) ->
{reply, Reply::term(), State::#state{}} |
{noreply, State::#state{}} |
@@ -271,28 +271,34 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when
+handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when
(State#state.listen)#exo_socket.socket =:= LSocket,
Ref =:= State#state.ref ->
?debug("<-- ~p~n", [_Msg]),
Listen = State#state.listen,
+ AcceptTimeout = proplists:get_value(accept_timeout, Listen#exo_socket.opts, infinity),
NewAccept = exo_socket:async_accept(Listen),
- case exo_socket:async_socket(Listen, Socket, [delay_auth]) of
+ case exo_socket:async_socket(Listen, Socket, [{delay_auth, true}]) of
{ok, XSocket} ->
- case exo_socket_session:start(XSocket,
+ F = fun() ->
+ exo_socket:accept(
+ XSocket, tl(XSocket#exo_socket.protocol), AcceptTimeout)
+ end,
+ XSocketFun = {XSocket, F},
+ case exo_socket_session:start(XSocketFun,
State#state.module,
State#state.args) of
- {ok,Pid} ->
+ {ok, Pid} ->
exo_socket:controlling_process(XSocket, Pid),
gen_server:cast(Pid, {activate,State#state.active});
_Error ->
exo_socket:close(XSocket)
end;
- _Error ->
- error
+ _Error ->
+ error
end,
case NewAccept of
- {ok,Ref1} ->
+ {ok, Ref1} ->
{noreply, State#state { ref = Ref1 }};
{error, Reason} ->
{stop, Reason, State}
@@ -436,4 +442,3 @@ send_reuse_message(Host, Port, Args, M, MyPort, XSocket, RUSt) ->
ReuseMsg = exo_socket_session:encode_reuse(
MyPort, ReuseOpts),
exo_socket:send(XSocket, ReuseMsg).
-
diff --git a/deps/exo/src/exo_socket_session.erl b/deps/exo/src/exo_socket_session.erl
index 623b301..f4518db 100644
--- a/deps/exo/src/exo_socket_session.erl
+++ b/deps/exo/src/exo_socket_session.erl
@@ -19,21 +19,21 @@
-behaviour(gen_server).
%% API
--export([start/3,
+-export([start/3,
start_link/3]).
%% gen_server callbacks
--export([init/1,
- handle_call/3,
- handle_cast/2,
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
handle_info/2,
- terminate/2,
+ terminate/2,
code_change/3]).
--export([encode_reuse/2,
+-export([encode_reuse/2,
decode_reuse_config/1]).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(state, {
module,
@@ -90,6 +90,7 @@ start(XSocket, Module, Args) ->
%% @end
%%--------------------------------------------------------------------
init([XSocket, Module, Args]) ->
+ ?debug("init(~p, ~p, ~p)", [XSocket, Module, Args]),
{ok, #state{ socket=XSocket,
module=Module,
args=Args,
@@ -104,23 +105,23 @@ init([XSocket, Module, Args]) ->
%%
%% @end
%%--------------------------------------------------------------------
--spec handle_call(Request::term(),
- From::{pid(), Tag::term()},
+-spec handle_call(Request::term(),
+ From::{pid(), Tag::term()},
State::#state{}) ->
{reply, Reply::term(), State::#state{}} |
{noreply, State::#state{}} |
{stop, Reason::atom(), Reply::term(), State::#state{}}.
%% No 'local' handle_call
-handle_call(Request, From,
+handle_call(Request, From,
State=#state{module = M, state = MSt, socket = Socket}) ->
?dbg("handle_call: ~p", [Request]),
try M:control(Socket, Request, From, MSt) of
- Result ->
+ Result ->
?dbg("handle_call: reply ~p", [Result]),
mod_reply(Result, From, State)
catch
- error:_Error ->
+ error:_Error ->
?dbg("handle_call: catch reason ~p", [_Error]),
ret({reply, {error, unknown_call}, State})
end.
@@ -172,40 +173,24 @@ send_(Bin, From, #state{socket = S, pending = P} = State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({activate,Active}, State0) ->
+handle_cast({activate,Active}, #state{socket = XSocket0} = State0) ->
?dbg("activate~n", []),
- try exo_socket:authenticate(State0#state.socket) of
- {ok, S} ->
- ?dbg("authentication done~n", []),
- State = State0#state{socket = S},
- case apply(State#state.module, init,
- [State#state.socket,State#state.args]) of
- Ok when element(1, Ok) == ok ->
- CSt0 = element(2, Ok),
- %% enable active mode here (if ever wanted) once is handled,
- %% automatically anyway. exit_on_close is default and
- %% allow session statistics retrieval in the close callback
- SessionOpts = [{active,Active},{exit_on_close, false}],
-
- _Res = exo_socket:setopts(State#state.socket, SessionOpts),
- ?dbg("exo_socket:setopts(~w) = ~w\n", [SessionOpts, _Res]),
- State1 = State#state { active = Active, state = CSt0 },
- case Ok of
- {_, _, Timeout} ->
- ret({noreply, State1, Timeout});
- {_, _} ->
- ret({noreply, State1})
- end;
- {stop,Reason,CSt1} ->
- {stop, Reason, State#state { state = CSt1 }}
+ case XSocket0 of
+ {#exo_socket{}, Fun} when is_function(Fun, 0) ->
+ try Fun() of
+ {ok, XSocket} ->
+ activate_(Active, State0#state{socket = XSocket});
+ {error, _} = Error ->
+ ?debug("socket fun -> ~p", [Error]),
+ {stop, Error, State0}
+ catch
+ Cat:Exception ->
+ ?debug("caught ~p:~p from socket fun", [Cat, Exception]),
+ {stop, Exception, State0}
end;
- {error, Reason} ->
- {stop, {auth_failure, Reason}, State0}
- catch
- error:Crash ->
- {stop, {auth_failure, Crash}, State0}
+ #exo_socket{}->
+ activate_(Active, State0)
end;
-
handle_cast(_Msg, State) ->
ret({noreply, State}).
@@ -223,9 +208,9 @@ handle_info(timeout, State) ->
exo_socket:shutdown(State#state.socket, write),
?dbg("exo_socket_session: idle_timeout~p~n", [self()]),
{stop, normal, State};
-handle_info({Tag,Socket,Data0}, State) when
+handle_info({Tag,Socket,Data0}, State) when
%% FIXME: put socket tag in State for correct matching
- (Tag =:= tcp orelse Tag =:= ssl orelse Tag =:= http),
+ (Tag =:= tcp orelse Tag =:= ssl orelse Tag =:= http),
Socket =:= (State#state.socket)#exo_socket.socket ->
?dbg("exo_socket_session: got data ~p\n", [{Tag,Socket,Data0}]),
try exo_socket:auth_incoming(State#state.socket, Data0) of
@@ -247,7 +232,7 @@ handle_info({Tag,Socket}, State) when
{ok,CSt1} ->
{stop, normal, State#state { state = CSt1 }}
end;
-handle_info({Tag,Socket,Error}, State) when
+handle_info({Tag,Socket,Error}, State) when
(Tag =:= tcp_error orelse Tag =:= ssl_error),
Socket =:= (State#state.socket)#exo_socket.socket ->
?dbg("exo_socket_session: got error ~p\n", [{Tag,Socket,Error}]),
@@ -258,7 +243,7 @@ handle_info({Tag,Socket,Error}, State) when
{stop,Reason,CSt1} ->
{stop, Reason, State#state { state = CSt1 }}
end;
-
+
handle_info(_Info, State) ->
?dbg("Got info: ~p\n", [_Info]),
ret({noreply, State}).
@@ -275,9 +260,14 @@ handle_info(_Info, State) ->
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- exo_socket:close(State#state.socket),
+ socket_close(State#state.socket),
ok.
+socket_close({#exo_socket{} = S, _}) ->
+ exo_socket:close(S);
+socket_close(#exo_socket{} = S) ->
+ exo_socket:close(S).
+
%%--------------------------------------------------------------------
%% @private
%% @doc
@@ -293,6 +283,40 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
+activate_(Active, State0) ->
+ try exo_socket:authenticate(State0#state.socket) of
+ {ok, S} ->
+ ?dbg("authentication done~n", []),
+ State = State0#state{socket = S},
+ case apply(State#state.module, init,
+ [State#state.socket,State#state.args]) of
+ Ok when element(1, Ok) == ok ->
+ CSt0 = element(2, Ok),
+ %% enable active mode here (if ever wanted) once is handled,
+ %% automatically anyway. exit_on_close is default and
+ %% allow session statistics retrieval in the close callback
+ SessionOpts = [{active,Active},{exit_on_close, false}],
+
+ _Res = exo_socket:setopts(State#state.socket, SessionOpts),
+ ?dbg("exo_socket:setopts(~w) = ~w\n", [SessionOpts, _Res]),
+ State1 = State#state { active = Active, state = CSt0 },
+ case Ok of
+ {_, _, Timeout} ->
+ ret({noreply, State1, Timeout});
+ {_, _} ->
+ ret({noreply, State1})
+ end;
+ {stop,Reason,CSt1} ->
+ {stop, Reason, State#state { state = CSt1 }}
+ end;
+ {error, Reason} ->
+ {stop, {auth_failure, Reason}, State0}
+ catch
+ error:Crash ->
+ {stop, {auth_failure, Crash}, State0}
+ end.
+
+
ret({noreply, #state{idle_timeout = T} = S}) ->
if T==undefined -> {noreply, S};
true -> {noreply, S, T}
@@ -333,7 +357,7 @@ handle_reuse_data(Rest, #state{module = M, state = MSt} = State) ->
handle_socket_data(Data, State) ->
CSt0 = State#state.state,
- ModResult = apply(State#state.module, data,
+ ModResult = apply(State#state.module, data,
[State#state.socket,Data,CSt0]),
?dbg("handle_socket_data: result ~p", [ModResult]),
handle_module_result(ModResult, State).
diff --git a/doc/rvi_protocol.md b/doc/rvi_protocol.md
index ab1de75..e59c0ac 100644
--- a/doc/rvi_protocol.md
+++ b/doc/rvi_protocol.md
@@ -134,7 +134,7 @@ functions are (for now) outside the scope of the RVI Core protocol.
## Double connect resolution
There is a risk that two parties try to initiate a connection to each
other in a race condition, creating two connections between them, as
-shown below.
+shown below.
Connection 1 | Connection 2
:------:|:------:
@@ -402,6 +402,8 @@ Parameter | Required | Description
--device\_cert | Yes | The PEM-encoded device X.509 certificate to embed into the credential as the device_cert member.
--invoke | Yes | Space separated list (within quotes) of RVI service prefixes that the owner of the credential has the right to invoke.
--register | Yes | Space separated list (within quotes) of RVI service prefixes that the owner of the credential has the right to register for others to call (with the right credential).
+--start | No | The Unix timestamps when the credential becomes active.
+--stop | No | The Unix timestamps when the credential becomes inactive.
The generated ```insecure_credential.json```
and ```insecure_credential.jwt``` are checked into ```priv/sample_credentials```.
@@ -616,7 +618,7 @@ certificates with a private root key. Devices with the corresponding
public root key will be able to authenticate signed device keys and
authorize signed certificates.
-### Root key
+### Root key
A root key, a 4096+ bit RSA key pair, is generated once for an issuer
of certificates. The private key is stored in the certificate
issuer's servers and is not shared. The public key is manually
@@ -641,12 +643,12 @@ and is not shared. The public part of the key is used in two ways:
Embdded in the authenticate packet are one or more certificates
proving the sending RVI node's right to register and invoke
services. The certificate, signed by the private root key of the
- issuer, contains the public key of the sending device encoded as
+ issuer, contains the public key of the sending device encoded as
JWT structure. This public device key can be used by a
receiver to verify the signature of a service invocation requests
sent by the remote RVI node.
-### Certificate
+### Certificate
A certificate is a JSON Web Token, signed by the private root key of
the certificate issuer, that proves that the RVI node with a given
@@ -782,7 +784,7 @@ private root key stored in ```my_root_key_priv.pem```, generated above.
The RVI node itself is never aware of that file.
The RVI node does need the public root key, stored in ```my_root_key_pub.pem```,
-is referenced from the RVI's configuration file stored
+is referenced from the RVI's configuration file stored
as ```{ rvi_core, { provisioning_key, "..../my_root_key_pub.pem" }}```.
@@ -812,20 +814,20 @@ Once executed, three files will be created:
1. **```my_device_key_priv.pem```**<br>
This file contains the private/public key pair that must never leave
- the device's trusted environment. It will be used to sign
+ the device's trusted environment. It will be used to sign
outgoing service invocation request.
2. **```my_device_key_pub.pem```**<br>
- This file contains the public-only key that is to be added to
+ This file contains the public-only key that is to be added to
certificates issued for the device by a certificate issuer.
-
+
3. **```my_device_key_pub_sign.jwt```**<br>
This file contains the public-only key, signed by the root key,
that is to be provided as authentication when an RVI node identifies
itself toward another. The file is stored in JSON Web Token format.
-#### Configuring RVI to use a device key
+#### Configuring RVI to use a device key
The RVI node needs the device private/public key root key, stored in
```my_device_key_priv.pem```, is referenced from the RVI's configuration
@@ -901,9 +903,9 @@ Once executed, one mandatory and one optional file will be created:
This file contains the generated certificate, signed by the
private root key specified by ```--root_key=```. The content
of this file will be provided by an RVI node to prove its righ
- to register and invoke services toward remote RVI nodes
+ to register and invoke services toward remote RVI nodes
+
-
2. **```my_cert.json```**<br>
Only created if ```--cert_out=``` has been give. Contains a human
readable JSON form of the generated root key.
@@ -931,10 +933,10 @@ the control of the provisioning server.
An device-specific key pair is generated by device and stored locally.
The app has one pre-provisioned certificate, signed by the
-root server, allowing it to invoke ```jlr.com/provisioning/init_setup```
+root server, allowing it to invoke ```jlr.com/provisioning/init_setup```
and ```jlr.com/provisioning/request_provisioning```. The certificate also
-provides the right to register ```jlr.com/mobile/*/dm/cert_provision```
-and ```jlr.com/mobile/*/dm/signed_pub_key_provision```
+provides the right to register ```jlr.com/mobile/*/dm/cert_provision```
+and ```jlr.com/mobile/*/dm/signed_pub_key_provision```
The certificate keys section, normally holding public device
keys, is empty.
@@ -954,7 +956,7 @@ setup an initial contact.
The command contains no key, only a single pre-provisioned node certificate giving
the device the right to invoke and register the functions listed in
above.<br>
-
+
3. Server sends authenticate to device<br>
The server's public device key, signed by the root private key, is
sent together with no node certificates, thus giving the server no
@@ -1116,7 +1118,7 @@ state through a invocation to ```jlr.com/mobile/1234/confirm_unlock```
### Thwarting self-provisioning process - Replay TBD.
-The provisioning server, having matched the side band address (MSISDN) against an internal database of devices and their access rights, will create a specific certificate only for that device. 
+The provisioning server, having matched the side band address (MSISDN) against an internal database of devices and their access rights, will create a specific certificate only for that device.
Given that the side band network has not been compromised, I can't see how a MITM / replay attack can give a remote remote attacker the ability to gain access of the root-signed public device key and/or use a certificate.
diff --git a/priv/config/rvi_backend.config b/priv/config/rvi_backend.config
index 4c2ed57..9d5279f 100644
--- a/priv/config/rvi_backend.config
+++ b/priv/config/rvi_backend.config
@@ -42,12 +42,6 @@ LogLevel = Env("RVI_LOGLEVEL", notice).
[
{rvi_core,
[
-
- {key_pair, {openssl_pem, "test_keys/insecure_device_key_priv.pem"}},
- {provisioning_key, "test_keys/insecure_root_key_pub.pem"},
- {authorize_jwt, "test_keys/insecure_device_key_pub_sign.jwt"},
- {cert_dir, "test_certs"},
-
{ node_address, IPPort(MyIP, MyPort+7) },
{ node_service_prefix, "jlr.com/backend"},
diff --git a/priv/config/rvi_common.config b/priv/config/rvi_common.config
index a01d57d..87563e7 100644
--- a/priv/config/rvi_common.config
+++ b/priv/config/rvi_common.config
@@ -115,6 +115,10 @@ LogLevel = Env("RVI_LOGLEVEL", info).
%% },
{rvi_core,
[
+ {device_key, "$PRIV_DIR/sample_keys/insecure_device_key.pem"},
+ {device_cert, "$PRIV_DIR/sample_certificates/insecure_device_cert.crt"},
+ {root_cert, "$PRIV_DIR/sample_certificates/insecure_root_cert.crt"},
+ {cred_dir, "$PRIV_DIR/sample_credentials"}
]}
]}
].
diff --git a/priv/config/rvi_sample.config b/priv/config/rvi_sample.config
index 6e7863a..95cd5f3 100644
--- a/priv/config/rvi_sample.config
+++ b/priv/config/rvi_sample.config
@@ -23,14 +23,14 @@ Env = fun(V, Def) ->
Str -> Str
end
end.
+IPPort = fun(IP, Port) ->
+ IP ++ ":" ++ integer_to_list(Port)
+ end.
MyPort = Env("RVI_PORT", 9000).
MyIP = Env("RVI_MYIP", "127.0.0.1").
-MyNodeAddr = Env("RVI_MY_NODE_ADDR", "0.0.0.0:0").
+MyNodeAddr = Env("RVI_MY_NODE_ADDR", IPPort(MyIP, MyPort)).
BackendIP = Env("RVI_BACKEND", "38.129.64.31").
BackendPort = Env("RVI_BACKEND_PORT", 8807).
-IPPort = fun(IP, Port) ->
- IP ++ ":" ++ integer_to_list(Port)
- end.
LogLevel = Env("RVI_LOGLEVEL", notice).
[
@@ -112,12 +112,6 @@ LogLevel = Env("RVI_LOGLEVEL", notice).
{rvi_core,
[
-
- {key_pair, {openssl_pem, "rvi_core/priv/sample_keys/insecure_device_bkey_priv.pem"}},
- {provisioning_key, "rvi_core/priv/sample_keys/insecure_root_key_pub.pem"},
- {authorize_jwt, "rvi_core/priv/sample_keys/insecure_device_key_pub_sign.jwt"},
- {cert_dir, "rvi_core/priv/sample_certs"},
-
%% Specify the node address that data_link uses to listen to
%% incoming traffic from other rvi nodes.
%%
diff --git a/priv/test_config/backend.config b/priv/test_config/backend.config
index fe60836..0f49784 100644
--- a/priv/test_config/backend.config
+++ b/priv/test_config/backend.config
@@ -5,13 +5,11 @@
{set_env,
[
{rvi_core,
- [{key_pair, {openssl_pem, filename:join(
- CurDir, "../basic_backend_keys/dev_priv.pem")}},
- {provisioning_key,
- {openssl_pem, filename:join(CurDir, "../root_keys/root_pub.pem")}},
- {authorize_jwt, filename:join(
- CurDir, "../basic_backend_keys/dev_pub_sign.jwt")},
- {cert_dir, filename:join(CurDir, "../basic_backend_certs")}
+ [{device_key, "$HOME/../../basic_backend_keys/device_key.pem"},
+ {provisioning_key, "$HOME/../../root_keys/root_key.pem"},
+ {root_cert, "$HOME/../../root_keys/root_cert.crt"},
+ {device_cert, "$HOME/../../basic_backend_keys/device_cert.crt"},
+ {cred_dir, "$HOME/../../basic_backend_creds"}
]}
]}
].
diff --git a/priv/test_config/bt_backend.config b/priv/test_config/bt_backend.config
index dfa7645..e5d207f 100644
--- a/priv/test_config/bt_backend.config
+++ b/priv/test_config/bt_backend.config
@@ -7,7 +7,7 @@
[
{rvi_core,
[
- { [routing_rules, ""], [{proto_json, dlink_bt_rpc}] },
+ { [routing_rules, ""], [{proto_json_rpc, dlink_bt_rpc}] },
{ [components, data_link], [{dlink_bt_rpc, gen_server,
[{server_opts, [{test_mode, tcp},
{port, 8007}]}]}]}
diff --git a/priv/test_config/bt_sample.config b/priv/test_config/bt_sample.config
index 0159992..962f8a3 100644
--- a/priv/test_config/bt_sample.config
+++ b/priv/test_config/bt_sample.config
@@ -6,7 +6,7 @@
[
{rvi_core,
[
- { [routing_rules, ""], {proto_json, dlink_bt_rpc} },
+ { [routing_rules, ""], {proto_json_rpc, dlink_bt_rpc} },
{ [components, data_link], [{dlink_bt_rpc, gen_server,
[{server_opts, [{test_mode, tcp},
{port, 9007}]},
diff --git a/priv/test_config/sample.config b/priv/test_config/sample.config
index 6a7c41c..ea84a30 100644
--- a/priv/test_config/sample.config
+++ b/priv/test_config/sample.config
@@ -2,19 +2,15 @@
{ok, CurDir} = file:get_cwd().
[
{include_lib, "rvi_core/priv/config/rvi_sample.config"},
- {remove_apps, [bt, dlink_bt]},
{set_env,
[
{rvi_core,
[
{node_service_prefix, "jlr.com/vin/abc"},
- {key_pair, {openssl_pem, filename:join(
- CurDir, "../basic_sample_keys/dev_priv.pem")}},
- {provisioning_key,
- {openssl_pem, filename:join(CurDir, "../root_keys/root_pub.pem")}},
- {authorize_jwt, filename:join(
- CurDir, "../basic_sample_keys/dev_pub_sign.jwt")},
- {cert_dir, filename:join(CurDir, "../basic_sample_certs")}
+ {device_key, "$HOME/../../basic_sample_keys/device_key.pem"},
+ {root_cert, "$HOME/../../root_keys/root_cert.crt"},
+ {device_cert, "$HOME/../../basic_sample_keys/device_cert.crt"},
+ {cred_dir, "$HOME/../../basic_sample_creds"}
]}
]}
].
diff --git a/src/rvi_server.erl b/src/rvi_server.erl
index 6aa0d8c..683ce32 100644
--- a/src/rvi_server.erl
+++ b/src/rvi_server.erl
@@ -1,6 +1,9 @@
-module(rvi_server).
-behaviour(gen_server).
+-export([ensure_ready/1,
+ ensure_ready/2]).
+
-export([start_link/0,
await/0,
info/0,
@@ -17,17 +20,24 @@
-include_lib("lager/include/log.hrl").
-record(st, {wait_for = [],
+ tref,
ready = []}).
+ensure_ready(Timeout) ->
+ ensure_ready(rvi_core, Timeout).
+
+ensure_ready(App, Timeout) ->
+ gproc:await({n,l,App}, Timeout).
+
start_link() ->
- gen_server:start_link({local,?MODULE}, ?MODULE, [], [{debug,[trace]}]).
+ gen_server:start_link({local,?MODULE}, ?MODULE, [], []).
init([]) ->
WaitFor = lists:flatmap(
fun({_, Names}) ->
Names
end, setup:find_env_vars(rvi_core_await)),
- {ok, #st{wait_for = WaitFor}}.
+ {ok, start_timer(#st{wait_for = WaitFor})}.
await() ->
call(await).
@@ -60,12 +70,22 @@ handle_cast(_, S) ->
handle_info({gproc, _, registered, {Key, _, _}}, #st{wait_for = WF,
ready = Ready} = S) ->
WF1 = WF -- [Key],
+ S1 = S#st{ready = [Key | Ready], wait_for = WF1},
if WF1 == [] andalso WF =/= [] ->
- rvi_common:announce({n, l, rvi_core});
+ rvi_common:announce({n, l, rvi_core}),
+ {noreply, cancel_timer(S1)};
true ->
- ok
- end,
- {noreply, S#st{ready = [Key | Ready], wait_for = WF1}};
+ {noreply, S1}
+ end;
+handle_info({timeout, TRef, timeout}, #st{tref = TRef,
+ wait_for = WF} = S) ->
+ case WF of
+ [] ->
+ {noreply, S#st{tref = undefined}};
+ [_|_] ->
+ ?warning("Still waiting for ~p", [WF]),
+ {noreply, start_timer(S)}
+ end;
handle_info(_, S) ->
{noreply, S}.
@@ -78,3 +98,13 @@ code_change(_FromVsn, S, _Extra) ->
call(Req) ->
gen_server:call(?MODULE, Req).
+
+start_timer(S) ->
+ TRef = erlang:start_timer(timer:seconds(10), self(), timeout),
+ S#st{tref = TRef}.
+
+cancel_timer(#st{tref = undefined} = S) ->
+ S;
+cancel_timer(#st{tref = TRef} = S) ->
+ erlang:cancel_timer(TRef),
+ S#st{tref = undefined}.
diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl
index 41aa15a..9b13f28 100644
--- a/test/rvi_core_SUITE.erl
+++ b/test/rvi_core_SUITE.erl
@@ -143,18 +143,20 @@ end_per_testcase(Case, _Config) ->
t_backend_keys_and_cert(Config) ->
RootKeyDir = ensure_dir(root_keys()),
- cmd([scripts(),"/rvi_create_root_key.sh -o ",
- RootKeyDir, "/root -b 2048"]),
+ cmd(["openssl genrsa -out ", RootKeyDir, "/root_key.pem 1024"]),
+ cmd(["openssl req -x509 -batch -new -nodes -batch"
+ " -key ", RootKeyDir, "/root_key.pem",
+ " -days 365 -out ", RootKeyDir, "/root_cert.crt"]),
Dir = ensure_dir("basic_backend_keys"),
generate_device_keys(Dir, Config),
- generate_cert(backend, Dir, ensure_dir("basic_backend_certs"), Config).
+ generate_cred(backend, Dir, ensure_dir("basic_backend_creds"), Config).
t_sample_keys_and_cert(Config) ->
Dir = ensure_dir("basic_sample_keys"),
generate_device_keys(Dir, Config),
- generate_cert(sample, Dir, ensure_dir("basic_sample_certs"), Config).
+ generate_cred(sample, Dir, ensure_dir("basic_sample_creds"), Config).
-t_install_backend_node(Config) ->
+t_install_backend_node(_Config) ->
install_rvi_node("basic_backend", env(),
[root(), "/priv/test_config/basic_backend.config"]).
@@ -184,34 +186,69 @@ t_install_bt_sample_node(_Config) ->
install_sample_node("bt_sample", "bt_sample.config").
t_start_basic_backend(_Config) ->
- cmd(["./basic_backend/rvi.sh -s basic_backend -l ./basic_backend/rvi/log -d ./basic_backend -c ./basic_backend/priv/test_config/basic_backend.config start"]),
+ cmd([env(),
+ " ./basic_backend/rvi.sh"
+ " -s basic_backend"
+ " -l ./basic_backend/rvi/log"
+ " -d ./basic_backend"
+ " -c ./basic_backend/priv/test_config/basic_backend.config"
+ " start"]),
await_started("basic_backend"),
ok.
t_start_basic_sample(_Config) ->
- cmd(["./basic_sample/rvi.sh -s basic_sample -l ./basic_sample/rvi/log -d ./basic_sample -c ./basic_sample/priv/test_config/basic_sample.config start"]),
+ cmd([env(),
+ " ./basic_sample/rvi.sh"
+ " -s basic_sample"
+ " -l ./basic_sample/rvi/log"
+ " -d ./basic_sample"
+ " -c ./basic_sample/priv/test_config/basic_sample.config"
+ " start"]),
await_started("basic_sample"),
ok.
t_start_bt_backend(_Config) ->
- cmd(["./bt_backend/rvi.sh -s bt_backend -l ./bt_backend/rvi/log -d ./bt_backend -c ./bt_backend/priv/test_config/bt_backend.config start"]),
+ cmd([env(),
+ " ./bt_backend/rvi.sh -s bt_backend"
+ " -l ./bt_backend/rvi/log"
+ " -d ./bt_backend"
+ " -c ./bt_backend/priv/test_config/bt_backend.config"
+ " start"]),
await_started("bt_backend"),
ok.
t_start_bt_sample(_Config) ->
- cmd(["./bt_sample/rvi.sh -s bt_sample -l ./bt_sample/rvi/log -d ./bt_sample -c ./bt_sample/priv/test_config/bt_sample.config start"]),
+ cmd([env(),
+ " ./bt_sample/rvi.sh"
+ " -s bt_sample"
+ " -l ./bt_sample/rvi/log"
+ " -d ./bt_sample"
+ " -c ./bt_sample/priv/test_config/bt_sample.config"
+ " start"]),
await_started("bt_sample"),
ok.
t_start_tls_backend(_Config) ->
- cmd(["./tls_backend/rvi.sh -s tls_backend -l ./tls_backend/rvi/log -d ./tls_backend -c ./tls_backend/priv/test_config/tls_backend.config start"]),
+ cmd([env(),
+ " ./tls_backend/rvi.sh"
+ " -s tls_backend"
+ " -l ./tls_backend/rvi/log"
+ " -d ./tls_backend"
+ " -c ./tls_backend/priv/test_config/tls_backend.config"
+ " start"]),
await_started("tls_backend"),
ok.
t_start_tls_sample(_Config) ->
- cmd(["./tls_sample/rvi.sh -s tls_sample -l ./tls_sample/rvi/log -d ./tls_sample -c ./tls_sample/priv/test_config/tls_sample.config start"]),
+ cmd([env(),
+ " ./tls_sample/rvi.sh"
+ " -s tls_sample"
+ " -l ./tls_sample/rvi/log"
+ " -d ./tls_sample"
+ " -c ./tls_sample/priv/test_config/tls_sample.config"
+ " start"]),
await_started("tls_sample"),
- ok.
+ ok.
t_register_lock_service(_Config) ->
Pid =
@@ -235,7 +272,7 @@ t_call_lock_service(_Config) ->
verify_call_res(join_stdout_msgs(CallRes)),
ct:log("CallRes = ~p~n", [CallRes]).
-t_remote_call_lock_service(Config) ->
+t_remote_call_lock_service(_Config) ->
CallPid = spawn_cmd(
[python(),
"/rvi_call.py -n ", service_edge("backend"),
@@ -351,42 +388,59 @@ try_match(Pat, Data) ->
ct:log("try_match(S, ~p) -> ~p~nS=~s~n", [Pat, Res, Data]),
Res.
-generate_device_keys(Dir, Config) ->
+generate_device_keys(Dir, _Config) ->
ensure_dir(Dir),
- cmd([scripts(),"/rvi_create_device_key.py ",
- "-p ", root_keys(), "/root_priv.pem -o ", Dir, "/dev -b 2048"]).
-
-generate_cert(sample, KeyDir, CertDir, Config) ->
- %% Don't put lock_cert.json in the certs directory, since rvi_core
+ RootKeyDir = root_keys(),
+ cmd(["openssl genrsa -out ", Dir, "/device_key.pem 2014"]),
+ cmd(["openssl req -new -batch"
+ " -subj ", subj(),
+ " -key ", Dir, "/device_key.pem"
+ " -out ", Dir, "/device_cert.csr"]),
+ cmd(["openssl x509 -req -days 365"
+ " -in ", Dir, "/device_cert.csr"
+ " -CA ", root_keys(), "/root_cert.crt"
+ " -CAkey ", RootKeyDir, "/root_key.pem"
+ " -set_serial 01"
+ " -out ", Dir, "/device_cert.crt"]).
+
+generate_cred(sample, KeyDir, CredDir, _Config) ->
+ %% Don't put lock_cred.json in the certs directory, since rvi_core
%% will report a parse failure for it.
UUID = uuid(),
{Start, Stop} = start_stop(),
- cmd([scripts(), "/rvi_create_certificate.py"
+ cmd([scripts(), "/rvi_create_credential.py"
" --id=", UUID,
- " --device_key=", KeyDir, "/dev_pub.pem",
+ " --issuer=GENIVI"
+ " --device_cert=", KeyDir, "/device_cert.crt",
" --start='", Start, "'"
" --stop='", Stop, "'"
- " --root_key=", root_keys(), "/root_priv.pem"
+ " --root_key=", root_keys(), "/root_key.pem"
" --register='jlr.com/vin/abc/unlock jlr.com/vin/abc/lock'"
" --invoke='jlr.com/backend/set_state'"
- " --jwt_out=", CertDir, "/lock_cert.jwt"
- " --cert_out=", KeyDir, "/lock_cert.json"]),
+ " --jwt_out=", CredDir, "/lock_cred.jwt"
+ " --cred_out=", KeyDir, "/lock_cred.json"]),
ok;
-generate_cert(backend, KeyDir, CertDir, Config) ->
+generate_cred(backend, KeyDir, CertDir, _Config) ->
UUID = uuid(),
{Start, Stop} = start_stop(),
- cmd([scripts(), "/rvi_create_certificate.py"
+ cmd([scripts(), "/rvi_create_credential.py"
" --id=", UUID,
- " --device_key=", KeyDir, "/dev_pub.pem",
+ " --issuer=GENIVI"
+ " --device_cert=", KeyDir, "/device_cert.crt",
" --start='", Start, "'"
" --stop='", Stop, "'"
- " --root_key=", root_keys(), "/root_priv.pem"
+ " --root_key=", root_keys(), "/root_key.pem"
" --register='jlr.com'"
" --invoke='jlr.com'"
- " --jwt_out=", CertDir, "/backend_cert.jwt"
- " --cert_out=", KeyDir, "/backend_cert.json"]),
+ " --jwt_out=", CertDir, "/backend_cred.jwt"
+ " --cred_out=", KeyDir, "/backend_cred.json"]),
ok.
+subj() ->
+ "/C=US/ST=OR/O=JLR/localityName=Portland/organizationalUnitName=Ostc".
+
+
+
start_stop() ->
DT = erlang:localtime(),
GS = calendar:datetime_to_gregorian_seconds(DT),
@@ -405,7 +459,7 @@ ensure_dir(Dir) ->
Dir.
env() ->
- "RVI_LOGLEVEL=debug RVI_MYIP=127.0.0.1".
+ "RVI_LOGLEVEL=debug RVI_MYIP=127.0.0.1 RVI_BACKEND=127.0.0.1".
root() ->
code:lib_dir(rvi_core).
@@ -422,7 +476,7 @@ root_keys() ->
service_edge("backend") -> "http://localhost:8801";
service_edge("sample" ) -> "http://localhost:9001".
-install_rvi_node(Name, Env, ConfigF) ->
+install_rvi_node(Name, Env, _ConfigF) ->
Root = code:lib_dir(rvi_core),
Scripts = filename:join(Root, "scripts"),
ct:log("Root = ~p", [Root]),
@@ -433,7 +487,7 @@ install_rvi_node(Name, Env, ConfigF) ->
Res = cmd(Cmd),
ct:log("install_rvi_node/1 -> ~p", [Res]),
-
+
Res1 = cmd(lists:flatten(["install -d --mode 0755 ./", Name])),
ct:log("install_rvi_node/2 -> ~p", [Res1]),
@@ -449,16 +503,16 @@ install_sample_node(Name, ConfigF) ->
install_rvi_node(Name, Env,
[root(), "/priv/test_config/", ConfigF]).
-in_priv_dir(F, Cfg) ->
- %% PrivDir = ?config(priv_dir, Cfg),
- %% in_dir(PrivDir, F, Cfg).
- F(Cfg).
+%% in_priv_dir(F, Cfg) ->
+%% %% PrivDir = ?config(priv_dir, Cfg),
+%% %% in_dir(PrivDir, F, Cfg).
+%% F(Cfg).
cmd(C) ->
cmd(C, []).
cmd(C, Opts) ->
- {ok, Res} = cmd_(C, Opts).
+ {ok, _Res} = cmd_(C, Opts).
cmd_(C0, Opts) ->
C = binary_to_list(iolist_to_binary(C0)),
@@ -474,7 +528,7 @@ cmd_(C0, Opts) ->
CmdRes.
cmd_res({_, L}) ->
- {Err,L1} = take(stderr, L, ""),
+ {Err,_L1} = take(stderr, L, ""),
{Out,L2} = take(stdout, L, ""),
{Out, Err, L2}.
@@ -515,8 +569,8 @@ await_started(Name) ->
save_ospid(Node) ->
save({Node,pid}, rpc:call(Node, os, getpid, [])).
-get_ospid(Node) ->
- lookup({Node, pid}).
+%% get_ospid(Node) ->
+%% lookup({Node, pid}).
stop_nodes() ->
Nodes = ets:select(?DATA, [{ {{'$1',pid},'$2'}, [], [{{'$1','$2'}}] }]),