diff options
Diffstat (limited to 'components/dlink_tls/src/dlink_tls_rpc.erl')
-rw-r--r-- | components/dlink_tls/src/dlink_tls_rpc.erl | 254 |
1 files changed, 137 insertions, 117 deletions
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 21df19f..964cc0c 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -35,11 +35,11 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --include_lib("rvi_common/include/rvi_dlink.hrl"). +-include_lib("rvi_common/include/rvi_dlink_bin.hrl"). -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(SERVER_OPTS, server_opts). --define(DEFAULT_TCP_PORT, 9999). +-define(DEFAULT_TCP_PORT, 9998). -define(DEFAULT_RECONNECT_INTERVAL, 5000). -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes @@ -94,13 +94,14 @@ start_json_server() -> start_connection_manager() -> CompSpec = rvi_common:get_component_specification(), - {ok, BertOpts} = rvi_common:get_module_config(data_link, - ?MODULE, - ?SERVER_OPTS, - [], - CompSpec), - IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS), - Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT), + {ok, TlsOpts} = rvi_common:get_module_config(data_link, + ?MODULE, + ?SERVER_OPTS, + [], + CompSpec), + ?debug("TlsOpts = ~p", [TlsOpts]), + IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), + Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), ?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]), @@ -136,7 +137,8 @@ setup_persistent_connections_([ ], _CompSpec) -> setup_persistent_connections_([ NetworkAddress | T], CompSpec) -> ?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]), [ IP, Port] = string:tokens(NetworkAddress, ":"), - connect_and_retry_remote(IP, Port, CompSpec), + %% cast an immediate (re-)connect attempt to dlink_tls_rpc + setup_reconnect_timer(0, IP, Port, CompSpec), setup_persistent_connections_(T, CompSpec), ok. @@ -187,41 +189,34 @@ connect_remote(IP, Port, CompSpec) -> ?info("connect_remote(~p, ~p)~n", [IP, Port]), case dlink_tls_connmgr:find_connection_by_address(IP, Port) of { ok, _Pid } -> + log("already connected", [], CompSpec), already_connected; not_found -> %% Setup a new outbound connection - ?info("dlink_tls:connect_remote(): Connecting ~p:~p", - [IP, Port]), - - case gen_tcp:connect(IP, Port, [list, {packet, 0}]) of + {ok, Timeout} = rvi_common:get_module_config( + dlink_tls, ?MODULE, connect_timeout, 10000, CompSpec), + ?info("dlink_tls:connect_remote(): Connecting ~p:~p (TO=~p", + [IP, Port, Timeout]), + log("new connection", [], CompSpec), + case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), Timeout) of { ok, Sock } -> ?info("dlink_tls:connect_remote(): Connected ~p:~p", [IP, Port]), %% Setup a genserver around the new connection. {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock, - ?MODULE, handle_socket, [CompSpec] ), - + ?MODULE, handle_socket, CompSpec), + UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec), + ?debug("Upgrade result = ~p", [UgRes]), %% Send authorize - { LocalIP, LocalPort} = rvi_common:node_address_tuple(), - dlink_tls_conn:send( - Pid, - term_to_json( - {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, - { ?DLINK_ARG_CERTIFICATES, - {array, get_certificates(CompSpec)} }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - ]})), + send_authorize(Pid, CompSpec), ok; {error, Err } -> ?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p", [IP, Port, Err]), + log("connect FAILED: ~w", [Err], CompSpec), not_available end end. @@ -229,9 +224,11 @@ connect_remote(IP, Port, CompSpec) -> connect_and_retry_remote( IP, Port, CompSpec) -> ?info("dlink_tls:connect_and_retry_remote(): ~p:~p", [ IP, Port]), - - case connect_remote(IP, list_to_integer(Port), CompSpec) of - ok -> ok; + CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec), + case connect_remote(IP, list_to_integer(Port), CS) of + ok -> + log("connected", [], CS), + ok; Err -> %% Failed to connect. Sleep and try again ?notice("dlink_tls:connect_and_retry_remote(~p:~p): Failed: ~p", @@ -239,8 +236,8 @@ connect_and_retry_remote( IP, Port, CompSpec) -> ?notice("dlink_tls:connect_and_retry_remote(~p:~p): Will try again in ~p sec", [IP, Port, ?DEFAULT_RECONNECT_INTERVAL]), - - setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec), + log("start reconnect timer", [], CS), + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CS), not_available end. @@ -253,16 +250,12 @@ announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), + AvailabilityMsg = availability_msg(Availability, [Service]), Res = dlink_tls_conn:send( ConnPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - ]})), + rvi_common:pass_log_id( + [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE } + | AvailabilityMsg ], CompSpec)), ?debug("dlink_tls:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -282,7 +275,7 @@ announce_local_service_(CompSpec, Service, Availability) -> handle_socket(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg); -handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> +handle_socket(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tls:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -323,57 +316,57 @@ handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> end, ok; -handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> +handle_socket(_FromPid, SetupIP, SetupPort, error, _CS) -> ?info("dlink_tls:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]), ok. -handle_socket(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> +handle_socket(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> + + ?debug("PeerIP = ~p, PeerPort = ~p", [PeerIP, PeerPort]), + ?debug("data(): Elems ~p~nCS = ~p", [Elems, CompSpec]), - ?debug("dlink_tls:data(): Elems ~p", [Elems]), + CS = rvi_common:pick_up_json_log_id(Elems, CompSpec), case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> - [ TransactionID, - RemoteAddress, + ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), + [ RemoteAddress, RemotePort, - ProtoVersion, - CertificatesTmp, Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, ?DLINK_ARG_SIGNATURE], Elems, undefined), - - Certificates = - case CertificatesTmp of - { array, C} -> C; - undefined -> [] - end, - process_authorize(FromPid, PeerIP, PeerPort, - TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Signature, Certificates, CompSpec); + process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, + Signature, CS); + + ?DLINK_CMD_CERT_EXCHANGE -> + ?debug("got cert exch ~s:~w", [PeerIP, PeerPort]), + [ Certs ] = + opts([?DLINK_ARG_CERTIFICATES], Elems, undefined), + ?debug("Certs = ~p", [Certs]), + log("certs from ~s:~w", [PeerIP, PeerPort], CS), + authorize_rpc:store_certs(CS, Certs, {PeerIP, PeerPort}), + case rvi_common:get_value(dlink_tls_role, client, CS) of + client -> ok; + server -> + send_certs(FromPid, CompSpec) + end, + ok; ?DLINK_CMD_SERVICE_ANNOUNCE -> - [ TransactionID, - ProtoVersion, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_SIGNATURE], + ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), Conn = {PeerIP, PeerPort}, - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - process_announce(Msg, FromPid, PeerIP, PeerPort, - TransactionID, ProtoVersion, CompSpec); - _ -> - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + log("sa from ~s:~w", [PeerIP, PeerPort], CS), + process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -644,20 +637,17 @@ delete_services(ConnPid, SvcNameList) -> ok. availability_msg(Availability, Services) -> - {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) }, - { ?DLINK_ARG_SERVICES, {array, Services} }]}. + [{ ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, Services }]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, - RemotePort, ProtoVersion, Signature, Certificates, CompSpec) -> - ?info("dlink_tls:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), - ?info("dlink_tls:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), - ?info("dlink_tls:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), - ?debug("dlink_tls:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tls:authorize(): Certificates: ~p", [ Certificates ]), - ?debug("dlink_tls:authorize(): Signature: ~p", [ Signature ]), +process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, Signature, CompSpec) -> + ?info("dlink_tls:authorize(): Peer Address: ~s:~p", [PeerIP, PeerPort ]), + ?info("dlink_tls:authorize(): Remote Address: ~s:~s", [ RemoteAddress, RemotePort ]), + ?debug("dlink_tls:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), { _NRemoteAddress, _NRemotePort} = Conn = case { RemoteAddress, RemotePort } of @@ -669,7 +659,7 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, _ -> { RemoteAddress, RemotePort} end, - case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of + case validate_auth_jwt(Signature, {PeerIP, PeerPort}, CompSpec) of true -> connection_authorized(FromPid, Conn, CompSpec); false -> @@ -677,37 +667,59 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, false end. +send_certs(Pid, CompSpec) -> + ?debug("send_certs (Pid = ~p)", [Pid]), + {LocalIP, LocalPort} = rvi_common:node_address_tuple(), + dlink_tls_conn:send(Pid, rvi_common:pass_log_id( + [{?DLINK_ARG_CMD, ?DLINK_CMD_CERT_EXCHANGE}, + {?DLINK_ARG_ADDRESS, LocalIP}, + {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, + {?DLINK_ARG_CERTIFICATES, [get_certificates(CompSpec)]}], CompSpec)). + send_authorize(Pid, CompSpec) -> + ?debug("send_authorize() Pid = ~p; CompSpec = ~p", [Pid, CompSpec]), {LocalIP, LocalPort} = rvi_common:node_address_tuple(), - dlink_tls_conn:send(Pid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, - { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, - { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). + JWT = get_authorize_jwt(CompSpec), + dlink_tls_conn:send(Pid, rvi_common:pass_log_id( + [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE}, + {?DLINK_ARG_ADDRESS, LocalIP}, + {?DLINK_ARG_PORT, integer_to_list(LocalPort)}, + {?DLINK_ARG_SIGNATURE, JWT}], CompSpec)). + %% dlink_tls_conn:send(Pid, + %% term_to_json( + %% {struct, + %% [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + %% { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + %% { ?DLINK_ARG_ADDRESS, LocalIP }, + %% { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, + %% { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, + %% { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, + %% { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered %% with the connection manager, this is an incoming connection %% from the client. We should respond with our own authorize followed by %% a service announce + log("authorized ~s:~w", [RemoteIP, RemotePort], CompSpec), case dlink_tls_connmgr:find_connection_by_pid(FromPid) of not_found -> ?info("dlink_tls:authorize(): New connection!"), dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid), ?debug("dlink_tls:authorize(): Sending authorize."), _Res = send_authorize(FromPid, CompSpec), - ?debug("dlink_tls:upgrade connection", []), - UgRes = dlink_tls_conn:upgrade(FromPid, server), - ?debug("upgrade result: ~p", [UgRes]), + case rvi_common:get_value(dlink_tls_role, server, CompSpec) of + server -> ok; + client -> + send_certs(FromPid, CompSpec) + end, + %% ?debug("dlink_tls:upgrade connection", []), + %% UgRes = dlink_tls_conn:upgrade(FromPid, server), + %% ?debug("upgrade result: ~p", [UgRes]), ok; _ -> - UgRes = dlink_tls_conn:upgrade(FromPid, client), - ?debug("upgrade result: ~p", [UgRes]), + %% UgRes = dlink_tls_conn:upgrade(FromPid, client), + %% ?debug("upgrade result: ~p", [UgRes]), ok end, @@ -722,14 +734,12 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> ?info("dlink_tls:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteIP, RemotePort]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), + AvailabilityMsg = availability_msg(available, FilteredServices), + log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec), dlink_tls_conn:send(FromPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } ]})), + rvi_common:pass_log_id( + [ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE } + | AvailabilityMsg ], CompSpec)), %% Setup ping interval gen_server:call(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), @@ -742,12 +752,8 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, base64:decode_to_string(Data)). -process_announce({struct, Elems}, FromPid, IP, Port, TID, _Vsn, CompSpec) -> - [ Avail, - {array, Svcs} ] = - opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined), +process_announce(Avail, Svcs, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), - ?debug("dlink_tls:service_announce(~p): TransactionID: ~p", [Avail,TID]), ?debug("dlink_tls:service_announce(~p): Services: ~p", [Avail,Svcs]), case Avail of ?DLINK_ARG_AVAILABLE -> @@ -810,8 +816,8 @@ get_certificates(CompSpec) -> error(no_certificate_found) end. -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of +validate_auth_jwt(JWT, Conn, CompSpec) -> + case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of [ok] -> true; [not_found] -> @@ -828,4 +834,18 @@ opt(K, L, Def) -> end. opts(Keys, Elems, Def) -> - [ opt(K, Elems, Def) || K <- Keys]. + Res = [ opt(K, Elems, Def) || K <- Keys], + ?debug("opts(~p) -> ~p", [Keys, Elems]), + Res. + + +log_orphan(Pfx, Fmt, Args) -> + start_log(Pfx, Fmt, Args, #component_spec{}). + +start_log(Pfx, Fmt, Args, CS) -> + LogId = rvi_log:new_id(Pfx), + rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)), + rvi_common:set_value(rvi_log_id, LogId, CS). + +log(Fmt, Args, CS) -> + rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). |