summaryrefslogtreecommitdiff
path: root/components
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-10-28 21:23:05 +0100
committerUlf Wiger <ulf@feuerlabs.com>2015-11-20 13:46:13 -0800
commit1b44c2448344a10ae63904a796b6211c40a3f212 (patch)
treece8f7dda870a5c454ffbe9e2c0bc7035b34f0f4b /components
parent34aa86b5a2e97650fe6299ccf794d5eb5d052d91 (diff)
downloadrvi_core-1b44c2448344a10ae63904a796b6211c40a3f212.tar.gz
Lots of changes to make dlink_bt (simulated) and dlink_tls runtime tests pass
* Introduced high-level logging (rvi_log) * Upgraded to new lager version, customized debug output * Thread rvi_log IDs between nodes and components * Introduce simplified protocol for dlink_tls * Use msgpack encoding for dlink_tls * dlink_bt can use TCP instead of Bluetooth for testing purposes * Bug fixes and additions to the test suite
Diffstat (limited to 'components')
-rw-r--r--components/authorize/src/authorize_keys.erl93
-rw-r--r--components/authorize/src/authorize_rpc.erl204
-rw-r--r--components/authorize/src/authorize_sig.erl11
-rw-r--r--components/dlink/src/dlink_data_msgpack.erl2
-rw-r--r--components/dlink_bt/src/bt_connection.erl103
-rw-r--r--components/dlink_bt/src/bt_listener.erl59
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl269
-rw-r--r--components/dlink_tcp/src/connection.erl18
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl109
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl95
-rw-r--r--components/dlink_tls/src/dlink_tls_listener.erl18
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl254
-rw-r--r--components/rvi_common/include/rvi_common.hrl13
-rw-r--r--components/rvi_common/include/rvi_dlink.hrl1
-rw-r--r--components/rvi_common/include/rvi_dlink_bin.hrl1
-rw-r--r--components/rvi_common/src/rvi_common.app.src5
-rw-r--r--components/rvi_common/src/rvi_common.erl107
-rw-r--r--components/rvi_common/src/rvi_common_app.erl5
-rw-r--r--components/rvi_common/src/rvi_log.erl200
-rw-r--r--components/schedule/src/rvi_routing.erl26
-rw-r--r--components/schedule/src/schedule_rpc.erl2
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl184
-rw-r--r--components/service_edge/src/service_edge_rpc.erl107
23 files changed, 1232 insertions, 654 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index daf7dc2..834935e 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -8,7 +8,7 @@
provisioning_key/0,
signed_public_key/2,
save_keys/2,
- save_cert/3]).
+ save_cert/4]).
-export([get_certificates/0,
get_certificates/1]).
-export([validate_message/2]).
@@ -18,6 +18,10 @@
json_to_public_key/1]).
-export([self_signed_public_key/0]). % just temporary
+-export([pp_key/1,
+ abbrev_bin/1,
+ abbrev_payload/1,
+ abbrev_jwt/1]).
-export([start_link/0,
init/1,
@@ -120,8 +124,8 @@ provisioning_key() ->
save_keys(Keys, Conn) ->
gen_server:call(?MODULE, {save_keys, Keys, Conn}).
-save_cert(Cert, JWT, Conn) ->
- gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn}).
+save_cert(Cert, JWT, Conn, LogId) ->
+ gen_server:call(?MODULE, {save_cert, Cert, JWT, Conn, LogId}).
%% Gen_server functions
@@ -138,7 +142,7 @@ start_link() ->
init([]) ->
ProvisioningKey = get_pub_key(get_env(provisioning_key)),
- ?debug("ProvisioningKey = ~p~n", [ProvisioningKey]),
+ ?debug("ProvisioningKey = ~s~n", [pp_key(ProvisioningKey)]),
CertDir = setup:verify_dir(get_env(cert_dir)),
{ok, AuthJwt0} = file:read_file(get_env(authorize_jwt)),
AuthJwt = strip_nl(AuthJwt0),
@@ -167,17 +171,19 @@ handle_call_({get_certificates, Conn}, _, S) ->
Certs = certs_by_conn(Conn),
{reply, Certs, S};
handle_call_({save_keys, Keys, Conn}, _, S) ->
- ?debug("save_keys: Keys=~p, Conn=~p~n", [Keys, Conn]),
+ ?debug("save_keys: Keys=~p, Conn=~p~n", [abbrev_k(Keys), Conn]),
save_keys_(Keys, Conn),
{reply, ok, S};
handle_call_({validate_message, JWT, Conn}, _, S) ->
{reply, validate_message_(JWT, Conn), S};
-handle_call_({save_cert, Cert, JWT, Conn}, _, S) ->
+handle_call_({save_cert, Cert, JWT, {IP, Port} = Conn, LogId}, _, S) ->
case process_cert_struct(Cert, JWT) of
invalid ->
+ log(LogId, "cert INVALID Conn=~s:~w", [IP, Port]),
{reply, {error, invalid}, S};
#cert{} = C ->
ets:insert(?CERTS, {{Conn, C#cert.id}, C}),
+ log(LogId, "cert stored ~s Conn=~s:~w", [abbrev_bin(C#cert.id), IP, Port]),
{reply, ok, S}
end;
handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
@@ -188,7 +194,7 @@ handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
handle_call_({find_cert_by_service, Service} = R, _From, State) ->
?debug("authorize_keys:handle_call(~p,...)~n", [R]),
Res = find_cert_by_service_(Service),
- ?debug("Res = ~p~n", [Res]),
+ ?debug("Res = ~p~n", [case Res of {ok,{A,B}} -> {ok,{A,abbrev_bin(B)}}; _ -> Res end]),
{reply, Res, State};
handle_call_(_, _, S) ->
{reply, error, S}.
@@ -214,7 +220,7 @@ certs_by_conn(Conn) ->
validity = '$2',
_='_'}},
[], [{{'$1', '$2'}}] }]),
- ?debug("rough selection: ~p~n", [Certs]),
+ ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Certs]]),
[C || {C,V} <- Certs, check_validity(V, UTC)].
filter_by_service_(Services, Conn) ->
@@ -258,8 +264,8 @@ find_cert_by_service_(Service) ->
end, {0, none}, LocalCerts) of
{0, none} ->
{error, not_found};
- {_, Found} ->
- {ok, Found#cert.jwt}
+ {_, #cert{id = Id, jwt = JWT}} ->
+ {ok, {Id, JWT}}
end.
match_length(Invoke, Svc) ->
@@ -394,7 +400,7 @@ process_cert(F, Key, UTC, Acc) ->
{ok, Bin} ->
try authorize_sig:decode_jwt(strip_nl(Bin), Key) of
{_, Cert} ->
- ?info("Unpacked Cert ~p:~n~p~n", [F, Cert]),
+ ?info("Unpacked Cert ~p:~n~p~n", [F, abbrev_payload(Cert)]),
case process_cert_struct(Cert, Bin, UTC) of
invalid ->
Acc;
@@ -492,7 +498,7 @@ save_key(K, Conn) ->
{ok, ID} -> {Conn, ID};
_ -> {Conn, make_ref()}
end,
- ?debug("Saving key ~p, PubKey = ~p~n", [KeyID, PubKey]),
+ ?debug("Saving key ~p, PubKey = ~p~n", [KeyID, pp_key(PubKey)]),
ets:insert(?KEYS, #key{id = KeyID, key = PubKey})
end.
@@ -517,3 +523,66 @@ validate_message_1([{_,K}|T], JWT) ->
end;
validate_message_1([], _) ->
error(invalid).
+
+
+pp_key(#'RSAPrivateKey'{modulus = Mod, publicExponent = Pub}) ->
+ P = integer_to_binary(Pub),
+ M = integer_to_binary(Mod),
+ <<"#{'RSAPrivateKey'{modulus = ", (abbrev_bin(M))/binary,
+ ", publicExponent = ", P/binary, ", _ = ...}">>;
+pp_key(#'RSAPublicKey'{modulus = Mod, publicExponent = Pub}) ->
+ P = integer_to_binary(Pub),
+ M = integer_to_binary(Mod),
+ <<"#{'RSAPublicKey'{modulus = ", (abbrev_bin(M))/binary,
+ ", publicExponent = ", P/binary, ", _ = ...}">>.
+
+abbrev_bin(B) ->
+ abbrev_bin(B, 20, 6).
+
+abbrev_bin(B, Len, Part) ->
+ try case byte_size(B) of
+ Sz when Sz > Len ->
+ Part1 = erlang:binary_part(B, {0,Part}),
+ Part2 = erlang:binary_part(B, {Sz,-Part}),
+ <<Part1/binary, "...", Part2/binary>>;
+ _ ->
+ B
+ end
+ catch error:_ -> B end.
+
+abbrev_payload(Payload) ->
+ try lists:map(fun abbrev_pl/1, Payload)
+ catch error:_ -> Payload end.
+
+abbrev_jwt({Hdr, Body} = X) ->
+ try {Hdr, abbrev_payload(Body)}
+ catch error:_ -> X end.
+
+abbrev_pl({<<"keys">> = K, Ks}) ->
+ {K, [abbrev_k(Ky) || Ky <- Ks]};
+abbrev_pl({<<"certs">> = C, Cs}) ->
+ {C, [abbrev_bin(Cert) || Cert <- Cs]};
+abbrev_pl({<<"certificate">> = K, C}) ->
+ {K, abbrev_bin(C)};
+abbrev_pl({<<"sign">> = K, S}) when is_binary(S) ->
+ {K, abbrev_bin(S)};
+abbrev_pl(B) when is_binary(B) ->
+ abbrev_bin(B, 40, 10);
+abbrev_pl(L) when is_list(L) ->
+ abbrev_payload(L);
+abbrev_pl(X) ->
+ X.
+
+abbrev_k(K) ->
+ try lists:map(fun abbrev_elem/1, K)
+ catch error:_ -> K end.
+
+abbrev_elem({<<"n">>, Bin}) ->
+ {<<"n">>, authorize_keys:abbrev_bin(Bin)};
+abbrev_elem(X) ->
+ X.
+
+log([ID], Fmt, Args) ->
+ rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _) ->
+ ok.
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl
index 9dc82a8..54a9657 100644
--- a/components/authorize/src/authorize_rpc.erl
+++ b/components/authorize/src/authorize_rpc.erl
@@ -22,7 +22,9 @@
get_certificates/1,
sign_message/2,
validate_message/3,
+ validate_authorization/3,
validate_authorization/4,
+ store_certs/3,
authorize_local_message/3,
authorize_remote_message/3]).
-export([filter_by_service/3]).
@@ -49,8 +51,9 @@ start_link() ->
init([]) ->
?debug("authorize_rpc:init(): called."),
- {Priv, Pub} = KeyPair = authorize_keys:get_key_pair(),
- ?debug("KeyPair = ~p~n", [KeyPair]),
+ {Priv, Pub} = authorize_keys:get_key_pair(),
+ ?debug("KeyPair = {~s, ~s}~n", [authorize_keys:pp_key(Priv),
+ authorize_keys:pp_key(Pub)]),
{ok, #st { cs = rvi_common:get_component_specification(),
private_key = Priv,
public_key = Pub} }.
@@ -81,6 +84,14 @@ get_certificates(CompSpec) ->
rvi_common:request(authorize, ?MODULE, get_certificates,
[], [status, certs], CompSpec).
+validate_authorization(CompSpec, JWT, Conn) ->
+ ?debug("authorize_rpc:validate_authorization():"
+ " Conn = ~p~n", [Conn]),
+ rvi_common:request(authorize, ?MODULE, validate_authorization,
+ [{jwt, JWT},
+ {conn, Conn}],
+ [status], CompSpec).
+
validate_authorization(CompSpec, JWT, Certs, Conn) ->
?debug("authorize_rpc:validate_authorization():"
" Conn = ~p~n", [Conn]),
@@ -90,6 +101,12 @@ validate_authorization(CompSpec, JWT, Certs, Conn) ->
{conn, Conn}],
[status], CompSpec).
+store_certs(CompSpec, Certs, Conn) ->
+ rvi_common:request(authorize, ?MODULE, store_certs,
+ [{certs, Certs},
+ {conn, Conn}],
+ [status], CompSpec).
+
authorize_local_message(CompSpec, Service, Params) ->
?debug("authorize_rpc:authorize_local_msg(): params: ~p ~n", [Params]),
rvi_common:request(authorize, ?MODULE, authorize_local_message,
@@ -131,8 +148,9 @@ private_key() ->
%% CAlled by local exo http server
handle_rpc("sign_message", Args) ->
{ok, Message} = rvi_common:get_json_element(["message"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
[ Status, JWT ] =
- gen_server:call(?SERVER, { rvi, sign_message, [Message] }),
+ gen_server:call(?SERVER, { rvi, sign_message, [Message, LogId] }),
?debug("Message signature = ~p~n", [JWT]),
{ok, [ {status, rvi_common:json_rpc_status(Status)},
{jwt, JWT} ]};
@@ -140,32 +158,47 @@ handle_rpc("validate_message", Args) ->
?debug("validate_message; Args = ~p~n", [Args]),
{ok, JWT} = rvi_common:get_json_element(["jwt"], Args),
{ok, Conn} = rvi_common:get_json_element(["conn"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
[ Status, Msg ] =
- gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn] }),
+ gen_server:call(?SERVER, { rvi, validate_message, [JWT, Conn, LogId] }),
{ok, [ {status, rvi_common:json_rpc_status(Status)},
{message, Msg} ]};
-handle_rpc("get_authorize_jwt", []) ->
+handle_rpc("get_authorize_jwt", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, get_authorize_jwt, [] }),
+ gen_server:call(?SERVER, { rvi, get_authorize_jwt, [LogId] }),
{ok, [ rvi_common:json_rpc_status(Status) | Rem ] };
-handle_rpc("get_certificates", []) ->
+handle_rpc("get_certificates", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, get_certificates, [] }),
+ gen_server:call(?SERVER, { rvi, get_certificates, [LogId] }),
{ok, [ rvi_common:json_rpc_status(Status) | Rem ] };
handle_rpc("validate_authorization", Args) ->
{ok, JWT} = rvi_common:get_json_element(["jwt"], Args),
- {ok, Certs} = rvi_common:get_json_element(["certs"], Args),
{ok, Conn} = rvi_common:get_json_element(["connection"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
+ CmdArgs =
+ case rvi_common:get_json_element(["certs"], Args) of
+ {ok, Certs} -> [JWT, Certs, Conn, LogId];
+ {error, _} -> [JWT, Conn, LogId]
+ end,
[ Status | Rem ] =
- gen_server:call(?SERVER, { rvi, validate_authorization,
- [JWT, Certs, Conn] }),
+ gen_server:call(?SERVER, {rvi, validate_authorization, CmdArgs}),
{ok, [ rvi_common:json_rpc_status(Status) | Rem] };
+handle_rpc("store_certs", Args) ->
+ {ok, Certs} = rvi_common:get_json_element(["certs"], Args),
+ {ok, Conn} = rvi_common:get_json_element(["conn"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
+ [ Status | Rem ] =
+ gen_server:call(?SERVER, {rvi, store_certs, [Certs, Conn, LogId]}),
+ {ok, [ rvi_common:json_rpc_status(Status) | Rem]};
handle_rpc("authorize_local_message", Args) ->
{ok, Service} = rvi_common:get_json_element(["service"], Args),
{ok, Params} = rvi_common:get_json_element(["parameters"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
[ Status | Rem ] =
gen_server:call(?SERVER, { rvi, authorize_local_message,
- [Service, Params]}),
+ [Service, Params, LogId]}),
{ ok, [ rvi_common:json_rpc_status(Status) | Rem] };
@@ -173,17 +206,19 @@ handle_rpc("authorize_local_message", Args) ->
handle_rpc("authorize_remote_message", Args) ->
{ok, Service} = rvi_common:get_json_element(["service"], Args),
{ok, Params} = rvi_common:get_json_element(["parameters"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
[ Status ] = gen_server:call(?SERVER, { rvi, authorize_remote_message,
- [Service, Params]}),
+ [Service, Params, LogId]}),
{ ok, rvi_common:json_rpc_status(Status)};
handle_rpc("filter_by_service", Args) ->
?debug("authorize_rpc:handle_rpc(\"filter_by_service\", ~p)~n", [Args]),
{ok, Services} = rvi_common:get_json_element(["services"], Args),
{ok, Conn} = rvi_common:get_json_element(["conn"], Args),
+ LogId = rvi_common:get_json_log_id(Args),
[ Status, FilteredServices ] =
gen_server:call(?SERVER, { rvi, filter_by_service,
- [Services, Conn] }),
+ [Services, Conn, LogId] }),
{ok, [{status, rvi_common:json_rpc_status(Status)},
{services, FilteredServices}]};
@@ -199,31 +234,40 @@ handle_notification(Other, _Args) ->
%%
%% Genserver implementation
%%
-handle_call({rvi, sign_message, [Msg]}, _, #st{private_key = Key} = State) ->
- {reply, [ ok, authorize_sig:encode_jwt(Msg, Key) ], State};
-handle_call({rvi, validate_message, [JWT, Conn]}, _, State) ->
- try {reply, [ok, authorize_keys:validate_message(JWT, Conn)], State}
+handle_call({rvi, sign_message, [Msg | LogId]}, _, #st{private_key = Key} = State) ->
+ Sign = authorize_sig:encode_jwt(Msg, Key),
+ log(LogId, "signed", []),
+ {reply, [ ok, Sign ], State};
+handle_call({rvi, validate_message, [JWT, Conn | LogId]}, _, State) ->
+ try begin Res = authorize_keys:validate_message(JWT, Conn),
+ log(LogId, "validated", []),
+ {reply, [ok, Res], State}
+ end
catch
error:_Err ->
- {reply, [not_found], State}
+ log(LogId, "validation FAILED", []),
+ {reply, [not_found], State}
end;
-handle_call({rvi, get_authorize_jwt, []}, _From, State) ->
+handle_call({rvi, get_authorize_jwt, [_LogId]}, _From, State) ->
{reply, [ ok, authorize_keys:authorize_jwt() ], State};
-handle_call({rvi, get_certificates, []}, _From, State) ->
+handle_call({rvi, get_certificates, [_LogId]}, _From, State) ->
{reply, [ ok, authorize_keys:get_certificates() ], State};
-handle_call({rvi, validate_authorization, [JWT, Certs, Conn] }, _From, State) ->
+handle_call({rvi, validate_authorization, [JWT, Conn | [_] = LogId]}, _From, State) ->
%% The authorize JWT contains the public key used to sign the cert
?debug(
"authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n",
[]),
try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of
{_Header, Keys} ->
- store_certs(Certs, Keys, JWT, Conn),
+ log(LogId, "auth jwt validated", []),
+ KeyStructs = get_json_element(["keys"], Keys, []),
+ authorize_keys:save_keys(KeyStructs, Conn),
{reply, [ok], State};
invalid ->
?warning("Invalid auth JWT from ~p~n", [Conn]),
+ log(LogId, "auth jwt INVALID", []),
{reply, [not_found], State}
catch
error:_Err ->
@@ -231,20 +275,49 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn] }, _From, State) ->
{reply, [not_found], State}
end;
-handle_call({rvi, authorize_local_message, [Service, Params] } = R, _From,
+handle_call({rvi, validate_authorization, [JWT, Certs, Conn | [_] = LogId] }, _From, State) ->
+ %% The authorize JWT contains the public key used to sign the cert
+ ?debug(
+ "authorize_rpc:handle_call({rvi, validate_authorization, [_,_,_]})~n",
+ []),
+ try authorize_sig:decode_jwt(JWT, authorize_keys:provisioning_key()) of
+ {_Header, Keys} ->
+ log(LogId, "auth jwt validated", []),
+ KeyStructs = get_json_element(["keys"], Keys, []),
+ ?debug("KeyStructs = ~p~n", [KeyStructs]),
+ authorize_keys:save_keys(KeyStructs, Conn),
+ do_store_certs(Certs, Conn, LogId),
+ {reply, [ok], State};
+ invalid ->
+ ?warning("Invalid auth JWT from ~p~n", [Conn]),
+ log(LogId, "auth jwt INVALID", []),
+ {reply, [not_found], State}
+ catch
+ error:_Err ->
+ ?warning("Auth validation exception: ~p~n", [_Err]),
+ {reply, [not_found], State}
+ end;
+
+handle_call({store_certs, [Certs, Conn | LogId]}, _From, State) ->
+ do_store_certs(Certs, Conn, LogId),
+ {reply, [ok], State};
+handle_call({rvi, authorize_local_message, [Service, Params | LogId] } = R, _From,
#st{private_key = Key} = State) ->
?debug("authorize_rpc:handle_call(~p)~n", [R]),
case authorize_keys:find_cert_by_service(Service) of
- {ok, Cert} ->
+ {ok, {ID, Cert}} ->
Msg = Params ++ [{<<"certificate">>, Cert}],
- ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", [Msg]),
+ ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n",
+ [authorize_keys:abbrev_payload(Msg)]),
Sig = authorize_sig:encode_jwt(Msg, Key),
+ log(LogId, "auth msg: Cert=~s", [authorize_keys:abbrev_bin(ID)]),
{reply, [ok, Sig], State};
_ ->
+ log(LogId, "NO CERTS for ~s", [Service]),
{reply, [ not_found ], State}
end;
-handle_call({rvi, authorize_remote_message, [_Service, Params]},
+handle_call({rvi, authorize_remote_message, [_Service, Params | LogId]},
_From, State) ->
IP = proplists:get_value(remote_ip, Params),
Port = proplists:get_value(remote_port, Params),
@@ -261,27 +334,22 @@ handle_call({rvi, authorize_remote_message, [_Service, Params]},
case authorize_keys:validate_message(
iolist_to_binary(Signature), {IP, Port}) of
invalid ->
+ log(LogId, "signature INVALID", []),
{reply, [ not_found ], State};
Msg ->
- {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg),
- {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg),
- {ok, Params1} = rvi_common:get_json_element(["parameters"], Msg),
- ?debug("authorize_rpc:authorize_remote_message(): timeout1: ~p~n", [Timeout1]),
- ?debug("authorize_rpc:authorize_remote_message(): service_name1: ~p~n", [SvcName1]),
- ?debug("authorize_rpc:authorize_remote_message(): parameters1: ~p~n", [Params1]),
-
- if Timeout =:= Timeout1,
- SvcName =:= SvcName1,
- Parameters =:= Params1 ->
- ?debug("Remote message authorized.~n", []),
- {reply, [ ok ], State};
- true ->
- ?debug("Remote message NOT authorized.~n", []),
- {reply, [ not_found ], State}
+ case check_msg([{"timeout", Timeout},
+ {"service_name", SvcName},
+ {"parameters", Parameters}], Msg) of
+ ok ->
+ log(LogId, "params verified", []),
+ {reply, [ok], State};
+ {error, {mismatch, Bad}} ->
+ log(LogId, "params MISMATCH: ~p", [Bad]),
+ {reply, [not_found], State}
end
end;
-handle_call({rvi, filter_by_service, [Services, Conn]}, _From, State) ->
+handle_call({rvi, filter_by_service, [Services, Conn | _LogId]}, _From, State) ->
Filtered = authorize_keys:filter_by_service(Services, Conn),
{reply, [ok, Filtered], State};
@@ -313,13 +381,10 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-store_certs(Certs, Keys, JWT, Conn) ->
+do_store_certs(Certs, Conn, LogId) ->
?debug("Storing ~p certs for conn ~p~n", [length(Certs), Conn]),
- KeyStructs = get_json_element(["keys"], Keys, []),
- authorize_keys:save_keys(KeyStructs, Conn),
- ?debug("KeyStructs = ~p~n", [KeyStructs]),
lists:foreach(fun(Cert) ->
- store_cert(Cert, KeyStructs, JWT, Conn)
+ store_cert(Cert, Conn, LogId)
end, Certs).
get_json_element(Path, JSON, Default) ->
@@ -330,11 +395,10 @@ get_json_element(Path, JSON, Default) ->
Default
end.
-store_cert(Cert, Keys, JWT, Conn) ->
+store_cert(Cert, Conn, LogId) ->
case authorize_sig:decode_jwt(Cert, authorize_keys:provisioning_key()) of
{_CHeader, CertStruct} ->
- authorize_keys:save_keys(Keys, Conn),
- case authorize_keys:save_cert(CertStruct, JWT, Conn) of
+ case authorize_keys:save_cert(CertStruct, Cert, Conn, LogId) of
ok ->
ok;
{error, Reason} ->
@@ -347,3 +411,41 @@ store_cert(Cert, Keys, JWT, Conn) ->
?warning("Invalid certificate from ~p~n", [Conn]),
ok
end.
+
+log([ID], Fmt, Args) ->
+ rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _) ->
+ ok.
+
+check_msg(Checks, Params) ->
+ check_msg(Checks, Params, []).
+
+ %% {ok, Timeout1} = rvi_common:get_json_element(["timeout"], Msg),
+ %% {ok, SvcName1} = rvi_common:get_json_element(["service_name"], Msg),
+ %% {ok, Params1} = rvi_common:get_json_element(["parameters"], Msg),
+ %% ?debug("authorize_rpc:authorize_remote_message(): timeout1: ~p~n", [Timeout1]),
+ %% ?debug("authorize_rpc:authorize_remote_message(): service_name1: ~p~n", [SvcName1]),
+ %% ?debug("authorize_rpc:authorize_remote_message(): parameters1: ~p~n", [Params1]),
+
+ %% if Timeout =:= Timeout1 * 1000,
+ %% SvcName =:= SvcName1,
+ %% Parameters =:= Params1 ->
+ %% ?debug("Remote message authorized.~n", []),
+ %% {reply, [ ok ], State};
+ %% true ->
+ %% ?debug("Remote message NOT authorized.~n", []),
+ %% {reply, [ not_found ], State}
+ %% end
+ %% end;
+
+check_msg([], _, []) ->
+ ok;
+check_msg([{Key, Expect}|T], Msg, Acc) ->
+ case rvi_common:get_json_element([Key], Msg) of
+ {ok, Expect} ->
+ check_msg(T, Msg, Acc);
+ _ ->
+ check_msg(T, Msg, [Key|Acc])
+ end;
+check_msg([], _, [_|_] = Acc) ->
+ {error, {mismatch, lists:reverse(Acc)}}.
diff --git a/components/authorize/src/authorize_sig.erl b/components/authorize/src/authorize_sig.erl
index 9868218..c69bbd0 100644
--- a/components/authorize/src/authorize_sig.erl
+++ b/components/authorize/src/authorize_sig.erl
@@ -12,11 +12,11 @@ decode_jwt(JWT, PubKey) when is_list(JWT)->
decode_jwt(list_to_binary(JWT), PubKey);
decode_jwt(JWT, PubKey) when is_binary(JWT)->
- ?debug("authorize_sig:decode_jwt(JWT, PubKey=~p)~n", [PubKey]),
+ ?debug("authorize_sig:decode_jwt(JWT, PubKey=~s)~n", [authorize_keys:pp_key(PubKey)]),
[H, P, S] = binary:split(JWT, <<".">>, [global]),
Header = decode_json(base64url:decode(H)),
Payload = decode_json(base64url:decode(P)),
- ?debug("JWT Header = ~p~nPayload: ~p~n", [Header, Payload]),
+ ?debug("JWT Header = ~p~nPayload: ~p~n", [Header, authorize_keys:abbrev_payload(Payload)]),
Signature = base64url:decode(S),
SigningInput = <<H/binary, ".", P/binary>>,
Res = case public_key:verify(
@@ -27,15 +27,14 @@ decode_jwt(JWT, PubKey) when is_binary(JWT)->
true ->
{Header, Payload}
end,
- ?debug("decoded JWT = ~p~n", [Res]),
+ ?debug("decoded JWT = ~p~n", [authorize_keys:abbrev_jwt(Res)]),
Res.
encode_jwt(JSON, PrivKey) ->
encode_jwt(JSON, header(), PrivKey).
encode_jwt(Payload0, Header0, PrivKey) ->
- ?debug("encode_jwt(~p,~p,_)~n", [catch ensure_json(Payload0),
- catch ensure_json(Header0)]),
+ ?debug("encode_jwt()", []),
Header = base64url:encode(ensure_json(Header0)),
Payload = base64url:encode(ensure_json(Payload0)),
SigningInput = <<Header/binary, ".", Payload/binary>>,
@@ -56,7 +55,7 @@ ensure_json([_|_] = JSON) ->
%% Since there may be atoms
{ok, Normalized} = msgpack:unpack(msgpack:pack(JSON, [jsx,
{allow_atom,pack}]), [jsx]),
- ?debug("Normalized = ~p~n", [Normalized]),
+ ?debug("Normalized = ~p~n", [authorize_keys:abbrev_payload(Normalized)]),
jsx:encode(Normalized).
decode_json(JSON) when is_list(JSON) ->
diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl
index 69576b6..9510a53 100644
--- a/components/dlink/src/dlink_data_msgpack.erl
+++ b/components/dlink/src/dlink_data_msgpack.erl
@@ -10,7 +10,7 @@
port_options() ->
[binary, {packet, 0}].
-init(_Opts) ->
+init(_CS) ->
#st{}.
decode(Msg0, #st{buf = Prev, opts = Opts} = St) ->
diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl
index a5fc3c5..85a8663 100644
--- a/components/dlink_bt/src/bt_connection.erl
+++ b/components/dlink_bt/src/bt_connection.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -25,8 +25,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([connect/5]).
--export([accept/5]).
+-export([connect/6]).
+-export([accept/6]).
-export([send/2]).
-export([send/3]).
-export([is_connection_up/1]).
@@ -35,13 +35,14 @@
-export([terminate_connection/2]).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(st, {
remote_addr = "00:00:00:00:00:00",
channel = 0,
rfcomm_ref = undefined,
listen_ref = undefined,
+ mode = bt,
mod = undefined,
func = undefined,
args = undefined
@@ -52,29 +53,30 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-connect(Addr, Channel, Mod, Fun, Arg) ->
- gen_server:start_link(?MODULE,
- {connect, Addr, Channel, Mod, Fun, Arg },
+connect(Addr, Channel, Mode, Mod, Fun, Arg) ->
+ gen_server:start_link(?MODULE,
+ {connect, Addr, Channel, Mode, Mod, Fun, Arg },
[]).
-accept(Channel, ListenRef, Mod, Fun, Arg) ->
- gen_server:start_link(?MODULE, {accept,
- Channel,
- ListenRef,
+accept(Channel, ListenRef, Mode, Mod, Fun, Arg) ->
+ gen_server:start_link(?MODULE, {accept,
+ Channel,
+ ListenRef,
+ Mode,
Mod,
- Fun,
+ Fun,
Arg},[]).
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
-
+
send(Addr, Channel, Data) ->
case bt_connection_manager:find_connection_by_address(Addr, Channel) of
{ok, Pid} ->
gen_server:cast(Pid, {send, Data});
- _Err ->
- ?info("connection:send(): Connection ~p:~p not found for data: ~p",
+ _Err ->
+ ?info("connection:send(): Connection ~p:~p not found for data: ~p",
[ Addr, Channel, Data]),
not_found
@@ -82,7 +84,7 @@ send(Addr, Channel, Data) ->
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
-
+
terminate_connection(Addr, Channel) ->
case bt_connection_manager:find_connection_by_address(Addr, Channel) of
{ok, Pid} ->
@@ -100,10 +102,10 @@ is_connection_up(Addr, Channel) ->
{ok, Pid} ->
is_connection_up(Pid);
- _Err ->
+ _Err ->
false
end.
-
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -122,8 +124,8 @@ is_connection_up(Addr, Channel) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({connect, BTAddr, Channel, Mod, Fun, Arg}) ->
-
+init({connect, BTAddr, Channel, Mode, Mod, Fun, Arg}) ->
+
%% connect will block on rfcomm:open, so cast to self
%% in order to let init return.
gen_server:cast(self(), connect),
@@ -131,6 +133,7 @@ init({connect, BTAddr, Channel, Mod, Fun, Arg}) ->
remote_addr = BTAddr,
channel = Channel,
rfcomm_ref = undefined,
+ mode = Mode,
mod = Mod,
func = Fun,
args = Arg
@@ -138,8 +141,15 @@ init({connect, BTAddr, Channel, Mod, Fun, Arg}) ->
-init({accept, Channel, ListenRef, Mod, Fun, Arg}) ->
- { ok, ARef } = rfcomm:accept(ListenRef, infinity, self()),
+init({accept, Channel, ListenRef, Mode, Mod, Fun, Arg}) ->
+ ARef = case Mode of
+ tcp ->
+ {ok, R} = exo_socket:async_accept(ListenRef),
+ R;
+ bt ->
+ {ok, R} = rfcomm:accept(ListenRef, infinity, self()),
+ R
+ end,
?debug("bt_connection:init(accept): self(): ~p", [self()]),
?debug("bt_connection:init(accept): Channel: ~p", [Channel]),
?debug("bt_connection:init(accept): ListenRef: ~p", [ListenRef]),
@@ -152,6 +162,7 @@ init({accept, Channel, ListenRef, Mod, Fun, Arg}) ->
channel = Channel,
rfcomm_ref = ARef,
listen_ref = ListenRef,
+ mode = Mode,
mod = Mod,
func = Fun,
args = Arg
@@ -173,7 +184,7 @@ init({accept, Channel, ListenRef, Mod, Fun, Arg}) ->
%% @end
%%--------------------------------------------------------------------
handle_call(terminate_connection, _From, St) ->
- ?debug("~p:handle_call(terminate_connection): Terminating: ~p",
+ ?debug("~p:handle_call(terminate_connection): Terminating: ~p",
[ ?MODULE, {St#st.remote_addr, St#st.channel}]),
{stop, Reason, NSt} = handle_info({tcp_closed, St#st.rfcomm_ref}, St),
@@ -197,13 +208,20 @@ handle_call(_Request, _From, State) ->
handle_cast(connect, #st {
remote_addr = BTAddr,
channel = Channel,
+ mode = Mode,
mod = Mod,
func = Fun,
args = Arg
} = St) ->
%% Looong call that blocks for ever.
- case rfcomm:open(BTAddr, Channel) of
+ ConnRes = case Mode of
+ bt ->
+ rfcomm:open(BTAddr, Channel);
+ tcp ->
+ exo_socket:connect(BTAddr, Channel)
+ end,
+ case ConnRes of
{ok, ConnRef} ->
?debug("bt_connection:init(connect): self(): ~p", [self()]),
?debug("bt_connection:init(connect): BTAddr: ~p", [BTAddr]),
@@ -229,7 +247,7 @@ handle_cast(connect, #st {
end;
handle_cast({send, Data}, St) ->
- ?debug("~p:handle_call(send): Sending: ~p",
+ ?debug("~p:handle_call(send): Sending: ~p",
[ ?MODULE, Data]),
rfcomm:send(St#st.rfcomm_ref, Data),
@@ -253,21 +271,21 @@ handle_cast(_Msg, State) ->
%% An accept reference we've setup now has accetpted an
%% incoming connection.
-handle_info({rfcomm, _ARef, { accept, BTAddr, _ } },
+handle_info({rfcomm, _ARef, { accept, BTAddr, _ } },
#st { mod = Mod,
func = Fun,
args = Arg,
channel = Channel } = St) ->
- ?info("~p:handle_info(): bt_connection from ~w:~w\n",
+ ?info("~p:handle_info(): bt_connection from ~w:~w\n",
[?MODULE, BTAddr,Channel]),
-
+
Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
- { noreply, St#st { remote_addr = BTAddr,
+ { noreply, St#st { remote_addr = BTAddr,
channel = Channel } };
-handle_info({rfcomm, _ConnRef, {data, Data}},
+handle_info({rfcomm, _ConnRef, {data, Data}},
#st { remote_addr = BTAddr,
channel = Channel,
mod = Mod,
@@ -275,16 +293,16 @@ handle_info({rfcomm, _ConnRef, {data, Data}},
args = Arg } = State) ->
?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]),
- ?info("~p:handle_info(data): ~p:~p -> ~p:~p",
+ ?info("~p:handle_info(data): ~p:~p -> ~p:~p",
[ ?MODULE, BTAddr, Channel, Mod, Fun]),
Self = self(),
- spawn(fun() -> Mod:Fun(Self, BTAddr, Channel,
+ spawn(fun() -> Mod:Fun(Self, BTAddr, Channel,
data, Data, Arg) end),
{noreply, State};
-handle_info({rfcomm, ConnRef, closed},
+handle_info({rfcomm, ConnRef, closed},
#st { remote_addr = BTAddr,
channel = Channel,
listen_ref = ListenRef,
@@ -295,20 +313,20 @@ handle_info({rfcomm, ConnRef, closed},
Mod:Fun(self(), BTAddr, Channel, closed, Arg),
bt_connection_manager:delete_connection_by_pid(self()),
rfcomm:close(ConnRef),
-
+
%% Fire up a new accept process to take care of the next incomign connectionX
- gen_server:start_link(?MODULE, {accept,
- Channel,
- ListenRef,
+ gen_server:start_link(?MODULE, {accept,
+ Channel,
+ ListenRef,
Mod,
- Fun,
+ Fun,
Arg},[]),
%% Stop this process.
{stop, normal, State};
-handle_info({rfcomm, ConnRef, error},
+handle_info({rfcomm, ConnRef, error},
#st { remote_addr = BTAddr,
channel = Channel,
mod = Mod,
@@ -321,6 +339,13 @@ handle_info({rfcomm, ConnRef, error},
bt_connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
+handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod,
+ func = Fun,
+ args = Arg} = St) ->
+ ?debug("~p:handle_info(~p)", [?MODULE, Msg]),
+ {ok, {BTAddr, Channel}} = inet:peername(Sock),
+ Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
+ {noreply, St};
handle_info(_Info, State) ->
?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]),
diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl
index b0c08ab..e6129de 100644
--- a/components/dlink_bt/src/bt_listener.erl
+++ b/components/dlink_bt/src/bt_listener.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -12,7 +12,7 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--export([start_link/0,
+-export([start_link/1,
add_listener/1,
remove_listener/1]).
@@ -25,8 +25,8 @@
}).
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(Mode) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, Mode, []).
add_listener(Channel) ->
gen_server:call(?MODULE, {add_listener, Channel}).
@@ -34,35 +34,37 @@ add_listener(Channel) ->
remove_listener(Channel) ->
gen_server:call(?MODULE, {remove_listener, Channel}).
-init([]) ->
+init(Mode) ->
- {ok, #st {
- listeners = [],
+ {ok, #st {
+ listeners = [],
acceptors = [],
- cs = rvi_common:get_component_specification()
+ cs = rvi_common:set_value(bt_mode, Mode, rvi_common:get_component_specification())
}
}.
-handle_call({add_listener, Channel}, _From, St) ->
+handle_call({add_listener, Channel}, _From, #st{cs = CS} = St) ->
?info("bt_listener:add_listener(): Setting up listener on channel ~p", [ Channel]),
- case rfcomm:listen(Channel) of
+ Mode = rvi_common:get_value(bt_mode, bt, CS),
+ case listen(Mode, Channel) of
{ok, ListenRef} ->
?info("bt_listener:add_listener(): ListenRef: ~p", [ ListenRef]),
- {ok, ConnPid} = bt_connection:accept(Channel,
- ListenRef,
- dlink_bt_rpc,
- handle_socket,
- St#st.cs),
+ {ok, ConnPid} = bt_connection:accept(Channel,
+ ListenRef,
+ Mode,
+ dlink_bt_rpc,
+ handle_socket,
+ CS),
+
-
%%{ noreply, NSt} = handle_info({accept, ListenRef, Channel, ok}, St),
- { reply,
- ok,
- St#st {
+ { reply,
+ ok,
+ St#st {
acceptors = [ { Channel, ConnPid } | St#st.acceptors ],
listeners = [ { ListenRef, Channel } | St#st.listeners ]
}
@@ -89,25 +91,28 @@ handle_info({accept, ListenRef, BTAddr, Channel, ok} , St) ->
%% future incoming connection.
?info("bt_listener:accept(): ListenRef: ~p", [ ListenRef]),
?info("bt_listener:accept(): Remote: ~p-~p", [BTAddr, Channel ]),
-
+
%% Must fix multiple acceptors in bt_linux_drv.c
- %% {ok, ConnPid} = bt_connection:accept(Channel,
- %% ListenRef,
- %% dlink_bt_rpc,
- %% handle_socket,
+ %% {ok, ConnPid} = bt_connection:accept(Channel,
+ %% ListenRef,
+ %% dlink_bt_rpc,
+ %% handle_socket,
%% []),
-
+
%% {noreply, St#st {acceptors = [ { Channel, ConnPid } | St#st.acceptors ]}};
{noreply, St };
handle_info(_Msg, State) ->
?info("bt_listener:handle_info(): Unknown: ~p", [ _Msg]),
-
+
{noreply, State}.
terminate(_Reason, _State) ->
ok.
-
+listen(bt, Channel) ->
+ rfcomm:listen(Channel);
+listen(tcp, Port) ->
+ gen_tcp:listen(Port).
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index c36c997..168d13e 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -40,7 +40,7 @@
-define(DEFAULT_BT_CHANNEL, 1).
-define(DEFAULT_RECONNECT_INTERVAL, 1000).
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-define(CONNECTION_TABLE, rvi_dlink_bt_connections).
-define(SERVICE_TABLE, rvi_dlink_bt_services).
@@ -58,7 +58,7 @@
services = [] %% List of service names available through this connection
}).
--record(st, {
+-record(st, {
cs = #component_spec{}
}).
@@ -71,30 +71,30 @@ tohex(V) when V < 16 ->
tohex(V) ->
integer_to_list(V, 16).
-
+
bt_address_to_string({A1, A2, A3, A4, A5, A6}) ->
tohex(A1) ++ ":" ++
tohex(A2) ++ ":" ++
tohex(A3) ++ ":" ++
- tohex(A4) ++ ":" ++
+ tohex(A4) ++ ":" ++
tohex(A5) ++ ":" ++
tohex(A6).
-
+
init([]) ->
?info("dlink_bt:init(): Called"),
%% Dig out the bert rpc server setup
- ets:new(?SERVICE_TABLE, [ set, public, named_table,
+ ets:new(?SERVICE_TABLE, [ set, public, named_table,
{ keypos, #service_entry.service }]),
- ets:new(?CONNECTION_TABLE, [ set, public, named_table,
+ ets:new(?CONNECTION_TABLE, [ set, public, named_table,
{ keypos, #connection_entry.connection }]),
CS = rvi_common:get_component_specification(),
service_discovery_rpc:subscribe(CS, ?MODULE),
- {ok, #st {
+ {ok, #st {
cs = CS
}
}.
@@ -105,44 +105,59 @@ start_json_server() ->
start_connection_manager() ->
CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts,
- [],
+ {ok, BertOpts } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ server_opts,
+ [],
CompSpec),
%% Retrieve the channel we should use
Channel = proplists:get_value(channel, BertOpts, ?DEFAULT_BT_CHANNEL),
-
+
?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]),
%% Fire up listener
-
- bt:start(),
- bt:debug(debug),
- bt_listener:start_link(),
- bt_connection_manager:start_link(),
+
+ Mode = get_mode(BertOpts),
+ case Mode of
+ bt ->
+ bt:start(),
+ bt:debug(debug);
+ tcp ->
+ ok
+ end,
+ bt_listener:start_link(Mode),
+ bt_connection_manager:start_link(Mode),
?info("dlink_bt:start_connection_manager(): Adding listener on bluetooth channel ~p", [Channel ]),
-
+
%% Add listener channel.
case bt_listener:add_listener(Channel) of
ok ->
ok;
- Err ->
+ Err ->
?error("dlink_bt:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
ok
end,
- {ok, PersistentConnections } = rvi_common:get_module_config(data_link,
- ?MODULE,
- ?PERSISTENT_CONNECTIONS,
- [],
+ {ok, PersistentConnections } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ ?PERSISTENT_CONNECTIONS,
+ [],
CompSpec),
setup_persistent_connections_(PersistentConnections, CompSpec),
ok.
+get_mode(BertOpts) ->
+ case proplists:get_value(test_mode, BertOpts) of
+ TM when TM==undefined; TM==bt ->
+ bt;
+ tcp ->
+ tcp
+ end.
+
+
setup_persistent_connections_([ ], _CompSpec) ->
ok;
@@ -150,21 +165,21 @@ setup_persistent_connections_([ ], _CompSpec) ->
setup_persistent_connections_([ BTAddress | T], CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]),
[ BTAddr, Channel] = string:tokens(BTAddress, "-"),
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
+ connect_and_retry_remote(BTAddr, Channel, CompSpec),
setup_persistent_connections_(T, CompSpec),
ok.
service_available(CompSpec, SvcName, DataLinkModule) ->
- rvi_common:notification(data_link, ?MODULE,
- service_available,
+ rvi_common:notification(data_link, ?MODULE,
+ service_available,
[{ service, SvcName },
{ data_link_module, DataLinkModule }],
CompSpec).
service_unavailable(CompSpec, SvcName, DataLinkModule) ->
- rvi_common:notification(data_link, ?MODULE,
- service_unavailable,
+ rvi_common:notification(data_link, ?MODULE,
+ service_unavailable,
[{ service, SvcName },
{ data_link_module, DataLinkModule }],
CompSpec).
@@ -184,11 +199,11 @@ disconnect_data_link(CompSpec, NetworkAddress) ->
send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) ->
rvi_common:request(data_link, ?MODULE, send_data,
- [ { proto_mod, ProtoMod },
- { service, Service },
+ [ { proto_mod, ProtoMod },
+ { service, Service },
{ data, Data },
{ opts, DataLinkOpts }
- ],
+ ],
[status], CompSpec).
@@ -209,33 +224,33 @@ connect_remote(BTAddr, Channel, CompSpec) ->
%%FIXME
%% Setup a genserver around the new connection.
- case bt_connection:connect(BTAddr, Channel,
+ case bt_connection:connect(BTAddr, Channel,
?MODULE, handle_socket, CompSpec ) of
- { ok, Pid } ->
- ?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p",
+ { ok, Pid } ->
+ ?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p",
[BTAddr, Channel, Pid]),
ok;
-
- {error, Err } ->
+
+ {error, Err } ->
?info("dlink_bt:connect_remote(): Failed ~p:~p: ~p",
[BTAddr, Channel, Err]),
not_available
end
end.
-
+
connect_and_retry_remote( BTAddr, Channel, CompSpec) ->
- ?info("dlink_bt:connect_and_retry_remote(): ~p:~p",
+ ?info("dlink_bt:connect_and_retry_remote(): ~p:~p",
[ BTAddr, Channel]),
case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of
ok -> ok;
Err -> %% Failed to connect. Sleep and try again
- ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p",
+ ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p",
[BTAddr, Channel, Err]),
- ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
+ ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
[BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec),
@@ -248,14 +263,14 @@ connect_and_retry_remote( BTAddr, Channel, CompSpec) ->
announce_local_service_(_CompSpec, [], _Service, _Availability) ->
ok;
-announce_local_service_(CompSpec,
+announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
[ ok, JWT ] = authorize_rpc:sign_message(
CompSpec, availability_msg(Availability, [Service])),
- Res = bt_connection:send(ConnPid,
+ Res = bt_connection:send(ConnPid,
term_to_json(
- { struct,
+ { struct,
[
{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
{ ?DLINK_ARG_TRANSACTION_ID, 3},
@@ -263,19 +278,19 @@ announce_local_service_(CompSpec,
]
})),
- ?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p",
+ ?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
%% Move on to next connection.
- announce_local_service_(CompSpec,
+ announce_local_service_(CompSpec,
T,
Service, Availability).
announce_local_service_(CompSpec, Service, Availability) ->
- ?debug("dlink_bt:announce_local_service(~p, ~p)",
+ ?debug("dlink_bt:announce_local_service(~p, ~p)",
[ Service, Availability]),
- announce_local_service_(CompSpec,
+ announce_local_service_(CompSpec,
get_connections(),
Service, Availability).
@@ -294,7 +309,7 @@ availability_msg(Availability, Services) ->
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-
+
process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
{ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg),
@@ -312,13 +327,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
end.
process_authorize(FromPid,
- PeerBTAddr,
+ PeerBTAddr,
PeerBTChannel,
- TransactionID,
- RemoteAddress,
- RemoteChannel,
- Protocol,
- Certificates,
+ TransactionID,
+ RemoteAddress,
+ RemoteChannel,
+ Protocol,
+ Certificates,
Signature,
CompSpec) ->
@@ -333,7 +348,7 @@ process_authorize(FromPid,
%% with the conneciton manager, this is an incoming connection
%% from the client. We should respond with our own authorize followed by
%% a service announce
-
+
Conn = {RemoteAddress, RemoteChannel},
case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of
true ->
@@ -343,7 +358,7 @@ process_authorize(FromPid,
false
end.
-handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
+handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
Payload, CompSpec) ->
{ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)),
@@ -351,34 +366,34 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
- RemoteChannel,
+ [ TransactionID,
+ RemoteAddress,
+ RemoteChannel,
RVIProtocol,
CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
- ?DLINK_ARG_PORT,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ Signature ] =
+ opts([?DLINK_ARG_TRANSACTION_ID,
+ ?DLINK_ARG_ADDRESS,
+ ?DLINK_ARG_PORT,
+ ?DLINK_ARG_VERSION,
+ ?DLINK_ARG_CERTIFICATES,
+ ?DLINK_ARG_SIGNATURE],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
+
+ Certificates =
+ case CertificatesTmp of
{ array, C} -> C;
undefined -> []
end,
process_authorize(FromPid, PeerBTAddr, RemoteChannel,
- TransactionID, RemoteAddress, RemoteChannel,
+ TransactionID, RemoteAddress, RemoteChannel,
RVIProtocol, Certificates, Signature, CompSpec);
-
-
+
+
?DLINK_CMD_SERVICE_ANNOUNCE ->
Conn = {PeerBTAddr, PeerChannel},
- [ TransactionID, Signature ] =
+ [ TransactionID, Signature ] =
opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE],
Elems, undefined),
case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
@@ -390,11 +405,11 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
end;
?DLINK_CMD_RECEIVE ->
- [ _TransactionID,
- ProtocolMod,
- Data ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_MODULE,
+ [ _TransactionID,
+ ProtocolMod,
+ Data ] =
+ opts([?DLINK_ARG_TRANSACTION_ID,
+ ?DLINK_ARG_MODULE,
?DLINK_ARG_DATA],
Elems, undefined),
process_data(FromPid, PeerBTAddr, PeerChannel,
@@ -404,7 +419,7 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]),
ok;
- undefined ->
+ undefined ->
?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
ok
end.
@@ -428,17 +443,17 @@ handle_socket(FromPid, SetupBTAddr, SetupChannel, closed, CompSpec) ->
case get_connections_by_service(SvcName) of
[] ->
service_discovery_rpc:
- unregister_services(CompSpec,
- [SvcName],
+ unregister_services(CompSpec,
+ [SvcName],
?MODULE);
_ -> ok
end
end, LostSvcNameList),
- {ok, PersistentConnections } = rvi_common:get_module_config(data_link,
- ?MODULE,
- persistent_connections,
- [],
+ {ok, PersistentConnections } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ persistent_connections,
+ [],
CompSpec),
%% Check if this is a static node. If so, setup a timer for a reconnect
case lists:member(NetworkAddress, PersistentConnections) of
@@ -447,7 +462,7 @@ handle_socket(FromPid, SetupBTAddr, SetupChannel, closed, CompSpec) ->
?info("dlink_bt:closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]),
[ BTAddr, Channel] = string:tokens(NetworkAddress, "-"),
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL,
+ setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL,
BTAddr, Channel, CompSpec);
false -> ok
end,
@@ -473,7 +488,7 @@ handle_notification("service_available", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- gen_server:cast(?SERVER, { rvi, service_available,
+ gen_server:cast(?SERVER, { rvi, service_available,
[ SvcName,
DataLinkModule ]}),
@@ -482,7 +497,7 @@ handle_notification("service_unavailable", Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- gen_server:cast(?SERVER, { rvi, service_unavailable,
+ gen_server:cast(?SERVER, { rvi, service_unavailable,
[ SvcName,
DataLinkModule ]}),
@@ -497,7 +512,7 @@ handle_rpc("setup_data_link", Args) ->
{ ok, Opts } = rvi_common:get_json_element(["opts"], Args),
- [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link,
+ [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link,
[ Service, Opts ] }),
{ok, [ {status, rvi_common:json_rpc_status(Res)} , { timeout, Timeout }]};
@@ -512,11 +527,11 @@ handle_rpc("send_data", Args) ->
{ ok, Service } = rvi_common:get_json_element(["service"], Args),
{ ok, Data } = rvi_common:get_json_element(["data"], Args),
{ ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args),
- [ Res ] = gen_server:call(?SERVER,
- { rvi, send_data,
+ [ Res ] = gen_server:call(?SERVER,
+ { rvi, send_data,
[ProtoMod, Service, Data, DataLinkOpts]}),
{ok, [ {status, rvi_common:json_rpc_status(Res)} ]};
-
+
handle_rpc(Other, _Args) ->
?info("dlink_bt:handle_rpc(~p): unknown", [ Other ]),
@@ -541,7 +556,7 @@ handle_cast( {rvi, service_unavailable, [SvcName, local]}, St) ->
{noreply, St};
handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) ->
- %% We don't care about remote services available through
+ %% We don't care about remote services available through
%% other data link modules
{noreply, St};
@@ -561,7 +576,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
[Service]),
{ reply, [ok, -1 ], St };
- Addr ->
+ Addr ->
[ Address, Channel] = string:tokens(Addr, "-"),
case connect_remote(Address, list_to_integer(Channel), St#st.cs) of
@@ -569,7 +584,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
{ reply, [ok, 2000], St }; %% 2 second timeout
already_connected -> %% We are already connected
- { reply, [already_connected, -1], St };
+ { reply, [already_connected, -1], St };
Err ->
{ reply, [Err, 0], St }
@@ -595,34 +610,34 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
{ reply, [ no_route ], St};
%% FIXME: What to do if we have multiple connections to the same service?
- [ConnPid | _T] ->
+ [ConnPid | _T] ->
?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]),
- Res = bt_connection:send(ConnPid,
+ Res = bt_connection:send(ConnPid,
term_to_json(
- {struct,
+ {struct,
[ { ?DLINK_ARG_TRANSACTION_ID, 1 },
{ ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
{ ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
{ ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
]})),
-
+
{ reply, [ Res ], St}
end;
-
+
handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) ->
%% Create a timer to handle periodic pings.
- {ok, ServerOpts } = rvi_common:get_module_config(data_link,
+ {ok, ServerOpts } = rvi_common:get_module_config(data_link,
?MODULE,
- server_opts, [],
+ server_opts, [],
St#st.cs),
Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
- ?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec",
+ ?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec",
[ Address, Channel, Timeout] ),
-
+
erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Channel, Timeout }),
{reply, ok, St};
@@ -641,12 +656,12 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
true ->
?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]),
bt_connection:send(Pid, term_to_json(
- { struct,
- [ { ?DLINK_ARG_CMD,
- ?DLINK_CMD_PING
+ { struct,
+ [ { ?DLINK_ARG_CMD,
+ ?DLINK_CMD_PING
}]})),
- erlang:send_after(Timeout, self(),
+ erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Channel, Timeout });
false ->
@@ -671,9 +686,9 @@ code_change(_OldVsn, St, _Extra) ->
send_authorize(Pid, SetupChannel, CompSpec) ->
{ok,[{address, Address }]} = bt_drv:local_info([address]),
- bt_connection:send(Pid,
+ bt_connection:send(Pid,
term_to_json(
- {struct,
+ {struct,
[ { ?DLINK_ARG_TRANSACTION_ID, 1 },
{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
{ ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
@@ -697,7 +712,7 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
%% Send our own servide announcement to the remote server
%% that just authorized to us.
[ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local),
-
+
[ ok, FilteredServices ] = authorize_rpc:filter_by_service(
CompSpec, LocalServices, Conn),
@@ -707,9 +722,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
[ ok, JWT ] = authorize_rpc:sign_message(
CompSpec, availability_msg(available, FilteredServices)),
- bt_connection:send(FromPid,
+ bt_connection:send(FromPid,
term_to_json(
- {struct,
+ {struct,
[ { ?DLINK_ARG_TRANSACTION_ID, 1 },
{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
{ ?DLINK_ARG_SIGNATURE, JWT } ]})),
@@ -719,8 +734,8 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
ok.
setup_reconnect_timer(MSec, BTAddr, Channel, CompSpec) ->
- erlang:send_after(MSec, ?MODULE,
- { rvi_setup_persistent_connection,
+ erlang:send_after(MSec, ?MODULE,
+ { rvi_setup_persistent_connection,
BTAddr, Channel, CompSpec }),
ok.
@@ -739,20 +754,20 @@ get_connections_by_service(Service) ->
Connections;
[] -> []
end.
-
+
add_services(SvcNameList, ConnPid) ->
%% Create or replace existing connection table entry
%% with the sum of new and old services.
?debug("dlink_bt:add_services(~p, ~p)", [ ConnPid, SvcNameList]),
- ets:insert(?CONNECTION_TABLE,
+ ets:insert(?CONNECTION_TABLE,
#connection_entry {
connection = ConnPid,
services = SvcNameList ++ get_services_by_connection(ConnPid)
}),
%% Add the connection to the service entry for each service.
- [ ets:insert(?SERVICE_TABLE,
+ [ ets:insert(?SERVICE_TABLE,
#service_entry {
service = SvcName,
connections = [ConnPid | get_connections_by_service(SvcName)]
@@ -765,15 +780,15 @@ add_services(SvcNameList, ConnPid) ->
delete_services(ConnPid, SvcNameList) ->
- ets:insert(?CONNECTION_TABLE,
+ ets:insert(?CONNECTION_TABLE,
#connection_entry {
connection = ConnPid,
services = get_services_by_connection(ConnPid) -- SvcNameList
}),
-
+
%% Loop through all services and update the conn table
%% Update them with a new version where ConnPid has been removed
- [ ets:insert(?SERVICE_TABLE,
+ [ ets:insert(?SERVICE_TABLE,
#service_entry {
service = SvcName,
connections = get_connections_by_service(SvcName) -- [ConnPid]
@@ -785,7 +800,7 @@ delete_connection(Conn) ->
%% with the sum of new and old services.
SvcNameList = get_services_by_connection(Conn),
- %% Replace each existing connection entry that has
+ %% Replace each existing connection entry that has
%% SvcName with a new one where the SvcName is removed.
lists:map(fun(SvcName) ->
Existing = get_connections_by_service(SvcName),
@@ -795,12 +810,12 @@ delete_connection(Conn) ->
connections = Existing -- [ Conn ]
})
end, SvcNameList),
-
+
%% Delete the connection
ets:delete(?CONNECTION_TABLE, Conn),
ok.
-
+
get_connections('$end_of_table', Acc) ->
Acc;
@@ -808,7 +823,7 @@ get_connections('$end_of_table', Acc) ->
get_connections(Key, Acc) ->
get_connections(ets:next(?CONNECTION_TABLE, Key), [ Key | Acc ]).
-
+
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 068a0c5..77300d9 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -196,7 +196,7 @@ handle_cast({activate_socket, Sock}, State) ->
handle_cast(_Msg, State) ->
- ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]),
+ ?warning("handle_cast(): Unknown call: ~p", [_Msg]),
{noreply, State}.
%%--------------------------------------------------------------------
@@ -225,17 +225,17 @@ handle_info({tcp, Sock, Data},
func = Fun,
args = Arg,
pst = PST} = State) ->
- ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
- ?debug("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, IP, Port]),
+ ?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
case jsx_decode_stream(Data, PST) of
{ [], NPST } ->
- ?debug("~p:handle_info(data incomplete)", [ ?MODULE]),
+ ?debug("handle_info(data incomplete)", []),
inet:setopts(Sock, [{active, once}]),
{noreply, State#st { pst = NPST} };
{ JSONElements, NPST } ->
- ?debug("~p:handle_info(data complete): Processed: ~p", [ ?MODULE, JSONElements]),
+ ?debug("data complete: Processed: ~p",
+ [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]),
FromPid = self(),
[Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg)
|| SingleElem <- JSONElements],
@@ -251,7 +251,7 @@ handle_info({tcp_closed, Sock},
mod = Mod,
func = Fun,
args = Arg } = State) ->
- ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]),
+ ?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]),
Mod:Fun(self(), IP, Port,closed, Arg),
gen_tcp:close(Sock),
connection_manager:delete_connection_by_pid(self()),
@@ -265,14 +265,14 @@ handle_info({tcp_error, _Sock},
func = Fun,
args = Arg} = State) ->
- ?debug("~p:handle_info(tcp_error): Address: ~p:~p ", [ ?MODULE, IP, Port]),
+ ?debug("handle_info(tcp_error): Address: ~p:~p ", [IP, Port]),
Mod:Fun(self(), IP, Port, error, Arg),
connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
handle_info(_Info, State) ->
- ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]),
+ ?warning("handle_cast(): Unknown info: ~p", [_Info]),
{noreply, State}.
%%--------------------------------------------------------------------
@@ -287,7 +287,7 @@ handle_info(_Info, State) ->
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
- ?debug("~p:terminate(): Reason: ~p ", [ ?MODULE, _Reason]),
+ ?debug("terminate(): Reason: ~p ", [_Reason]),
ok.
%%--------------------------------------------------------------------
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 2456f3e..b3ec80d 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -92,32 +92,14 @@ start_json_server() ->
start_connection_manager() ->
- CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- ?SERVER_OPTS,
- [],
- CompSpec),
- IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS),
- Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT),
-
- ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]),
-
%% Fire up listener
+ CompSpec = rvi_common:get_component_specification(),
connection_manager:start_link(),
+ ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]),
{ok,Pid} = listener:start_link(),
- ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
+ %%
+ setup_initial_listeners(Pid, CompSpec),
- %% Add listener port.
- case listener:add_listener(Pid, IP, Port, CompSpec) of
- ok ->
- ?notice("---- RVI Node External Address: ~s",
- [ application:get_env(rvi_core, node_address, undefined)]);
-
- Err ->
- ?error("dlink_tcp:init_rvi_component(): Failed to launch listener: ~p", [ Err ]),
- ok
- end,
?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."),
{ok, PersistentConnections } = rvi_common:get_module_config(data_link,
@@ -126,14 +108,38 @@ start_connection_manager() ->
[],
CompSpec),
-
setup_persistent_connections_(PersistentConnections, CompSpec),
ok.
+
+setup_initial_listeners(Pid, CompSpec) ->
+ case rvi_common:get_module_config(data_link,
+ ?MODULE,
+ ?SERVER_OPTS,
+ CompSpec) of
+ {ok, ServerOpts} ->
+ IP = proplists:get_value(ip, ServerOpts, ?DEFAULT_TCP_ADDRESS),
+ Port = proplists:get_value(port, ServerOpts, ?DEFAULT_TCP_PORT),
+ ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
+ %%
+ %% Add listener port.
+ case listener:add_listener(Pid, IP, Port, CompSpec) of
+ ok ->
+ ?notice("---- RVI Node External Address: ~s",
+ [ application:get_env(rvi_core, node_address, undefined)]);
+ Err ->
+ ?warning(
+ "dlink_tcp:init_rvi_component(): Failed to launch listener (~p,~p): ~p",
+ [ IP, Port, Err ]),
+ ok
+ end;
+ {error, _} ->
+ ?info("dlink_tcp: no initial listeners", [])
+ end.
+
setup_persistent_connections_([ ], _CompSpec) ->
ok;
-
setup_persistent_connections_([ NetworkAddress | T], CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]),
[ IP, Port] = string:tokens(NetworkAddress, ":"),
@@ -188,13 +194,14 @@ connect_remote(IP, Port, CompSpec) ->
?info("connect_remote(~p, ~p)~n", [IP, Port]),
case connection_manager:find_connection_by_address(IP, Port) of
{ ok, _Pid } ->
+ log("already connected", [], CompSpec),
already_connected;
not_found ->
%% Setup a new outbound connection
?info("dlink_tcp:connect_remote(): Connecting ~p:~p",
[IP, Port]),
-
+ log("new connection", [], CompSpec),
case gen_tcp:connect(IP, Port, listener:sock_opts()) of
{ ok, Sock } ->
?info("dlink_tcp:connect_remote(): Connected ~p:~p",
@@ -217,12 +224,14 @@ connect_remote(IP, Port, CompSpec) ->
{ ?DLINK_ARG_CERTIFICATES,
get_certificates(CompSpec) },
{ ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
+ | rvi_common:log_id_json_tail(CompSpec)
])),
ok;
{error, Err } ->
?info("dlink_tcp:connect_remote(): Failed ~p:~p: ~p",
[IP, Port, Err]),
+ log("connect FAILED: ~w", [Err], CompSpec),
not_available
end
end.
@@ -231,8 +240,11 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
?info("dlink_tcp:connect_and_retry_remote(): ~p:~p",
[ IP, Port]),
- case connect_remote(IP, list_to_integer(Port), CompSpec) of
- ok -> ok;
+ CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
+ case connect_remote(IP, list_to_integer(Port), CS) of
+ ok ->
+ log("connected", [], CS),
+ ok;
Err -> %% Failed to connect. Sleep and try again
?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Failed: ~p",
@@ -240,8 +252,8 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
?notice("dlink_tcp:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
[IP, Port, ?DEFAULT_RECONNECT_INTERVAL]),
-
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec),
+ log("start reconnect timer", [], CS),
+ setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CS),
not_available
end.
@@ -262,6 +274,7 @@ announce_local_service_(CompSpec,
[ { ?DLINK_ARG_TRANSACTION_ID, 1 },
{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
{ ?DLINK_ARG_SIGNATURE, JWT }
+ | rvi_common:log_id_json_tail(CompSpec)
])),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
@@ -332,14 +345,18 @@ handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
?info("dlink_tcp:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
- ?debug("dlink_tcp:data(): Elems ~p", [Elems]),
+ ?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]),
+
+ CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
+ ?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
[ TransactionID,
RemoteAddress,
RemotePort,
@@ -362,9 +379,10 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
end,
process_authorize(FromPid, PeerIP, PeerPort,
TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CompSpec);
+ ProtoVersion, Signature, Certificates, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
+ ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
[ TransactionID,
ProtoVersion,
Signature ] =
@@ -374,12 +392,14 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
Elems, undefined),
Conn = {PeerIP, PeerPort},
+ log("sa from ~s:~w", [PeerIP, PeerPort], CS),
case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
[ok, Msg] ->
?debug("Service Announce~nMsg = ~p~n", [Msg]),
process_announce(Msg, FromPid, PeerIP, PeerPort,
TransactionID, ProtoVersion, CompSpec);
_ ->
+ log("sa INVALID", [], CS),
?debug("Couldn't validate availability msg from ~p", [Conn])
end;
@@ -684,10 +704,10 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tcp:authorize(): Certificates: ~p", [ Certificates ]),
- ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]),
+ ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]),
+ ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
- { _NRemoteAddress, _NRemotePort} = Conn =
+ {NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -697,6 +717,7 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
+ log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
true ->
connection_authorized(FromPid, Conn, CompSpec);
@@ -715,13 +736,15 @@ send_authorize(Pid, CompSpec) ->
{ ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
{ ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
{ ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ])).
+ { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
+ | rvi_common:log_id_json_tail(CompSpec) ])).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the conneciton manager, this is an incoming connection
%% from the client. We should respond with our own authorize followed by
%% a service announce
+ log("authorized: ~s:~p", [RemoteIP, RemotePort], CompSpec),
case connection_manager:find_connection_by_pid(FromPid) of
not_found ->
?info("dlink_tcp:authorize(): New connection!"),
@@ -746,11 +769,13 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
[ ok, JWT ] = authorize_rpc:sign_message(
CompSpec, availability_msg(available, FilteredServices)),
+ log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
connection:send(FromPid,
rvi_common:term_to_json(
[ { ?DLINK_ARG_TRANSACTION_ID, 1 },
{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT } ])),
+ { ?DLINK_ARG_SIGNATURE, JWT }
+ | rvi_common:log_id_json_tail(CompSpec)])),
%% Setup ping interval
gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
@@ -850,3 +875,15 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
[ opt(K, Elems, Def) || K <- Keys].
+
+
+log_orphan(Pfx, Fmt, Args) ->
+ start_log(Pfx, Fmt, Args, #component_spec{}).
+
+start_log(Pfx, Fmt, Args, CS) ->
+ LogId = rvi_log:new_id(Pfx),
+ rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogId, CS).
+
+log(Fmt, Args, CS) ->
+ rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index 432cf54..abeb19c 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -28,7 +28,7 @@
terminate/2, code_change/3]).
-export([setup/6]).
--export([upgrade/2]).
+-export([upgrade/3]).
-export([send/2]).
-export([send/3]).
-export([is_connection_up/1]).
@@ -49,7 +49,8 @@
packet_st = [],
mod = undefined,
func = undefined,
- args = undefined
+ cs,
+ role = server :: client | server
}).
%%%===================================================================
@@ -57,11 +58,10 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, Arg) ->
- setup(IP, Port, Sock, Mod, Fun, Arg, []).
-
-setup(IP, Port, Sock, Mod, Fun, Arg, Opts) ->
- Params = {IP, Port, Sock, Mod, Fun, Arg, Opts},
+setup(IP, Port, Sock, Mod, Fun, CompSpec) ->
+ Params = {IP, Port, Sock, Mod, Fun, CompSpec},
+ ?debug("setup() IP = ~p; Port = ~p; Mod = ~p; Fun = ~p", [IP, Port, Mod, Fun]),
+ ?debug("CompSpec = ~p", [CompSpec]),
case gen_server:start_link(?MODULE, Params ,[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
@@ -72,8 +72,8 @@ setup(IP, Port, Sock, Mod, Fun, Arg, Opts) ->
Err
end.
-upgrade(Pid, Role) ->
- gen_server:call(Pid, {upgrade, Role}).
+upgrade(Pid, Role, CompSpec) when Role==client; Role==server ->
+ gen_server:call(Pid, {upgrade, Role, CompSpec}).
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
@@ -132,7 +132,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, Arg, Opts}) ->
+init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> dlink_tls_connmgr:add_connection(IP, Port, self())
@@ -143,10 +143,10 @@ init({IP, Port, Sock, Mod, Fun, Arg, Opts}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- ?debug("connection:init(): Arg: ~p", [Arg]),
%% Grab socket control
- PktMod = opt(packet_mod, Opts, ?PACKET_MOD),
- PktSt = PktMod:init(Opts),
+ {ok, PktMod} = rvi_common:get_module_config(dlink_tls, dlink_tls_rpc,
+ packet_mod, ?PACKET_MOD, CompSpec),
+ PktSt = PktMod:init(CompSpec),
{ok, #st{
ip = IP,
port = Port,
@@ -155,7 +155,7 @@ init({IP, Port, Sock, Mod, Fun, Arg, Opts}) ->
packet_mod = PktMod,
packet_st = PktSt,
func = Fun,
- args = Arg
+ cs = CompSpec
}}.
@@ -181,16 +181,20 @@ handle_call(terminate_connection, _From, St) ->
{stop, Reason, NSt} = handle_info({tcp_closed, St#st.sock}, St),
{stop, Reason, ok, NSt};
-handle_call({upgrade, Role} = Req, _From, #st{sock = S} = St) ->
+handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) ->
?debug("~p:handle_call(~p)~n", [?MODULE, Req]),
{ok, [{active, Last}]} = inet:getopts(S, [active]),
inet:setopts(S, [{active, false}]),
- case do_upgrade(S, Role) of
+ case do_upgrade(S, Role, CompSpec) of
{ok, NewS} ->
?debug("upgrade to TLS succcessful~n", []),
- ssl:setopts(NewS, [{active, Last}]),
- {reply, ok, St#st{sock = NewS, mode = tls}};
+ ssl:setopts(NewS, [{active, Last}]),
+ {ok, {IP, Port}} = ssl:peername(NewS),
+ NewCS = rvi_common:set_value(dlink_tls_role, client, CompSpec),
+ {reply, ok, St#st{sock = NewS, mode = tls, role = Role,
+ ip = inet_parse:ntoa(IP), port = Port,
+ cs = NewCS}};
Error ->
?error("Cannot upgrade to TLS: ~p~n", [Error]),
{stop, Error, Error, St}
@@ -251,17 +255,14 @@ handle_info({tcp, Sock, Data},
handle_info({ssl, Sock, Data},
#st{ip = IP, port = Port,
- mod = Mod, func = Fun, args = Arg,
+ mod = Mod, func = Fun, cs = CS,
packet_mod = PMod, packet_st = PSt} = State) ->
?debug("handle_info(data): ~p", [Data]),
case PMod:decode(Data, PSt) of
{ok, Elements, PSt1} ->
?debug("~p:handle_info(data complete): Processed: ~p",
[?MODULE, Elements]),
- FromPid = self(),
- spawn(fun() -> [Mod:Fun(FromPid, IP, Port,
- data, Elem, Arg) || Elem <- Elements]
- end),
+ Mod:Fun(self(), IP, Port, data, Elements, CS),
ssl:setopts(Sock, [{active, once}]),
{noreply, State#st{packet_st = PSt1}};
{error, Reason} ->
@@ -276,7 +277,7 @@ handle_info({tcp, Sock, Data},
port = Port,
mod = Mod,
func = Fun,
- args = Arg,
+ cs = CS,
packet_mod = PMod,
packet_st = PSt} = State) ->
?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
@@ -286,10 +287,7 @@ handle_info({tcp, Sock, Data},
{ok, Elements, PSt1} ->
?debug("~p:handle_info(data complete): Processed: ~p",
[?MODULE, Elements]),
- FromPid = self(),
- spawn(fun() -> [Mod:Fun(FromPid, IP, Port,
- data, Elem, Arg) || Elem <- Elements]
- end),
+ Mod:Fun(self(), IP, Port, data, Elements, CS),
inet:setopts(Sock, [{active, once}]),
{noreply, State#st{packet_st = PSt1}};
{error, Reason} ->
@@ -305,9 +303,9 @@ handle_info({tcp_closed, Sock},
port = Port,
mod = Mod,
func = Fun,
- args = Arg } = State) ->
+ cs = CS} = State) ->
?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]),
- Mod:Fun(self(), IP, Port,closed, Arg),
+ Mod:Fun(self(), IP, Port,closed, CS),
gen_tcp:close(Sock),
dlink_tls_connmgr:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -317,10 +315,10 @@ handle_info({tcp_error, _Sock},
port = Port,
mod = Mod,
func = Fun,
- args = Arg} = State) ->
+ cs = CS} = State) ->
?debug("~p:handle_info(tcp_error): Address: ~p:~p ", [ ?MODULE, IP, Port]),
- Mod:Fun(self(), IP, Port, error, Arg),
+ Mod:Fun(self(), IP, Port, error, CS),
dlink_tls_connmgr:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -359,27 +357,24 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
+do_upgrade(Sock, client, CompSpec) ->
+ ssl:connect(Sock, tls_opts(client, CompSpec));
+do_upgrade(Sock, server, CompSpec) ->
+ ssl:ssl_accept(Sock, tls_opts(server, CompSpec)).
+
%% FIXME: For now, use the example certs delivered with the OTP SSL appl.
-tls_opts(Role) ->
+tls_opts(Role, CompSpec) ->
Dir = tls_dir(Role),
- [{certfile, filename:join(Dir, "cert.pem")},
- {keyfile, filename:join(Dir, "key.pem")}].
+ {ok, CertFile} = get_config(certfile, filename:join(Dir, "cert.pem"), CompSpec),
+ {ok, KeyFile} = get_config(keyfile, filename:join(Dir, "key.pem"), CompSpec),
+ [{certfile, CertFile},
+ {keyfile, KeyFile}].
tls_dir(Role) when Role==client;
- Role==server ->
+ Role==server ->
filename:join([code:lib_dir(ssl), "examples", "certs", "etc",
- atom_to_list(Role)]).
-
-do_upgrade(Sock, client) ->
- ssl:connect(Sock, tls_opts(client));
-do_upgrade(Sock, server) ->
- ssl:ssl_accept(Sock, tls_opts(server)).
+ atom_to_list(Role)]).
-
-opt(K, Opts, Def) ->
- case lists:keyfind(K, 1, Opts) of
- {_, V} ->
- V;
- false ->
- Def
- end.
+get_config(Key, Default, CompSpec) ->
+ rvi_common:get_module_config(
+ dlink_tls, dlink_tls_rpc, Key, Default, CompSpec).
diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl
index 64b004f..25da496 100644
--- a/components/dlink_tls/src/dlink_tls_listener.erl
+++ b/components/dlink_tls/src/dlink_tls_listener.erl
@@ -64,18 +64,20 @@ terminate(_Reason, _State) ->
ok.
sock_opts() ->
- [list, {active, once}, {packet, 0}].
+ [{mode, binary}, {active, once}, {packet, 0}].
new_connection(IP, Port, Sock, State) ->
- ?debug("~p:new_connection(): Peer IP: ~p (ignored)", [?MODULE,IP]),
- ?debug("~p:new_connection(): Peer Port: ~p (ignored)", [?MODULE,Port]),
- ?debug("~p:new_connection(): Sock: ~p", [?MODULE,Sock]),
+ ?debug("new_connection(): Peer IP: ~p (ignored)", [IP]),
+ ?debug("new_connection(): Peer Port: ~p (ignored)", [Port]),
+ ?debug("new_connection(): Sock: ~p", [Sock]),
%% IP and Port are garbage. We'll grab peername when we get our
%% first data.
%% Provide component spec as extra arg.
- {ok, _P} = dlink_tls_conn:setup(
- undefined, 0, Sock,
- dlink_tls_rpc,
- handle_socket, [gen_nb_server:get_cb_state(State)]),
+ CompSpec = gen_nb_server:get_cb_state(State),
+ {ok, P} = dlink_tls_conn:setup(
+ undefined, 0, Sock,
+ dlink_tls_rpc,
+ handle_socket, [CompSpec]),
+ dlink_tls_conn:upgrade(P, server, CompSpec),
{ok, State}.
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index 21df19f..964cc0c 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -35,11 +35,11 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--include_lib("rvi_common/include/rvi_dlink.hrl").
+-include_lib("rvi_common/include/rvi_dlink_bin.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(SERVER_OPTS, server_opts).
--define(DEFAULT_TCP_PORT, 9999).
+-define(DEFAULT_TCP_PORT, 9998).
-define(DEFAULT_RECONNECT_INTERVAL, 5000).
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
@@ -94,13 +94,14 @@ start_json_server() ->
start_connection_manager() ->
CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts} = rvi_common:get_module_config(data_link,
- ?MODULE,
- ?SERVER_OPTS,
- [],
- CompSpec),
- IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS),
- Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT),
+ {ok, TlsOpts} = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ ?SERVER_OPTS,
+ [],
+ CompSpec),
+ ?debug("TlsOpts = ~p", [TlsOpts]),
+ IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
+ Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]),
@@ -136,7 +137,8 @@ setup_persistent_connections_([ ], _CompSpec) ->
setup_persistent_connections_([ NetworkAddress | T], CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]),
[ IP, Port] = string:tokens(NetworkAddress, ":"),
- connect_and_retry_remote(IP, Port, CompSpec),
+ %% cast an immediate (re-)connect attempt to dlink_tls_rpc
+ setup_reconnect_timer(0, IP, Port, CompSpec),
setup_persistent_connections_(T, CompSpec),
ok.
@@ -187,41 +189,34 @@ connect_remote(IP, Port, CompSpec) ->
?info("connect_remote(~p, ~p)~n", [IP, Port]),
case dlink_tls_connmgr:find_connection_by_address(IP, Port) of
{ ok, _Pid } ->
+ log("already connected", [], CompSpec),
already_connected;
not_found ->
%% Setup a new outbound connection
- ?info("dlink_tls:connect_remote(): Connecting ~p:~p",
- [IP, Port]),
-
- case gen_tcp:connect(IP, Port, [list, {packet, 0}]) of
+ {ok, Timeout} = rvi_common:get_module_config(
+ dlink_tls, ?MODULE, connect_timeout, 10000, CompSpec),
+ ?info("dlink_tls:connect_remote(): Connecting ~p:~p (TO=~p",
+ [IP, Port, Timeout]),
+ log("new connection", [], CompSpec),
+ case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), Timeout) of
{ ok, Sock } ->
?info("dlink_tls:connect_remote(): Connected ~p:~p",
[IP, Port]),
%% Setup a genserver around the new connection.
{ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock,
- ?MODULE, handle_socket, [CompSpec] ),
-
+ ?MODULE, handle_socket, CompSpec),
+ UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec),
+ ?debug("Upgrade result = ~p", [UgRes]),
%% Send authorize
- { LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- dlink_tls_conn:send(
- Pid,
- term_to_json(
- {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION },
- { ?DLINK_ARG_CERTIFICATES,
- {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- ]})),
+ send_authorize(Pid, CompSpec),
ok;
{error, Err } ->
?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p",
[IP, Port, Err]),
+ log("connect FAILED: ~w", [Err], CompSpec),
not_available
end
end.
@@ -229,9 +224,11 @@ connect_remote(IP, Port, CompSpec) ->
connect_and_retry_remote( IP, Port, CompSpec) ->
?info("dlink_tls:connect_and_retry_remote(): ~p:~p",
[ IP, Port]),
-
- case connect_remote(IP, list_to_integer(Port), CompSpec) of
- ok -> ok;
+ CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
+ case connect_remote(IP, list_to_integer(Port), CS) of
+ ok ->
+ log("connected", [], CS),
+ ok;
Err -> %% Failed to connect. Sleep and try again
?notice("dlink_tls:connect_and_retry_remote(~p:~p): Failed: ~p",
@@ -239,8 +236,8 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
?notice("dlink_tls:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
[IP, Port, ?DEFAULT_RECONNECT_INTERVAL]),
-
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec),
+ log("start reconnect timer", [], CS),
+ setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CS),
not_available
end.
@@ -253,16 +250,12 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
+ AvailabilityMsg = availability_msg(Availability, [Service]),
Res = dlink_tls_conn:send(
ConnPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- ]})),
+ rvi_common:pass_log_id(
+ [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }
+ | AvailabilityMsg ], CompSpec)),
?debug("dlink_tls:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -282,7 +275,7 @@ announce_local_service_(CompSpec, Service, Availability) ->
handle_socket(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg);
-handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+handle_socket(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tls:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -323,57 +316,57 @@ handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
end,
ok;
-handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
+handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) ->
?info("dlink_tls:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
-handle_socket(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
+handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
+
+ ?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]),
+ ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]),
- ?debug("dlink_tls:data(): Elems ~p", [Elems]),
+ CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
+ ?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
+ [ RemoteAddress,
RemotePort,
- ProtoVersion,
- CertificatesTmp,
Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
?DLINK_ARG_SIGNATURE],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
- { array, C} -> C;
- undefined -> []
- end,
- process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CompSpec);
+ process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
+ Signature, CS);
+
+ ?DLINK_CMD_CERT_EXCHANGE ->
+ ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]),
+ [ Certs ] =
+ opts([?DLINK_ARG_CERTIFICATES], Elems, undefined),
+ ?debug("Certs = ~p", [Certs]),
+ log("certs from ~s:~w", [PeerIP, PeerPort], CS),
+ authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}),
+ case rvi_common:get_value(dlink_tls_role, client, CS) of
+ client -> ok;
+ server ->
+ send_certs(FromPid, CompSpec)
+ end,
+ ok;
?DLINK_CMD_SERVICE_ANNOUNCE ->
- [ TransactionID,
- ProtoVersion,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_SIGNATURE],
+ ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
Conn = {PeerIP, PeerPort},
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- process_announce(Msg, FromPid, PeerIP, PeerPort,
- TransactionID, ProtoVersion, CompSpec);
- _ ->
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ log("sa from ~s:~w", [PeerIP, PeerPort], CS),
+ process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -644,20 +637,17 @@ delete_services(ConnPid, SvcNameList) ->
ok.
availability_msg(Availability, Services) ->
- {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) },
- { ?DLINK_ARG_SERVICES, {array, Services} }]}.
+ [{ ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, Services }].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
- RemotePort, ProtoVersion, Signature, Certificates, CompSpec) ->
- ?info("dlink_tls:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
- ?info("dlink_tls:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
- ?info("dlink_tls:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
- ?debug("dlink_tls:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tls:authorize(): Certificates: ~p", [ Certificates ]),
- ?debug("dlink_tls:authorize(): Signature: ~p", [ Signature ]),
+process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, Signature, CompSpec) ->
+ ?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]),
+ ?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]),
+ ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
{ _NRemoteAddress, _NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
@@ -669,7 +659,7 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
- case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
+ case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of
true ->
connection_authorized(FromPid, Conn, CompSpec);
false ->
@@ -677,37 +667,59 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
false
end.
+send_certs(Pid, CompSpec) ->
+ ?debug("send_certs (Pid = ~p)", [Pid]),
+ {LocalIP, LocalPort} = rvi_common:node_address_tuple(),
+ dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE},
+ {?DLINK_ARG_ADDRESS, LocalIP},
+ {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
+ {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)).
+
send_authorize(Pid, CompSpec) ->
+ ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]),
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- dlink_tls_conn:send(Pid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
- { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION },
- { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
+ JWT = get_authorize_jwt(CompSpec),
+ dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE},
+ {?DLINK_ARG_ADDRESS, LocalIP},
+ {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
+ {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)).
+ %% dlink_tls_conn:send(Pid,
+ %% term_to_json(
+ %% {struct,
+ %% [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ %% { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ %% { ?DLINK_ARG_ADDRESS, LocalIP },
+ %% { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
+ %% { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION },
+ %% { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
+ %% { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the connection manager, this is an incoming connection
%% from the client. We should respond with our own authorize followed by
%% a service announce
+ log("authorized ~s:~w", [RemoteIP, RemotePort], CompSpec),
case dlink_tls_connmgr:find_connection_by_pid(FromPid) of
not_found ->
?info("dlink_tls:authorize(): New connection!"),
dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid),
?debug("dlink_tls:authorize(): Sending authorize."),
_Res = send_authorize(FromPid, CompSpec),
- ?debug("dlink_tls:upgrade connection", []),
- UgRes = dlink_tls_conn:upgrade(FromPid, server),
- ?debug("upgrade result: ~p", [UgRes]),
+ case rvi_common:get_value(dlink_tls_role, server, CompSpec) of
+ server -> ok;
+ client ->
+ send_certs(FromPid, CompSpec)
+ end,
+ %% ?debug("dlink_tls:upgrade connection", []),
+ %% UgRes = dlink_tls_conn:upgrade(FromPid, server),
+ %% ?debug("upgrade result: ~p", [UgRes]),
ok;
_ ->
- UgRes = dlink_tls_conn:upgrade(FromPid, client),
- ?debug("upgrade result: ~p", [UgRes]),
+ %% UgRes = dlink_tls_conn:upgrade(FromPid, client),
+ %% ?debug("upgrade result: ~p", [UgRes]),
ok
end,
@@ -722,14 +734,12 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
?info("dlink_tls:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteIP, RemotePort]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
+ AvailabilityMsg = availability_msg(available, FilteredServices),
+ log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
dlink_tls_conn:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT } ]})),
+ rvi_common:pass_log_id(
+ [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }
+ | AvailabilityMsg ], CompSpec)),
%% Setup ping interval
gen_server:call(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
@@ -742,12 +752,8 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
base64:decode_to_string(Data)).
-process_announce({struct, Elems}, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
- [ Avail,
- {array, Svcs} ] =
- opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined),
+process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
- ?debug("dlink_tls:service_announce(~p): TransactionID: ~p", [Avail,TID]),
?debug("dlink_tls:service_announce(~p): Services: ~p", [Avail,Svcs]),
case Avail of
?DLINK_ARG_AVAILABLE ->
@@ -810,8 +816,8 @@ get_certificates(CompSpec) ->
error(no_certificate_found)
end.
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
+validate_auth_jwt(JWT, Conn, CompSpec) ->
+ case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
[ok] ->
true;
[not_found] ->
@@ -828,4 +834,18 @@ opt(K, L, Def) ->
end.
opts(Keys, Elems, Def) ->
- [ opt(K, Elems, Def) || K <- Keys].
+ Res = [ opt(K, Elems, Def) || K <- Keys],
+ ?debug("opts(~p) -> ~p", [Keys, Elems]),
+ Res.
+
+
+log_orphan(Pfx, Fmt, Args) ->
+ start_log(Pfx, Fmt, Args, #component_spec{}).
+
+start_log(Pfx, Fmt, Args, CS) ->
+ LogId = rvi_log:new_id(Pfx),
+ rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogId, CS).
+
+log(Fmt, Args, CS) ->
+ rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
diff --git a/components/rvi_common/include/rvi_common.hrl b/components/rvi_common/include/rvi_common.hrl
index ca62c25..5deff46 100644
--- a/components/rvi_common/include/rvi_common.hrl
+++ b/components/rvi_common/include/rvi_common.hrl
@@ -2,11 +2,11 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
-%% A record defining the modules to use
+%% A record defining the modules to use
%% Used by rvi_common:request() to
%% figure out how to route an intra-component call
@@ -16,9 +16,11 @@
service_edge :: ?COMP_SPEC_TYPE,
schedule :: ?COMP_SPEC_TYPE,
service_discovery :: ?COMP_SPEC_TYPE,
- authorize :: ?COMP_SPEC_TYPE,
- data_link :: ?COMP_SPEC_TYPE,
- protocol :: ?COMP_SPEC_TYPE
+ authorize :: ?COMP_SPEC_TYPE,
+ data_link :: ?COMP_SPEC_TYPE,
+ protocol :: ?COMP_SPEC_TYPE,
+ rvi_common :: ?COMP_SPEC_TYPE,
+ values = [] :: [{any(), any()}]
}).
-define(COMP_SPEC_SERVICE_EDGE_DEFAULT, [ { service_edge_rpc, gen_server, [] } ]).
@@ -27,3 +29,4 @@
-define(COMP_SPEC_AUTHORIZE_DEFAULT, [ { authorize_rpc, gen_server, [] }]).
-define(COMP_SPEC_DATA_LINK_DEFAULT, [ { dlink_tcp_rpc, gen_server, [] } ]).
-define(COMP_SPEC_PROTOCOL_DEFAULT, [ { protocol, gen_server, [] } ]).
+-define(COMP_SPEC_RVI_COMMON_DEFAULT, [ { rvi_log, gen_server, [] } ]).
diff --git a/components/rvi_common/include/rvi_dlink.hrl b/components/rvi_common/include/rvi_dlink.hrl
index 24e7bc0..ab1e49d 100644
--- a/components/rvi_common/include/rvi_dlink.hrl
+++ b/components/rvi_common/include/rvi_dlink.hrl
@@ -9,6 +9,7 @@
%% Commonly used protocol identifiers across dlink implementations
%%
-define(DLINK_CMD_AUTHORIZE, "au").
+-define(DLINK_CMD_CERT_EXCHANGE, "crt").
-define(DLINK_CMD_SERVICE_ANNOUNCE, "sa").
-define(DLINK_CMD_RECEIVE, "rcv").
-define(DLINK_CMD_FRAG, "frg").
diff --git a/components/rvi_common/include/rvi_dlink_bin.hrl b/components/rvi_common/include/rvi_dlink_bin.hrl
index f48dfd5..f4f4cb8 100644
--- a/components/rvi_common/include/rvi_dlink_bin.hrl
+++ b/components/rvi_common/include/rvi_dlink_bin.hrl
@@ -9,6 +9,7 @@
%% Commonly used protocol identifiers across dlink implementations
%%
-define(DLINK_CMD_AUTHORIZE, <<"au">>).
+-define(DLINK_CMD_CERT_EXCHANGE, <<"crt">>).
-define(DLINK_CMD_SERVICE_ANNOUNCE, <<"sa">>).
-define(DLINK_CMD_RECEIVE, <<"rcv">>).
-define(DLINK_CMD_FRAG, <<"frg">>).
diff --git a/components/rvi_common/src/rvi_common.app.src b/components/rvi_common/src/rvi_common.app.src
index 2c5f1ff..368423b 100644
--- a/components/rvi_common/src/rvi_common.app.src
+++ b/components/rvi_common/src/rvi_common.app.src
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -18,6 +18,7 @@
stdlib
]},
{mod, { rvi_common_app, []}},
- {start_phases, [{setup_config, []}]},
+ {start_phases, [{setup_config, []},
+ {start_json_servers, []}]},
{env, []}
]}.
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index 911bd4a..c9bbabb 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -34,15 +34,25 @@
get_component_modules/1,
get_component_modules/2,
get_module_specification/3,
+ get_module_config/3,
get_module_config/4,
get_module_config/5,
get_module_json_rpc_address/3,
get_module_json_rpc_url/3,
- get_module_genserver_pid/3
+ get_module_genserver_pid/3,
+ get_value/3
]).
--export([utc_timestamp/0]).
+-export([set_value/3]).
+-export([get_log_id/1, %% (CompSpec)
+ get_json_log_id/1, %% (JSONArgs)
+ log_id_json_tail/1, %% (CompSpec)
+ pick_up_json_log_id/2, %% (JSONArgs, CompSpec)
+ pass_log_id/2]). %% (PropList, CompSpec)
+-export([utc_timestamp/0,
+ utc_timestamp/1]).
-export([bin/1]).
--export([start_json_rpc_server/3]).
+-export([start_json_rpc_server/3,
+ start_json_rpc_server/4]).
-export([extract_json/2,
normalize_json/1,
term_to_json/1]).
@@ -87,6 +97,7 @@ status_values() ->
{7, unauthorized}].
get_request_result({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}) ->
+ ?debug("JSONBody = ~p", [JSONBody]),
case get_json_element(["result", "status"], JSONBody) of
{ok, Value} ->
{ json_rpc_status(Value), JSONBody };
@@ -110,7 +121,7 @@ get_request_result(Other)->
json_argument([], [], Acc) ->
- Acc;
+ lists:reverse(Acc);
json_argument([Arg | AT], [Spec | ST], Acc) when is_atom(Arg)->
json_argument(AT, ST, [ { Spec, atom_to_list(Arg) } | Acc]);
@@ -121,24 +132,26 @@ json_argument([Arg | AT], [Spec | ST], Acc) ->
%% Convert a list of unnamed arguments to a proplist
%% understood by json encode
json_argument(ArgList, SpecList) ->
- { struct, json_argument(ArgList, SpecList, []) }.
+ json_argument(ArgList, SpecList, []).
request(Component,
Module,
Function,
- InArgPropList,
+ InArgPropList0,
OutArgSpec,
CompSpec) ->
%% Split [ { network_address, "127.0.0.1:888" } , { timeout, 34 } ] to
%% [ "127.0.0.1:888", 34] [ network_address, timeout ]
+ InArgPropList = pass_log_id(InArgPropList0, CompSpec),
InArg = [ Val || { _Key, Val } <- InArgPropList ],
InArgSpec = [ Key || { Key, _Val } <- InArgPropList ],
%% Figure out how we are to invoke this MFA.
case get_module_type(Component, Module, CompSpec) of
%% We have a gen_server
{ ok, gen_server } ->
- ?debug("Sending ~p - ~p:~p(~p)", [Component, Module, Function, InArg]),
+ ?debug("Sending ~p - ~p:~p(~p)", [Component, Module, Function,
+ authorize_keys:abbrev_payload(InArg)]),
gen_server:call(Module, { rvi, Function, InArg});
%% We have a JSON-RPC server
@@ -161,15 +174,15 @@ request(Component,
Err1 -> Err1
end.
-
notification(Component,
Module,
Function,
- InArgPropList,
+ InArgPropList0,
CompSpec) ->
%% Split [ { network_address, "127.0.0.1:888" } , { timeout, 34 } ] to
%% [ "127.0.0.1:888", 34] [ network_address, timeout ]
+ InArgPropList = pass_log_id(InArgPropList0, CompSpec),
InArg = [ Val || { _Key, Val } <- InArgPropList ],
InArgSpec = [ Key || { Key, _Val } <- InArgPropList ],
%% Figure out how we are to invoke this MFA.
@@ -202,6 +215,14 @@ notification(Component,
ok
end.
+pass_log_id(PList, CompSpec) ->
+ case get_value(rvi_log_id, undefined, CompSpec) of
+ undefined ->
+ PList;
+ ID ->
+ lists:keystore(rvi_log_id, 1, PList, {rvi_log_id, ID})
+ end.
+
send_json_request(Url,Method, Args) ->
Req = jsx_encode([{<<"jsonrpc">>, <<"2.0">>},
@@ -318,7 +339,7 @@ get_json_element_([Elem | T], JSON) ->
{'OR', Alts} ->
get_json_element_(T, get_json_element_alt(Alts, JSON));
_ ->
- get_json_element_(T, get_value(Elem, JSON, undefined))
+ get_json_element_(T, get_json_value(Elem, JSON, undefined))
end;
get_json_element_(Path,JSON) ->
@@ -342,12 +363,14 @@ member(K, [H|T]) ->
member(_, []) ->
false.
-get_value(K, [{K1,V}|T], Def) ->
+get_json_value(K, [{K1,V}|T], Def) ->
case comp(K, K1) of
true -> V;
- false -> get_value(K, T, Def)
+ false -> get_json_value(K, T, Def)
end;
-get_value(_, [], Def) ->
+get_json_value(K, [_|T], Def) ->
+ get_json_value(K, T, Def);
+get_json_value(_, [], Def) ->
Def.
comp(A, A) -> true;
@@ -497,7 +520,8 @@ get_component_specification_() ->
service_discovery = ?COMP_SPEC_SERVICE_DISCOVERY_DEFAULT,
authorize = ?COMP_SPEC_AUTHORIZE_DEFAULT,
data_link = ?COMP_SPEC_DATA_LINK_DEFAULT,
- protocol = ?COMP_SPEC_PROTOCOL_DEFAULT
+ protocol = ?COMP_SPEC_PROTOCOL_DEFAULT,
+ rvi_common = ?COMP_SPEC_RVI_COMMON_DEFAULT
};
CompList ->
@@ -545,10 +569,12 @@ get_component_modules(data_link, CompSpec) ->
get_component_modules(protocol, CompSpec) ->
CompSpec#component_spec.protocol;
+get_component_modules(rvi_common, CompSpec) ->
+ CompSpec#component_spec.rvi_common;
+
get_component_modules(_, _) ->
undefined.
-
%% Get the spec for a specific module (protocol_bert_rpc) within
%% a component (protocol).
get_module_specification(Component, Module, CompSpec) ->
@@ -577,6 +603,9 @@ get_module_specification(Component, Module, CompSpec) ->
end
end.
+get_module_config(Component, Module, Key) ->
+ get_module_config(Component, Module, Key, get_component_specification()).
+
%% Get a specific option (bert_rpc_port) for a specific module
%% (protocol_bert_rpc) within a component (protocol).
get_module_config(Component, Module, Key, CompSpec) ->
@@ -613,6 +642,43 @@ get_module_config(Component, Module, Key, Default, CompSpec) ->
_ -> {ok, Default }
end.
+set_value(Key, Value, #component_spec{values = Keys} = CompSpec) ->
+ CompSpec#component_spec{values = lists:keystore(Key, 1, Keys, {Key, Value})}.
+
+get_value(Key, Default, #component_spec{values = Values}) ->
+ case lists:keyfind(Key, 1, Values) of
+ {_, Value} ->
+ Value;
+ false ->
+ Default
+ end.
+
+get_log_id(CS) ->
+ get_value(rvi_log_id, <<"null">>, CS).
+
+log_id_json_tail(CS) ->
+ case get_value(rvi_log_id, undefined, CS) of
+ undefined ->
+ [];
+ Id ->
+ [{<<"rvi_log_id">>, Id}]
+ end.
+
+get_json_log_id(Args) ->
+ case get_json_element([<<"rvi_log_id">>], Args) of
+ {ok, ID} ->
+ ID;
+ {error, _} ->
+ <<"null">>
+ end.
+
+pick_up_json_log_id(Args, CS) ->
+ case get_json_element([<<"rvi_log_id">>], Args) of
+ {ok, ID} ->
+ set_value(rvi_log_id, ID, CS);
+ {error, _} ->
+ CS
+ end.
get_module_type(Component, Module, CompSpec) ->
case get_module_specification(Component, Module, CompSpec) of
@@ -683,13 +749,16 @@ get_module_genserver_pid(Component, Module, CompSpec) ->
start_json_rpc_server(Component, Module, Supervisor) ->
+ start_json_rpc_server(Component, Module, Supervisor, []).
+
+start_json_rpc_server(Component, Module, Supervisor, XOpts) ->
Addr = get_module_json_rpc_address(Component,
Module,
get_component_specification()),
case Addr of
{ ok, IP, Port } ->
- ExoHttpOpts = [ { ip, IP }, { port, Port } ],
+ ExoHttpOpts = [ { ip, IP }, { port, Port } | XOpts ],
exoport_exo_http:instance(Supervisor,
Module,
@@ -701,12 +770,14 @@ start_json_rpc_server(Component, Module, Supervisor) ->
Err
end.
-
-
utc_timestamp() ->
calendar:datetime_to_gregorian_seconds(
calendar:universal_time()) - seconds_jan_1970().
+utc_timestamp({_,_,_} = TS) ->
+ calendar:datetime_to_gregorian_seconds(
+ calendar:now_to_universal_time(TS)) - seconds_jan_1970().
+
seconds_jan_1970() ->
%% calendar:datetime_to_gregorian_seconds({{1970,1,1},{0,0,0}}).
62167219200.
diff --git a/components/rvi_common/src/rvi_common_app.erl b/components/rvi_common/src/rvi_common_app.erl
index 998b921..c2564d4 100644
--- a/components/rvi_common/src/rvi_common_app.erl
+++ b/components/rvi_common/src/rvi_common_app.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -25,6 +25,9 @@ start(_StartType, _StartArgs) ->
start_phase(setup_config, _, _) ->
rvi_config:setup_substitution(rvi_core),
+ ok;
+start_phase(start_json_servers, _, _) ->
+ rvi_log:start_json_server(),
ok.
stop(_State) ->
diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl
index 620672a..5d0fa8e 100644
--- a/components/rvi_common/src/rvi_log.erl
+++ b/components/rvi_common/src/rvi_log.erl
@@ -4,7 +4,16 @@
-export([start_link/0,
log/3,
- fetch/1]).
+ flog/4,
+ new_id/1,
+ fetch/1,
+ timestamp/0,
+ format/2
+ ]).
+
+-export([start_json_server/0,
+ handle_rpc/2,
+ handle_notification/2]).
-export([init/1,
handle_call/3,
@@ -13,35 +22,113 @@
terminate/2,
code_change/3]).
--record(st, {n = 100}).
+-include_lib("lager/include/log.hrl").
+
+-record(st, {n = 100,
+ seq = 1,
+ node_tag}).
-record(evt, {id,
component,
event}).
--define(TIDS, rvi_log_tids).
+-define(IDS, rvi_log_ids).
-define(EVENTS, rvi_log_events).
+-define(MAX_LENGTH, 60).
+
+
start_link() ->
create_tabs(),
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-log(TID, Component, Event) ->
- gen_server:cast(?MODULE, {log, TID, Component, Event}).
+log(TID, Component, Event) when is_binary(TID), is_binary(Component) ->
+ gen_server:cast(?MODULE, {log, TID, timestamp(), Component, bin(Event)}).
+
+new_id(Prefix) ->
+ gen_server:call(?MODULE, {new_id, bin(Prefix)}).
+
+format(Fmt, Args) ->
+ try format_(Fmt, Args)
+ catch
+ error:_ ->
+ format_("FORMAT ERROR ~p, ~p", [Fmt, Args])
+ end.
+
+format_(Fmt, Args) ->
+ trunc_msg(iolist_to_binary(io_lib:fwrite(Fmt, Args))).
+
+trunc_msg(Bin) when byte_size(Bin) =< ?MAX_LENGTH ->
+ Bin;
+trunc_msg(Bin) ->
+ binary:part(Bin, 0, ?MAX_LENGTH).
-fetch(Tid) ->
- ets:select(?EVENTS, [{#evt{id = {Tid,'$1'},
- component = '$2',
- event = '$3'}, [], [{{Tid, '$1', '$2', '$3'}}] }]).
+
+flog(Fmt, Args, Component, CS) ->
+ LogTID = rvi_common:get_log_id(CS),
+ log(LogTID, Component, format(Fmt, Args)).
+
+timestamp() ->
+ os:timestamp().
+
+fetch(Tids) ->
+ TidSet = select_ids(Tids),
+ [{Tid, ets:select(?EVENTS, [{#evt{id = {Tid,'$1'},
+ component = '$2',
+ event = '$3'}, [], [{{'$1', '$2', '$3'}}] }])}
+ || Tid <- TidSet].
init(_) ->
- {ok, #st{}}.
+ {ok, Tag} = rvi_common:get_module_config(
+ rvi_common, rvi_log, node_tag, base64url:encode(crypto:rand_bytes(3)),
+ rvi_common:get_component_specification()),
+ gen_server:cast(self(), log_start),
+ {ok, #st{node_tag = bin(Tag)}}.
+
+start_json_server() ->
+ ?debug("rvi_log:start_json_server()", []),
+ case rvi_common:start_json_rpc_server(rvi_common,
+ ?MODULE,
+ rvi_common_sup) of
+ ok -> ok;
+ Err ->
+ ?warning("rvi_log:start_json_server(): Failed to start: ~p", [Err]),
+ Err
+ end.
+handle_rpc(<<"log">>, Args) ->
+ handle_notification(<<"log">>, Args),
+ {ok, [{status, rvi_common:json_rpc_status(ok)}]};
+handle_rpc(<<"fetch">>, Args) ->
+ TIDs = get_json_ids(Args),
+ Res = [{TID, fetch(TID)} || TID <- TIDs],
+ {ok, [{status, rvi_common:json_rpc_status(ok)},
+ {<<"log">>, format_result(Res)}]};
+handle_rpc(Other, _Args) ->
+ ?warning("rvi_log:handle_rpc(~p): unknown command", [ Other ]),
+ {ok, [{status, rvi_common:json_rpc_status(invalid_command)}]}.
+
+handle_notification(<<"log">>, Args) ->
+ {ok, TID} = rvi_common:get_json_element([<<"tid">>], Args),
+ {ok, Component} = rvi_common:get_json_element([<<"cmp">>], Args),
+ {ok, Event} = rvi_common:get_json_element([<<"evt">>], Args),
+ log(TID, Component, Event),
+ ok;
+handle_notification(Other, _Args) ->
+ ?warning("rvi_log:handle_notification(~p): unknown command", [Other]),
+ ok.
-handle_cast({log, TID, Component, Event}, #st{n = N} = St) ->
- do_log(TID, Component, Event, N),
+handle_cast(log_start, #st{n = N, node_tag = Tag} = St) ->
+ do_log(tid_(<<"rvi_log">>, 0, Tag), timestamp(), <<"rvi_common">>,
+ format("Started - Tag = ~s", [Tag]), N),
+ {noreply, St};
+handle_cast({log, TID, TS, Component, Event}, #st{n = N} = St) ->
+ do_log(TID, TS, Component, Event, N),
{noreply, St};
handle_cast(_, St) ->
{noreply, St}.
+handle_call({new_id, Prefix}, _From, #st{seq = Seq, node_tag = Tag} = S) ->
+ {reply, <<Prefix/binary, ":", (integer_to_binary(Seq))/binary, "-", Tag/binary>>,
+ S#st{seq = Seq + 1}};
handle_call(_, _From, St) ->
{reply, {error, unknown_call}, St}.
@@ -60,8 +147,8 @@ code_change(_, St, _) ->
%% ======================================================================
create_tabs() ->
- maybe_new(?TIDS, [ordered_set, named_table]),
- maybe_new(?EVENTS, [ordered_set, named_table]).
+ maybe_new(?IDS, [ordered_set, public, named_table]),
+ maybe_new(?EVENTS, [ordered_set, public, named_table, {keypos, #evt.id}]).
maybe_new(T, Opts) ->
case ets:info(T, name) of
@@ -71,34 +158,91 @@ maybe_new(T, Opts) ->
true
end.
-do_log(Tid, Component, Event, N) ->
- case ets:member(?TIDS, Tid) of
+tid_(Prefix, Seq, Tag) ->
+ <<Prefix/binary, ":", (integer_to_binary(Seq))/binary, "-", Tag/binary>>.
+
+do_log(Tid, TS, Component, Event, N) ->
+ case ets:member(?IDS, Tid) of
true ->
- store_event(Tid, Component, Event);
+ store_event(Tid, TS, Component, Event);
false ->
- case ets:info(?TIDS, size) of
+ case ets:info(?IDS, size) of
Sz when Sz >= N ->
- purge_tid();
+ purge_id();
_ ->
ok
end,
- store_event(Tid, Component, Event)
+ ets:insert(?IDS, {Tid}),
+ store_event(Tid, TS, Component, Event)
end.
-store_event(Tid, Component, Event) ->
- ets:insert(?EVENTS, #evt{id = {Tid, seq()},
+store_event(Tid, TS, Component, Event) ->
+ rvi_log_log:info("~-20s ~-12s ~s", [Tid, Component, Event]),
+ ?info("RVI_LOG: ~p/~p/~p", [Tid, Component, Event]),
+ ets:insert(?EVENTS, #evt{id = {Tid, TS},
component = Component,
event = Event}).
-purge_tid() ->
- case ets:first(?TIDS) of
+purge_id() ->
+ case ets:first(?IDS) of
'$end_of_table' ->
%% Should not be possible ...
ok;
- Tid ->
- ets:delete(?TIDS, Tid),
+ {Tid} ->
+ ets:delete(?IDS, Tid),
ets:match_delete(?EVENTS, #evt{id = {Tid,'_'}, _ = '_'})
end.
-seq() ->
- erlang:now().
+get_json_ids(Args) ->
+ Res = case rvi_common:get_json_element([<<"tid">>], Args) of
+ {ok, TID} when is_binary(TID) ->
+ [TID];
+ {ok, TIDs} when is_list(TIDs) ->
+ TIDs;
+ {error, _} ->
+ []
+ end,
+ lists:filter(fun valid_id_pat/1, Res).
+
+valid_id_pat(TP) ->
+ %% We'll accept anything that isn't blatantly *invalid*
+ try begin _ = re:run([], TP, []),
+ true
+ end
+ catch
+ _:_ ->
+ false
+ end.
+
+select_ids(TIDs) ->
+ ets:foldr(
+ fun({Tid}, Acc) ->
+ case match_id(Tid, TIDs) of
+ true -> [Tid|Acc];
+ false -> Acc
+ end
+ end, [], ?IDS).
+
+match_id(Tid, [Pat|Pats]) ->
+ case re:run(Tid, Pat, []) of
+ {match, _} -> true;
+ nomatch -> match_id(Tid, Pats)
+ end;
+match_id(_, []) ->
+ false.
+
+format_result(Log) ->
+ [{TID, format_events(Es)} || {TID, Es} <- Log].
+
+format_events([{TS, Comp, Evt}|Es]) ->
+ [[{<<"ts">>, rvi_common:utc_timestamp(TS)},
+ {<<"cmp">>, bin(Comp)},
+ {<<"evt">>, bin(Evt)}] || format_events(Es)];
+format_events([]) ->
+ [].
+
+bin(A) when is_atom(A) -> atom_to_binary(A, latin1);
+bin(B) when is_binary(B) -> B;
+bin(L) when is_list(L) -> iolist_to_binary(L);
+bin(Other) ->
+ iolist_to_binary(io_lib:fwrite("~w", [Other])).
diff --git a/components/schedule/src/rvi_routing.erl b/components/schedule/src/rvi_routing.erl
index a27fb49..67b68eb 100644
--- a/components/schedule/src/rvi_routing.erl
+++ b/components/schedule/src/rvi_routing.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2015, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
-module(rvi_routing).
@@ -28,7 +28,7 @@
service_prefix = ""::string(),
proto_link_pairs = []:: list(tuple())
}).
-
+
-record(st, {
routes= []:: list(#route{})
}).
@@ -173,7 +173,7 @@ prefix_match_(_Service, [], Len) ->
prefix_match_([], _Prefix, _Len ) ->
-1;
-%%
+%%
prefix_match_([ ServiceH | ServiceT], [ PrefixH | PrefixT ], Len)
when ServiceH =:= PrefixH ->
@@ -181,7 +181,7 @@ prefix_match_([ ServiceH | ServiceT], [ PrefixH | PrefixT ], Len)
%% Mismatch between the the service and candidate. No match
prefix_match_(_Service, _Prefix, _Len) ->
- -1.
+ -1.
find_routes_([], _Service, CurRoutes, CurMatchLen ) ->
{ CurRoutes, CurMatchLen };
@@ -191,7 +191,7 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen )
%% Do we have a better match than previosly recorded?
case MatchLen >= CurMatchLen of
- true ->
+ true ->
%% Continue with the new routes and matching len installed
find_routes_(T, Service, Routes, MatchLen);
@@ -203,9 +203,10 @@ find_routes_([ { ServicePrefix, Routes } | T], Service, CurRoutes, CurMatchLen )
find_routes_(Rt, _Svc, CurRoutes, CurMatchLen) ->
?warning("rvi_routing(): Incorrect route entry: ~p", [Rt]),
{ CurRoutes, CurMatchLen }.
-
+
find_routes(Routes, Service) ->
+ ?debug("find_routes(~p, ~p)", [Routes, Service]),
case find_routes_(Routes, Service, undefined, 0) of
{ undefined, 0 } ->
?debug("rvi_routing(): ~p -> unknown", [ Service]),
@@ -213,7 +214,7 @@ find_routes(Routes, Service) ->
{ MatchRoutes, _MatchLen } ->
?debug("rvi_routing(): ~p -> ~p", [ Service, MatchRoutes ]),
- normalize_routes_(MatchRoutes, [])
+ normalize_routes_(MatchRoutes, [])
end.
@@ -225,13 +226,13 @@ normalize_routes_([], Acc) ->
lists:reverse(Acc);
normalize_routes_([ {{ Pr, PrOp }, { DL, DLOp }} | Rem ], Acc) ->
- normalize_routes_( Rem, [ {{Pr, PrOp}, { DL, DLOp } } | Acc]);
+ normalize_routes_( Rem, [ {{Pr, PrOp}, { DL, DLOp } } | Acc]);
normalize_routes_([ { Pr, { DL, DLOp }} | Rem ], Acc) ->
- normalize_routes_(Rem, [ { {Pr, []}, { DL, DLOp } } | Acc]);
+ normalize_routes_(Rem, [ { {Pr, []}, { DL, DLOp } } | Acc]);
normalize_routes_([ {{ Pr, PrOp}, DL } | Rem ], Acc) ->
- normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
+ normalize_routes_(Rem, [ { {Pr, PrOp}, { DL, [] } } | Acc]);
normalize_routes_([ {Pr, DL} | Rem ], Acc) ->
normalize_routes_(Rem, [ { {Pr, []}, { DL, [] } } | Acc]).
@@ -242,11 +243,11 @@ find_protocols_(_DataLink, [], Acc ) ->
%% Matching data link. This is an allowed protocol
-find_protocols_(DataLink, [ {{ Pr, PrOp }, { DL, DLOp }} | T],
+find_protocols_(DataLink, [ {{ Pr, PrOp }, { DL, DLOp }} | T],
Acc) when DataLink =:= DL ->
find_protocols_(DataLink, T, [ { Pr, PrOp, DLOp } | Acc ]);
-
+
%% No match
find_protocols_(DataLink, [ {{ _Pr, _PrOp }, { _DL, _DLOp }} | T], Acc) ->
@@ -258,4 +259,3 @@ find_protocols(AllRoutes, Service, DataLink) ->
Res = find_protocols_(DataLink, SvcRoutes, []),
?debug("find_protocols(~p:~p): -> ~p", [ DataLink, Service, Res]),
Res.
-
diff --git a/components/schedule/src/schedule_rpc.erl b/components/schedule/src/schedule_rpc.erl
index bf19dda..b66bdd1 100644
--- a/components/schedule/src/schedule_rpc.erl
+++ b/components/schedule/src/schedule_rpc.erl
@@ -153,7 +153,7 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) ->
%% JSON-RPC entry point
%% CAlled by local exo http server
-handle_rpc("schedule_message", Args) ->
+handle_rpc(<<"schedule_message">>, Args) ->
{ok, SvcName} = rvi_common:get_json_element(["service"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index 04c897d..b4f14ab 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -46,7 +46,7 @@
}).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(st, {
%% Component specification
@@ -58,13 +58,13 @@ start_link() ->
init([]) ->
?info("svc_disc:init(): called."),
- ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table,
+ ets:new(?SERVICE_TABLE, [ duplicate_bag, public, named_table,
{ keypos, #service_entry.service }]),
- ets:new(?MODULE_TABLE, [ duplicate_bag, public, named_table,
+ ets:new(?MODULE_TABLE, [ duplicate_bag, public, named_table,
{ keypos, #service_entry.data_link_mod }]),
- ets:new(?SUBSCRIBER_TABLE, [set, public, named_table,
+ ets:new(?SUBSCRIBER_TABLE, [set, public, named_table,
{ keypos, #subscriber_entry.module }]),
{ok, #st { cs = rvi_common:get_component_specification() } }.
@@ -74,19 +74,19 @@ start_json_server() ->
rvi_common:start_json_rpc_server(service_discovery, ?MODULE, service_discovery_sup).
get_all_services(CompSpec) ->
- rvi_common:request(service_discovery, ?MODULE,
+ rvi_common:request(service_discovery, ?MODULE,
get_all_services, [], [status, services], CompSpec).
get_services_by_module(CompSpec, DataLinkMod) ->
- rvi_common:request(service_discovery, ?MODULE,
- get_services_by_module,
- [ { data_link_module, DataLinkMod }],
+ rvi_common:request(service_discovery, ?MODULE,
+ get_services_by_module,
+ [ { data_link_module, DataLinkMod }],
[status, services], CompSpec).
get_modules_by_service(CompSpec, Service) ->
- rvi_common:request(service_discovery, ?MODULE,
+ rvi_common:request(service_discovery, ?MODULE,
get_modules_by_service,
- [ { service, Service }],
+ [ { service, Service }],
[status, modules], CompSpec).
@@ -95,7 +95,7 @@ register_services(CompSpec, Services, DataLinkModule) ->
?debug(" CompSpec : ~p", [CompSpec]),
?debug(" Services : ~p", [Services]),
?debug(" DataLinkMod : ~p", [DataLinkModule]),
- rvi_common:notification(service_discovery, ?MODULE, register_services,
+ rvi_common:notification(service_discovery, ?MODULE, register_services,
[{ services, Services },
{ data_link_module, DataLinkModule }],
CompSpec).
@@ -105,55 +105,59 @@ unregister_services(CompSpec, Services, DataLinkModule) ->
?debug(" CompSpec : ", [CompSpec]),
?debug(" Services : ", [Services]),
?debug(" DataLinkMod : ", [DataLinkModule]),
- rvi_common:notification(service_discovery, ?MODULE, unregister_services,
+ rvi_common:notification(service_discovery, ?MODULE, unregister_services,
[{ services, Services },
{ data_link_module, DataLinkModule}],
CompSpec).
subscribe(CompSpec, SubscribingMod) ->
- rvi_common:notification(service_discovery, ?MODULE, subscribe,
- [ { subscribing_module, SubscribingMod }],
+ rvi_common:notification(service_discovery, ?MODULE, subscribe,
+ [ { subscribing_module, SubscribingMod }],
CompSpec).
unsubscribe(CompSpec, SubscribingMod) ->
- rvi_common:notification(service_discovery, ?MODULE, unsubscribe,
- [ { subscribing_module, SubscribingMod }],
+ rvi_common:notification(service_discovery, ?MODULE, unsubscribe,
+ [ { subscribing_module, SubscribingMod }],
CompSpec).
-
+
%% JSON-RPC entry point
%% Called by local exo http server
%% Register remote services
handle_notification("register_services", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Services} = rvi_common:get_json_element(["services"], Args),
{ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args),
- gen_server:cast(?SERVER, { rvi, register_services,
- [ Services, list_to_atom(DataLinkModule) ]}),
+ gen_server:cast(?SERVER, { rvi, register_services,
+ [ Services, list_to_atom(DataLinkModule), LogId ]}),
ok;
handle_notification("unregister_services", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Services} = rvi_common:get_json_element(["services"], Args),
{ok, DataLinkModule } = rvi_common:get_json_element(["data_link_module"], Args),
- gen_server:cast(?SERVER, { rvi, unregister_services,
- [ Services, list_to_atom(DataLinkModule) ]}),
+ gen_server:cast(?SERVER, { rvi, unregister_services,
+ [ Services, list_to_atom(DataLinkModule), LogId ]}),
ok;
handle_notification("subscribe", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args),
%% De-register service
- gen_server:cast(?SERVER, { rvi, subscribe, [ list_to_atom(Module) ]}),
+ gen_server:cast(?SERVER, { rvi, subscribe, [ list_to_atom(Module), LogId ]}),
ok;
handle_notification("unsubscribe_from_service", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Module } = rvi_common:get_json_element(["subscribing_module"], Args),
%% De-register service
- gen_server:cast(?SERVER, { rvi, unsubscribe, [ list_to_atom(Module) ]}),
+ gen_server:cast(?SERVER, { rvi, unsubscribe, [ list_to_atom(Module), LogId ]}),
ok;
handle_notification( Other, _Args) ->
@@ -164,36 +168,39 @@ handle_notification( Other, _Args) ->
%%
%% Get all services
%%
-handle_rpc("get_all_services", _Args) ->
+handle_rpc("get_all_services", Args) ->
?debug("svc_disc:get_all_services(json-rpc)"),
- [ok, Services ] = gen_server:call(?SERVER, { rvi, get_all_services, []}),
- {ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, { array, Services } }]};
+ LogId = rvi_common:get_json_log_id(Args),
+ [ok, Services ] = gen_server:call(?SERVER, { rvi, get_all_services, [LogId]}),
+ {ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, Services }]};
handle_rpc("get_services_by_module", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, DataLinkMod } = rvi_common:get_json_element(["data_link_module"], Args),
?debug("svc_disc:get_services_by_module(json-rpc): ~p ", [DataLinkMod]),
- [ok, Services ] = gen_server:call(?SERVER,
- { rvi,
- get_services_by_module,
- [DataLinkMod]}),
+ [ok, Services ] = gen_server:call(?SERVER,
+ { rvi,
+ get_services_by_module,
+ [DataLinkMod, LogId]}),
{ok, [ {status, rvi_common:json_rpc_status(ok)} , { services, { array, Services } }]};
handle_rpc("get_modules_by_service", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Service } = rvi_common:get_json_element(["service"], Args),
?debug("svc_disc:get_modules_by_service(json-rpc): ~p ", [Service]),
- [ok, Modules ] = gen_server:call(?SERVER,
+ [ok, Modules ] = gen_server:call(?SERVER,
{ rvi,
get_modules_by_service,
- [Service]}),
+ [Service, LogId]}),
{ok, [ {status, rvi_common:json_rpc_status(ok)} , { modules, { array, Modules } }]};
-%%
+%%
%% Handle the rest.
%%
handle_rpc( Other, _Args) ->
@@ -211,17 +218,17 @@ handle_call(Req, From, St) ->
end.
handle_call_({rvi, get_all_services, _Args}, _From, St) ->
- Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
- [ ServiceName | Acc ] end,
+ Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
+ [ ServiceName | Acc ] end,
[], ?SERVICE_TABLE),
{reply, [ok, Svcs], St };
-handle_call_({rvi, get_services_by_module, [Module]}, _From, St) ->
+handle_call_({rvi, get_services_by_module, [Module | _LogId]}, _From, St) ->
{reply, [ok, get_services_by_module_(Module)], St };
-handle_call_({rvi, get_modules_by_service, [Service]}, _From, St) ->
+handle_call_({rvi, get_modules_by_service, [Service | _LogId]}, _From, St) ->
{reply, [ok, get_modules_by_service_(Service)], St };
@@ -231,7 +238,7 @@ handle_call_(Other, _From, St) ->
-handle_cast({rvi, subscribe, [ SubsMod] }, St) ->
+handle_cast({rvi, subscribe, [ SubsMod | _LogId ] }, St) ->
%% Insert new entry, or replace existing one
ets:insert(?SUBSCRIBER_TABLE, #subscriber_entry { module = SubsMod}),
@@ -240,14 +247,14 @@ handle_cast({rvi, subscribe, [ SubsMod] }, St) ->
{ noreply, St};
-handle_cast({rvi, unsubscribe, [ SubsMod] }, St) ->
+handle_cast({rvi, unsubscribe, [ SubsMod | _LogId ] }, St) ->
ets:delete(?SUBSCRIBER_TABLE, SubsMod),
{ noreply, St};
%% Handle calls received through regular gen_server calls, routed by
%% rvi_common:request()
-handle_cast({rvi, register_services, [Services, DataLinkModule] }, St) ->
+handle_cast({rvi, register_services, [Services, DataLinkModule | _LogId ] }, St) ->
?info("svc_disc:register_services(): ~p:~p",
[DataLinkModule, Services]),
@@ -255,15 +262,15 @@ handle_cast({rvi, register_services, [Services, DataLinkModule] }, St) ->
%% Notify all subscribers
notify_subscribers(St#st.cs,
- available,
- Services,
+ available,
+ Services,
DataLinkModule),
{noreply, St };
%% Handle calls received through regular gen_server calls, routed by
%% rvi_common:request()
-handle_cast({rvi, unregister_services, [Services, DataLinkModule] }, St) ->
+handle_cast({rvi, unregister_services, [Services, DataLinkModule | _LogId ] }, St) ->
?info("svc_disc:unregister_services(): ~p:~p",
[DataLinkModule, Services]),
@@ -272,8 +279,8 @@ handle_cast({rvi, unregister_services, [Services, DataLinkModule] }, St) ->
%% Notify all subscribers
notify_subscribers(St#st.cs,
- unavailable,
- Services,
+ unavailable,
+ Services,
DataLinkModule),
@@ -295,30 +302,30 @@ code_change(_OldVsn, St, _Extra) ->
register_single_service_(Service, DataLinkModule) ->
- ?info("svc_disc:register_single_service_(~p:~p)",
+ ?info("svc_disc:register_single_service_(~p:~p)",
[DataLinkModule,Service]),
%% Delete any previous instances of the given entry, in case
%% the service registers multiple times
- ets:match_delete(?MODULE_TABLE,
- #service_entry {
- service = Service,
- data_link_mod = DataLinkModule
+ ets:match_delete(?MODULE_TABLE,
+ #service_entry {
+ service = Service,
+ data_link_mod = DataLinkModule
}),
- ets:match_delete(?SERVICE_TABLE,
- #service_entry {
- service = Service,
- data_link_mod = DataLinkModule
+ ets:match_delete(?SERVICE_TABLE,
+ #service_entry {
+ service = Service,
+ data_link_mod = DataLinkModule
}),
- ets:insert(?SERVICE_TABLE,
+ ets:insert(?SERVICE_TABLE,
#service_entry {
service = Service,
data_link_mod = DataLinkModule
}),
- ets:insert(?MODULE_TABLE,
+ ets:insert(?MODULE_TABLE,
#service_entry {
service = Service,
data_link_mod = DataLinkModule
@@ -330,21 +337,21 @@ register_single_service_(Service, DataLinkModule) ->
unregister_single_service_(Service, DataLinkModule) ->
- ?info("svc_disc:unregister_single_service_(): ~p:~p",
+ ?info("svc_disc:unregister_single_service_(): ~p:~p",
[DataLinkModule, Service]),
%% Delete any service table entries with a matching Service.
- ets:match_delete(?SERVICE_TABLE,
- #service_entry {
+ ets:match_delete(?SERVICE_TABLE,
+ #service_entry {
service = Service,
- data_link_mod = DataLinkModule
+ data_link_mod = DataLinkModule
}),
%% Delete any remote address table entries with a matching Service.
- ets:match_delete(?MODULE_TABLE,
- #service_entry {
+ ets:match_delete(?MODULE_TABLE,
+ #service_entry {
service = Service,
- data_link_mod = DataLinkModule
+ data_link_mod = DataLinkModule
}),
ok.
@@ -355,14 +362,14 @@ unregister_single_service_(Service, DataLinkModule) ->
get_modules_by_service_(Service) ->
ModMatch = ets:lookup(?SERVICE_TABLE, Service),
-
- ModNames = lists:foldl(fun(#service_entry {
- data_link_mod = Mod
- }, Acc) ->
+
+ ModNames = lists:foldl(fun(#service_entry {
+ data_link_mod = Mod
+ }, Acc) ->
[ Mod | Acc ]
end, [], ModMatch),
-
-
+
+
?debug("svc_disc:get_modules_by_service_(): ~p -> ~p", [ Service, ModNames ]),
@@ -373,10 +380,10 @@ get_modules_by_service_(Service) ->
get_services_by_module_(Module) ->
SvcMatch = ets:lookup(?MODULE_TABLE, Module),
-
- SvcNames = lists:foldl(fun(#service_entry {
- service = Svc
- }, Acc) ->
+
+ SvcNames = lists:foldl(fun(#service_entry {
+ service = Svc
+ }, Acc) ->
[ Svc | Acc ]
end, [], SvcMatch),
@@ -389,20 +396,20 @@ notify_single_subscriber(_CompSpec, '$end_of_table', _SubsFun,
_DataLinkModule, _Services) ->
ok;
-notify_single_subscriber(CompSpec, SubsModule, SubsFun,
+notify_single_subscriber(CompSpec, SubsModule, SubsFun,
DataLinkModule, Services) ->
%% Invoke subscriber for each service that has been updated.
- ?debug("notify_single_subscriber(~p:~p) ~p:~p()",
+ ?debug("notify_single_subscriber(~p:~p) ~p:~p()",
[SubsModule, SubsFun, DataLinkModule, Services ]),
[ SubsModule:SubsFun(CompSpec, SvcName, DataLinkModule) || SvcName <- Services],
%% Move on to the next subscribing module
- notify_single_subscriber(CompSpec,
+ notify_single_subscriber(CompSpec,
ets:next(?SUBSCRIBER_TABLE, SubsModule), SubsFun,
DataLinkModule, Services).
-notify_subscribers(CompSpec, Available, Services, DataLinkModule) ->
+notify_subscribers(CompSpec, Available, Services, DataLinkModule) ->
?debug("notify_subscribers(~p:~p) ~p", [ DataLinkModule, Services, Available]),
@@ -415,16 +422,16 @@ notify_subscribers(CompSpec, Available, Services, DataLinkModule) ->
ets:foldl(
%% Notify if this is not the originating service.
- fun(#subscriber_entry { module = Module }, Acc) ->
+ fun(#subscriber_entry { module = Module }, _Acc) ->
?debug(" notify_subscribers module: ~p ", [ Module]),
ok
end, ok, ?SUBSCRIBER_TABLE),
%% Initiate with the first module
- notify_single_subscriber(CompSpec,
- ets:first(?SUBSCRIBER_TABLE),
- Fun,
- DataLinkModule,
+ notify_single_subscriber(CompSpec,
+ ets:first(?SUBSCRIBER_TABLE),
+ Fun,
+ DataLinkModule,
Services).
@@ -438,15 +445,20 @@ initial_notification(CompSpec, SubsMod, Service) ->
[] -> %% Yanked
ok;
- [#service_entry { data_link_mod = DataLinkMod }] ->
+ [#service_entry { data_link_mod = DataLinkMod }] ->
SubsMod:service_available(CompSpec, Service, DataLinkMod)
end,
%% Move on to the next service
initial_notification(CompSpec, SubsMod,
ets:next(?SERVICE_TABLE, Service)).
-
-
+
+
initial_notification(CompSpec, SubsMod) ->
initial_notification(CompSpec, SubsMod, ets:first(?SERVICE_TABLE)),
ok.
+
+log([ID], Fmt, Args) ->
+ rvi_log:log(ID, <<"authorize">>, rvi_log:format(Fmt, Args));
+log(_, _, _) ->
+ ok.
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index 2cf9c3a..b3ed684 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -30,6 +30,10 @@
-export([start_json_server/0,
start_websocket/0]).
+%% exo_socket authentication callbacks
+-export([authenticate/3,
+ incoming/2]).
+
%% Invoked by service discovery
%% FIXME: Should be rvi_service_discovery behavior
-export([service_available/3,
@@ -84,10 +88,22 @@ init([]) ->
start_json_server() ->
+ Allowed = get_allowed(),
?debug("service_edge_rpc:start_json_server()"),
+ Opts = [{exo_socket,
+ [{auth,
+ [{role, server},
+ {server,
+ [{mod, ?MODULE},
+ {allowed, Allowed}
+ ]}
+ ]}
+ ]}
+ ],
case rvi_common:start_json_rpc_server(service_edge,
?MODULE,
- service_edge_sup) of
+ service_edge_sup,
+ Opts) of
ok ->
ok;
@@ -97,6 +113,38 @@ start_json_server() ->
Err
end.
+get_allowed() ->
+ Allowed = case rvi_common:get_module_config(
+ service_edge, service_edge_rpc, allowed) of
+ {ok, L} -> L;
+ {error, _} -> [{127,0,0,1}]
+ end,
+ ?debug("get_allowed(); Allowed = ~p", [Allowed]),
+ lists:flatmap(
+ fun(Addr) ->
+ case inet:ip(Addr) of
+ {ok, IP} -> [IP];
+ {error, _} -> []
+ end
+ end, Allowed).
+
+authenticate(X, Role, Opts) ->
+ {ok, {PeerIP,_}} = exo_socket:peername(X),
+ ?debug("authenticate(~p, ~p, ~p)~nPeer = ~p~n", [X, Role, Opts, PeerIP]),
+ case lists:keyfind(allowed, 1, Opts) of
+ {_, Allowed} ->
+ case lists:member(PeerIP, Allowed) of
+ true ->
+ {ok, Opts};
+ false ->
+ error
+ end;
+ false ->
+ {ok, Opts}
+ end.
+
+incoming(Data, _) ->
+ Data.
start_websocket() ->
%%
@@ -356,6 +404,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
?debug("service_edge_rpc:register_local_service(): address: ~p ", [URL]),
FullSvcName = rvi_common:local_service_to_string(SvcName),
+ CS = start_log(<<"svc">>, "reg local service: ~s", [FullSvcName], St#st.cs),
?debug("service_edge_rpc:register_local_service(): full name: ~p ", [FullSvcName]),
ets:insert(?SERVICE_TABLE, #service_entry {
@@ -364,7 +413,7 @@ handle_call({ rvi, register_local_service, [SvcName, URL] }, _From, St) ->
%% Register with service discovery, will trigger callback to service_available()
%% that forwards the registration to other connected services.
- service_discovery_rpc:register_services(St#st.cs, [FullSvcName], local),
+ service_discovery_rpc:register_services(CS, [FullSvcName], local),
%% Return ok.
@@ -378,7 +427,8 @@ handle_call({ rvi, unregister_local_service, [SvcName] }, _From, St) ->
%% Register with service discovery, will trigger callback to service_available()
%% that forwards the registration to other connected services.
- service_discovery_rpc:unregister_services(St#st.cs, [SvcName], local),
+ CS = start_log(<<"svc">>, "unreg local service: ~s", [SvcName], St#st.cs),
+ service_discovery_rpc:unregister_services(CS, [SvcName], local),
%% Return ok.
{ reply, [ ok ], St };
@@ -401,18 +451,20 @@ handle_call({ rvi, handle_local_message,
?debug("service_edge_rpc:local_msg: service_name: ~p", [SvcName]),
?debug("service_edge_rpc:local_msg: timeout: ~p", [TimeoutArg]),
?debug("service_edge_rpc:local_msg: parameters: ~p", [Parameters]),
+ CS = start_log(<<"recv">>, "local_message: ~s", [SvcName], St#st.cs),
%%
%% Authorize local message and retrieve a certificate / signature
%% that will be accepted by the receiving node that will deliver
%% the messaage to its locally connected service_name service.
%%
+ ?debug("CS = ~p", [lager:pr(CS, rvi_common)]),
[ok, Signature ] =
authorize_rpc:authorize_local_message(
- St#st.cs, SvcName, [{service_name, SvcName},
- {timeout, TimeoutArg},
- %% {parameters, Parameters},
- {parameters, Parameters}
- ]),
+ CS, SvcName, [{service_name, SvcName},
+ {timeout, TimeoutArg},
+ %% {parameters, Parameters},
+ {parameters, Parameters}
+ ]),
%%
%% Slick but ugly.
@@ -456,16 +508,18 @@ handle_call({ rvi, handle_local_message,
case LookupRes of
[ #service_entry { url = URL } ] -> %% SvcName is local. Forward message
?debug("service_edge_rpc:local_msg(): Service is local. Forwarding."),
+ log("dispatch to ~s", [URL], CS),
Res = forward_message_to_local_service(URL,
SvcName,
Parameters,
- St#st.cs),
+ CS),
{ reply, Res , St};
_ -> %% SvcName is remote
%% Ask Schedule the request to resolve the network address
?debug("service_edge_rpc:local_msg(): Service is remote. Scheduling."),
- [ _, TID ] = schedule_rpc:schedule_message(St#st.cs,
+ log("schedule message (~s)", [SvcName], CS),
+ [ _, TID ] = schedule_rpc:schedule_message(CS,
SvcName,
Timeout,
Parameters,
@@ -481,13 +535,15 @@ handle_call(Other, _From, St) ->
handle_cast({rvi, service_available, [SvcName, _DataLinkModule] }, St) ->
- ?debug("service_edge_rpc: Service available: ~p:", [ SvcName]),
+ ?debug("service_edge_rpc: Service available: ~p", [ SvcName]),
+ start_log(<<"svc">>, "service_available: ~s", [SvcName], St#st.cs),
announce_service_availability(available, SvcName),
{ noreply, St };
handle_cast({rvi, service_unavailable, [SvcName, _DataLinkModule] }, St) ->
?debug("service_edge_rpc: Service unavailable: ~p:", [ SvcName]),
+ start_log(<<"rsvc">>, "service_unavailable: ~s", [SvcName], St#st.cs),
announce_service_availability(unavailable, SvcName),
{ noreply, St };
@@ -627,7 +683,7 @@ dispatch_to_local_service(URL, Command, Args) ->
%% Forward a message to a specific locally connected service.
%% Called by forward_message_to_local_service/2.
%%
-forward_message_to_local_service(URL,SvcName, Parameters, _CompSpec) ->
+forward_message_to_local_service(URL,SvcName, Parameters, CompSpec) ->
?debug("service_edge:forward_to_local(): URL: ~p", [URL]),
?debug("service_edge:forward_to_local(): Parameters: ~p", [Parameters]),
@@ -644,14 +700,22 @@ forward_message_to_local_service(URL,SvcName, Parameters, _CompSpec) ->
%% Deliver the message to the local service, which can
%% be either a wse websocket, or a regular HTTP JSON-RPC call
spawn(fun() ->
- rvi_common:get_request_result(
- dispatch_to_local_service(URL,
- message,
- [{<<"service_name">>, LocalSvcName },
- {<<"parameters">>, Parameters }]))
+ log_outcome(
+ rvi_common:get_request_result(
+ dispatch_to_local_service(URL,
+ message,
+ [{<<"service_name">>, LocalSvcName },
+ {<<"parameters">>, Parameters }])),
+ SvcName, CompSpec)
end),
[ ok, -1 ].
+log_outcome({Status, _}, _SvcName, CS) ->
+ log("result: ~w", [Status], CS);
+log_outcome(Other, _SvcName, CS) ->
+ log("unexpected: ~w", [Other], CS).
+
+
announce_service_availability(Available, SvcName) ->
Cmd = case Available of
@@ -697,3 +761,12 @@ announce_service_availability(Available, SvcName) ->
Acc
end
end, BlockURLs, ?SERVICE_TABLE).
+
+
+start_log(Pfx, Fmt, Args, CS) ->
+ LogTID = rvi_log:new_id(Pfx),
+ rvi_log:log(LogTID, <<"svc_edge">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogTID, CS).
+
+log(Fmt, Args, CS) ->
+ rvi_log:flog(Fmt, Args, <<"svc_edge">>, CS).