diff options
Diffstat (limited to 'components/dlink_tcp/src/dlink_tcp_rpc.erl')
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 207 |
1 files changed, 65 insertions, 142 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index b3ec80d..83e0a24 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -44,7 +44,7 @@ -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). --define(DLINK_TCP_VERSION, "1.0"). +-define(DLINK_TCP_VERSION, "1.1"). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) -> %% Setup a genserver around the new connection. {ok, Pid } = connection:setup(IP, Port, Sock, - ?MODULE, handle_socket, [CompSpec] ), + ?MODULE, handle_socket, CompSpec ), %% Send authorize - { LocalIP, LocalPort} = rvi_common:node_address_tuple(), - connection:send( - Pid, - term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATES, - get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - | rvi_common:log_id_json_tail(CompSpec) - ])), + send_authorize(Pid, CompSpec), ok; {error, Err } -> @@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) -> CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec), case connect_remote(IP, list_to_integer(Port), CS) of ok -> - log("connected", [], CS), + log(result, "connected", [], CS), ok; Err -> %% Failed to connect. Sleep and try again @@ -266,16 +253,8 @@ announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), - Res = connection:send( - ConnPid, - jsx:encode( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - | rvi_common:log_id_json_tail(CompSpec) - ])), + Msg = availability_msg(Availability, [Service], CompSpec), + Res = connection:send(ConnPid, Msg), ?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) -> handle_socket_(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg); -handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> +handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]), ok. -handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> +handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]), @@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), - [ TransactionID, - RemoteAddress, + [ RemoteAddress, RemotePort, ProtoVersion, - CertificatesTmp, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + Credentials ] = + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, - ?DLINK_ARG_SIGNATURE], + ?DLINK_ARG_CREDENTIALS], Elems, undefined), - - Certificates = - case CertificatesTmp of - C when is_list(C) -> C; - undefined -> [] - end, process_authorize(FromPid, PeerIP, PeerPort, - TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Signature, Certificates, CS); + RemoteAddress, RemotePort, + ProtoVersion, Credentials, CS); ?DLINK_CMD_SERVICE_ANNOUNCE -> ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), - [ TransactionID, - ProtoVersion, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_SIGNATURE], + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), - Conn = {PeerIP, PeerPort}, log("sa from ~s:~w", [PeerIP, PeerPort], CS), - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - ?debug("Service Announce~nMsg = ~p~n", [Msg]), - process_announce(Msg, FromPid, PeerIP, PeerPort, - TransactionID, ProtoVersion, CompSpec); - _ -> - log("sa INVALID", [], CS), - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) -> {noreply, St}; handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) -> + ?debug("handle_socket, Arg (CS) = ~p", [Arg]), try handle_socket_(FromPid, IP, Port, Event, Arg) catch C:E -> ?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), + error("Caught ~p:~p", [C, E]), ok end, {noreply, St}; @@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S %% FIXME: What to do if we have multiple connections to the same service? [ConnPid | _T] -> Res = connection:send(ConnPid, - jsx:encode( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, - { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, - { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } - ])), + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, + { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, + { ?DLINK_ARG_DATA, Data } + ]), { reply, [ Res ], St} end; @@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) -> }) || SvcName <- SvcNameList ], ok. -availability_msg(Availability, Services) -> - [{ ?DLINK_ARG_STATUS, status_string(Availability) }, - { ?DLINK_ARG_SERVICES, Services }]. +availability_msg(Availability, Services, CompSpec) -> + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, Services } + | log_id_tail(CompSpec) ]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, - RemotePort, ProtoVersion, Signature, Certificates, CompSpec) -> +process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, ProtoVersion, Credentials, CompSpec) -> ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), - ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]), - ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), - + ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]), {NRemoteAddress, NRemotePort} = Conn = case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> @@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, _ -> { RemoteAddress, RemotePort} end, - log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), - case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of - true -> - connection_authorized(FromPid, Conn, CompSpec); - false -> - %% close connection (how?) - false - end. + log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), + authorize_rpc:store_creds(CompSpec, Credentials, Conn), + connection_authorized(FromPid, Conn, CompSpec). send_authorize(Pid, CompSpec) -> {LocalIP, LocalPort} = rvi_common:node_address_tuple(), connection:send(Pid, - rvi_common:term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - | rvi_common:log_id_json_tail(CompSpec) ])). + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) }, + { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, + { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } + | log_id_tail(CompSpec) ]). connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered @@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteIP, RemotePort]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), + AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec), log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec), - connection:send(FromPid, - rvi_common:term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - | rvi_common:log_id_json_tail(CompSpec)])), - + connection:send(FromPid, AvailabilityMsg), %% Setup ping interval gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), ok. @@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> ?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]), ?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), Proto = list_to_existing_atom(ProtocolMod), - Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, - base64:decode_to_string(Data)). + Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data). -process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) -> - [ Avail, - Svcs ] = - opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined), +process_announce(Avail, Services, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), - ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]), - ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]), + ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]), case Avail of ?DLINK_ARG_AVAILABLE -> - add_services(Svcs, FromPid), - service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE); + add_services(Services, FromPid), + service_discovery_rpc:register_services(CompSpec, Services, ?MODULE); ?DLINK_ARG_UNAVAILABLE -> - delete_services(FromPid, Svcs), - service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) + delete_services(FromPid, Services), + service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE) end, ok. @@ -837,36 +775,15 @@ get_connections(Key, Acc) -> get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). - -get_authorize_jwt(CompSpec) -> - case authorize_rpc:get_authorize_jwt(CompSpec) of - [ok, JWT] -> - JWT; +get_credentials(CompSpec) -> + case authorize_rpc:get_credentials(CompSpec) of + [ok, Creds] -> + Creds; [not_found] -> - ?error("No authorize JWT~n", []), - error(cannot_authorize) + ?error("No credentials found~n", []), + error(no_credentials_found) end. -get_certificates(CompSpec) -> - case authorize_rpc:get_certificates(CompSpec) of - [ok, Certs] -> - Certs; - [not_found] -> - ?error("No certificate found~n", []), - error(no_certificate_found) - end. - -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of - [ok] -> - true; - [not_found] -> - false - end. - -term_to_json(Term) -> - rvi_common:term_to_json(Term). - opt(K, L, Def) -> case lists:keyfind(K, 1, L) of {_, V} -> V; @@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) -> rvi_common:set_value(rvi_log_id, LogId, CS). log(Fmt, Args, CS) -> - rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). + log(info, Fmt, Args, CS). + +log(Lvl, Fmt, Args, CS) -> + rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS). + +log_id_tail(CompSpec) -> + rvi_common:log_id_json_tail(CompSpec). |