summaryrefslogtreecommitdiff
path: root/components/dlink_tls/src/dlink_tls_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/dlink_tls/src/dlink_tls_rpc.erl')
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl254
1 files changed, 137 insertions, 117 deletions
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index 21df19f..964cc0c 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -35,11 +35,11 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--include_lib("rvi_common/include/rvi_dlink.hrl").
+-include_lib("rvi_common/include/rvi_dlink_bin.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(SERVER_OPTS, server_opts).
--define(DEFAULT_TCP_PORT, 9999).
+-define(DEFAULT_TCP_PORT, 9998).
-define(DEFAULT_RECONNECT_INTERVAL, 5000).
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
@@ -94,13 +94,14 @@ start_json_server() ->
start_connection_manager() ->
CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts} = rvi_common:get_module_config(data_link,
- ?MODULE,
- ?SERVER_OPTS,
- [],
- CompSpec),
- IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS),
- Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT),
+ {ok, TlsOpts} = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ ?SERVER_OPTS,
+ [],
+ CompSpec),
+ ?debug("TlsOpts = ~p", [TlsOpts]),
+ IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
+ Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]),
@@ -136,7 +137,8 @@ setup_persistent_connections_([ ], _CompSpec) ->
setup_persistent_connections_([ NetworkAddress | T], CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]),
[ IP, Port] = string:tokens(NetworkAddress, ":"),
- connect_and_retry_remote(IP, Port, CompSpec),
+ %% cast an immediate (re-)connect attempt to dlink_tls_rpc
+ setup_reconnect_timer(0, IP, Port, CompSpec),
setup_persistent_connections_(T, CompSpec),
ok.
@@ -187,41 +189,34 @@ connect_remote(IP, Port, CompSpec) ->
?info("connect_remote(~p, ~p)~n", [IP, Port]),
case dlink_tls_connmgr:find_connection_by_address(IP, Port) of
{ ok, _Pid } ->
+ log("already connected", [], CompSpec),
already_connected;
not_found ->
%% Setup a new outbound connection
- ?info("dlink_tls:connect_remote(): Connecting ~p:~p",
- [IP, Port]),
-
- case gen_tcp:connect(IP, Port, [list, {packet, 0}]) of
+ {ok, Timeout} = rvi_common:get_module_config(
+ dlink_tls, ?MODULE, connect_timeout, 10000, CompSpec),
+ ?info("dlink_tls:connect_remote(): Connecting ~p:~p (TO=~p",
+ [IP, Port, Timeout]),
+ log("new connection", [], CompSpec),
+ case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), Timeout) of
{ ok, Sock } ->
?info("dlink_tls:connect_remote(): Connected ~p:~p",
[IP, Port]),
%% Setup a genserver around the new connection.
{ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock,
- ?MODULE, handle_socket, [CompSpec] ),
-
+ ?MODULE, handle_socket, CompSpec),
+ UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec),
+ ?debug("Upgrade result = ~p", [UgRes]),
%% Send authorize
- { LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- dlink_tls_conn:send(
- Pid,
- term_to_json(
- {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION },
- { ?DLINK_ARG_CERTIFICATES,
- {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- ]})),
+ send_authorize(Pid, CompSpec),
ok;
{error, Err } ->
?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p",
[IP, Port, Err]),
+ log("connect FAILED: ~w", [Err], CompSpec),
not_available
end
end.
@@ -229,9 +224,11 @@ connect_remote(IP, Port, CompSpec) ->
connect_and_retry_remote( IP, Port, CompSpec) ->
?info("dlink_tls:connect_and_retry_remote(): ~p:~p",
[ IP, Port]),
-
- case connect_remote(IP, list_to_integer(Port), CompSpec) of
- ok -> ok;
+ CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
+ case connect_remote(IP, list_to_integer(Port), CS) of
+ ok ->
+ log("connected", [], CS),
+ ok;
Err -> %% Failed to connect. Sleep and try again
?notice("dlink_tls:connect_and_retry_remote(~p:~p): Failed: ~p",
@@ -239,8 +236,8 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
?notice("dlink_tls:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
[IP, Port, ?DEFAULT_RECONNECT_INTERVAL]),
-
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec),
+ log("start reconnect timer", [], CS),
+ setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CS),
not_available
end.
@@ -253,16 +250,12 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
+ AvailabilityMsg = availability_msg(Availability, [Service]),
Res = dlink_tls_conn:send(
ConnPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- ]})),
+ rvi_common:pass_log_id(
+ [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }
+ | AvailabilityMsg ], CompSpec)),
?debug("dlink_tls:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -282,7 +275,7 @@ announce_local_service_(CompSpec, Service, Availability) ->
handle_socket(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg);
-handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+handle_socket(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tls:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -323,57 +316,57 @@ handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
end,
ok;
-handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
+handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) ->
?info("dlink_tls:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
-handle_socket(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
+handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
+
+ ?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]),
+ ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]),
- ?debug("dlink_tls:data(): Elems ~p", [Elems]),
+ CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
+ ?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
+ [ RemoteAddress,
RemotePort,
- ProtoVersion,
- CertificatesTmp,
Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
?DLINK_ARG_SIGNATURE],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
- { array, C} -> C;
- undefined -> []
- end,
- process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CompSpec);
+ process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
+ Signature, CS);
+
+ ?DLINK_CMD_CERT_EXCHANGE ->
+ ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]),
+ [ Certs ] =
+ opts([?DLINK_ARG_CERTIFICATES], Elems, undefined),
+ ?debug("Certs = ~p", [Certs]),
+ log("certs from ~s:~w", [PeerIP, PeerPort], CS),
+ authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}),
+ case rvi_common:get_value(dlink_tls_role, client, CS) of
+ client -> ok;
+ server ->
+ send_certs(FromPid, CompSpec)
+ end,
+ ok;
?DLINK_CMD_SERVICE_ANNOUNCE ->
- [ TransactionID,
- ProtoVersion,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_SIGNATURE],
+ ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
Conn = {PeerIP, PeerPort},
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- process_announce(Msg, FromPid, PeerIP, PeerPort,
- TransactionID, ProtoVersion, CompSpec);
- _ ->
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ log("sa from ~s:~w", [PeerIP, PeerPort], CS),
+ process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -644,20 +637,17 @@ delete_services(ConnPid, SvcNameList) ->
ok.
availability_msg(Availability, Services) ->
- {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) },
- { ?DLINK_ARG_SERVICES, {array, Services} }]}.
+ [{ ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, Services }].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
- RemotePort, ProtoVersion, Signature, Certificates, CompSpec) ->
- ?info("dlink_tls:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
- ?info("dlink_tls:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
- ?info("dlink_tls:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
- ?debug("dlink_tls:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tls:authorize(): Certificates: ~p", [ Certificates ]),
- ?debug("dlink_tls:authorize(): Signature: ~p", [ Signature ]),
+process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, Signature, CompSpec) ->
+ ?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]),
+ ?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]),
+ ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
{ _NRemoteAddress, _NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
@@ -669,7 +659,7 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
- case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
+ case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of
true ->
connection_authorized(FromPid, Conn, CompSpec);
false ->
@@ -677,37 +667,59 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
false
end.
+send_certs(Pid, CompSpec) ->
+ ?debug("send_certs (Pid = ~p)", [Pid]),
+ {LocalIP, LocalPort} = rvi_common:node_address_tuple(),
+ dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE},
+ {?DLINK_ARG_ADDRESS, LocalIP},
+ {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
+ {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)).
+
send_authorize(Pid, CompSpec) ->
+ ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]),
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- dlink_tls_conn:send(Pid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
- { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION },
- { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
+ JWT = get_authorize_jwt(CompSpec),
+ dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE},
+ {?DLINK_ARG_ADDRESS, LocalIP},
+ {?DLINK_ARG_PORT, integer_to_list(LocalPort)},
+ {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)).
+ %% dlink_tls_conn:send(Pid,
+ %% term_to_json(
+ %% {struct,
+ %% [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ %% { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ %% { ?DLINK_ARG_ADDRESS, LocalIP },
+ %% { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
+ %% { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION },
+ %% { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
+ %% { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the connection manager, this is an incoming connection
%% from the client. We should respond with our own authorize followed by
%% a service announce
+ log("authorized ~s:~w", [RemoteIP, RemotePort], CompSpec),
case dlink_tls_connmgr:find_connection_by_pid(FromPid) of
not_found ->
?info("dlink_tls:authorize(): New connection!"),
dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid),
?debug("dlink_tls:authorize(): Sending authorize."),
_Res = send_authorize(FromPid, CompSpec),
- ?debug("dlink_tls:upgrade connection", []),
- UgRes = dlink_tls_conn:upgrade(FromPid, server),
- ?debug("upgrade result: ~p", [UgRes]),
+ case rvi_common:get_value(dlink_tls_role, server, CompSpec) of
+ server -> ok;
+ client ->
+ send_certs(FromPid, CompSpec)
+ end,
+ %% ?debug("dlink_tls:upgrade connection", []),
+ %% UgRes = dlink_tls_conn:upgrade(FromPid, server),
+ %% ?debug("upgrade result: ~p", [UgRes]),
ok;
_ ->
- UgRes = dlink_tls_conn:upgrade(FromPid, client),
- ?debug("upgrade result: ~p", [UgRes]),
+ %% UgRes = dlink_tls_conn:upgrade(FromPid, client),
+ %% ?debug("upgrade result: ~p", [UgRes]),
ok
end,
@@ -722,14 +734,12 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
?info("dlink_tls:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteIP, RemotePort]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
+ AvailabilityMsg = availability_msg(available, FilteredServices),
+ log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
dlink_tls_conn:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT } ]})),
+ rvi_common:pass_log_id(
+ [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }
+ | AvailabilityMsg ], CompSpec)),
%% Setup ping interval
gen_server:call(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
@@ -742,12 +752,8 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
base64:decode_to_string(Data)).
-process_announce({struct, Elems}, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
- [ Avail,
- {array, Svcs} ] =
- opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined),
+process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
- ?debug("dlink_tls:service_announce(~p): TransactionID: ~p", [Avail,TID]),
?debug("dlink_tls:service_announce(~p): Services: ~p", [Avail,Svcs]),
case Avail of
?DLINK_ARG_AVAILABLE ->
@@ -810,8 +816,8 @@ get_certificates(CompSpec) ->
error(no_certificate_found)
end.
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
+validate_auth_jwt(JWT, Conn, CompSpec) ->
+ case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
[ok] ->
true;
[not_found] ->
@@ -828,4 +834,18 @@ opt(K, L, Def) ->
end.
opts(Keys, Elems, Def) ->
- [ opt(K, Elems, Def) || K <- Keys].
+ Res = [ opt(K, Elems, Def) || K <- Keys],
+ ?debug("opts(~p) -> ~p", [Keys, Elems]),
+ Res.
+
+
+log_orphan(Pfx, Fmt, Args) ->
+ start_log(Pfx, Fmt, Args, #component_spec{}).
+
+start_log(Pfx, Fmt, Args, CS) ->
+ LogId = rvi_log:new_id(Pfx),
+ rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogId, CS).
+
+log(Fmt, Args, CS) ->
+ rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).