diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-06-07 08:44:04 +0200 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-06-10 11:37:42 +0200 |
commit | 46547382cbc92765c51f8d581d6f65855024850f (patch) | |
tree | 69f98931fbe438fc2110870321fe603af4038bb3 /components/dlink_tcp | |
parent | c434076126c43d1a6c8d28890169d08910fd324b (diff) | |
download | rvi_core-46547382cbc92765c51f8d581d6f65855024850f.tar.gz |
w.i.p. fixing auth for dlink_tcp+json
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 235 |
1 files changed, 167 insertions, 68 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 479e093..0df6d89 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -34,13 +34,15 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). +-include_lib("rvi_common/include/rvi_dlink.hrl"). -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(DEFAULT_BERT_RPC_PORT, 9999). -define(DEFAULT_RECONNECT_INTERVAL, 5000). -define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). +-define(DLINK_TCP_VERSION, "1.0"). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -91,7 +93,7 @@ start_connection_manager() -> CompSpec = rvi_common:get_component_specification(), {ok, BertOpts } = rvi_common:get_module_config(data_link, ?MODULE, - bert_rpc_server, + server_opts, [], CompSpec), IP = proplists:get_value(ip, BertOpts, ?DEFAULT_BERT_RPC_ADDRESS), @@ -201,12 +203,18 @@ connect_remote(IP, Port, CompSpec) -> %% Send authorize { LocalIP, LocalPort} = rvi_common:node_address_tuple(), - connection:send(Pid, - { authorize, - 1, LocalIP, LocalPort, rvi_binary, - get_certificates(CompSpec), - get_authorize_jwt(CompSpec) - }), + connection:send( + Pid, + term_to_json( + {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_PORT, LocalPort }, + { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, + { ?DLINK_ARG_CERTIFICATES, + {array, get_certificates(CompSpec)} }, + { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } + ]})), ok; {error, Err } -> @@ -245,7 +253,14 @@ announce_local_service_(CompSpec, [ ok, JWT ] = authorize_rpc:sign_message( CompSpec, availability_msg(Availability, [Service])), - Res = connection:send(ConnPid, {service_announce, 3, JWT}), + Res = connection:send( + ConnPid, + term_to_json( + {struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_SIGNATURE, JWT } + ]})), ?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -390,61 +405,62 @@ handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> ok; handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) -> - {ok, {struct, Elems}} = exo_json:decode_string(Payload), + {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)), ?debug("dlink_tcp:data(): Got ~p", [ Elems ]), case opt(?DLINK_ARG_CMD, Elems, undefined) of - ?DLINK_CMD_AUTHORIZE -> - [ TransactionID, - RemoteAddress, - RemotePort, - ProtoVersion, - Certificate, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, - ?DLINK_ARG_PORT, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATE, - ?DLINK_ARG_SIGNATURE], - Elems, undefined), - - process_authorize(FromPid, PeerIP, PeerPort, - TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Certificate, Signature, CompSpec); - - ?DLINK_CMD_SERVICE_ANNOUNCE -> - [ TransactionID, - Status, - { array, Services }, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_STATUS, - ?DLINK_ARG_SERVICES, - ?DLINK_ARG_SIGNATURE], - Elems, undefined), - - process_announce(FromPid, PeerIP, PeerPort, - TransactionID, Status, Services, - Signature, CompSpec); - ?DLINK_CMD_RECEIVE -> - [ _TransactionID, - ProtoMod, - Data ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_MODULE, - ?DLINK_ARG_DATA], - Elems, undefined), - process_data(FromPid, PeerIP, PeerPort, - ProtoMod, Data, CompSpec); - - ?DLINK_CMD_PING -> - ?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]), - ok; - - undefined -> - ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]), - ok + ?DLINK_CMD_AUTHORIZE -> + [ TransactionID, + RemoteAddress, + RemotePort, + ProtoVersion, + Signature ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_ADDRESS, + ?DLINK_ARG_PORT, + ?DLINK_ARG_VERSION, + ?DLINK_ARG_SIGNATURE], + Elems, undefined), + + process_authorize(FromPid, PeerIP, PeerPort, + TransactionID, RemoteAddress, RemotePort, + ProtoVersion, Signature, CompSpec); + + ?DLINK_CMD_SERVICE_ANNOUNCE -> + [ TransactionID, + ProtoVersion, + Signature ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_VERSION, + ?DLINK_ARG_SIGNATURE], + Elems, undefined), + + case authorize_rpc:validate_message(CompSpec, Signature, Conn) of + [ok, Msg] -> + process_announce(Msg, FromPid, PeerIP, PeerPort, + TransactionID, ProtoVersion, CompSpec); + _ -> + ?debug("Couldn't validate availability msg from ~p", [Conn]) + end; + + ?DLINK_CMD_RECEIVE -> + [ _TransactionID, + ProtoMod, + Data ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_MODULE, + ?DLINK_ARG_DATA], + Elems, undefined), + process_data(FromPid, PeerIP, PeerPort, + ProtoMod, Data, CompSpec); + + ?DLINK_CMD_PING -> + ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort ]), + ok; + + undefined -> + ?warning("dlink_tcp:data() cmd undefined, ~p", [ Elems ]), + ok end. %% JSON-RPC entry point @@ -739,20 +755,103 @@ delete_services(ConnPid, SvcNameList) -> ok. availability_msg(Availability, Services) -> - {struct, [{"availability", atom_to_list(Availability)}, - {"services", {array, Services}}]}. + {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, {array, Services} }]}. + +status_string(available ) -> ?DLINK_ARG_AVAILABLE; +status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. + +process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, RemotePort, + Protocol, Signature, CompSpec) -> + + ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), + ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), + ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), + ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), + ?debug("dlink_tcp:authorize(): Certificate: ~p", [ Certificate ]), + ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]), + + { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), + + { NRemoteAddress, NRemotePort} = Conn = + case { RemoteAddress, RemotePort } of + { "0.0.0.0", 0 } -> + + ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p", + [ PeerIP, PeerPort]), + { PeerIP, PeerPort }; + _ -> { RemoteAddress, RemotePort} + end, + + case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of + true -> + connection_authorized(FromPid, Conn, CompSpec); + false -> + %% close connection (how?) + false + end. + +send_authorize(Pid, SetupChannel, CompSpec) -> + {ok,[{address, Address }]} = bt_drv:local_info([address]), + bt_connection:send(Pid, + term_to_json( + {struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, + { ?DLINK_ARG_PORT, SetupChannel }, + { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, + { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, + { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). + +connection_authorized(FromPid, {RemoteAddress, RemotePort} = Conn, CompSpec) -> + case connection_manager:find_connection_by_pid(FromPid) of + not_found -> + ?info("dlink_tcp:authorize(): New connection!"), + connection_manager:add_connection(RemoteAddress, RemoteChannel, FromPid), + ?debug("dlink_tcp:authorize(): Sending authorize."), + Res = send_authorize(FromPid, RemoteChannel, CompSpec), + ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]), + ok; + _ -> ok + end, + + %% Send our own servide announcement to the remote server + %% that just authorized to us. + [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), + + [ ok, FilteredServices ] = authorize_rpc:filter_by_destination( + CompSpec, LocalServices, Conn), + + %% Send an authorize back to the remote node + ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", + [FilteredServices, RemoteAddress, RemotePort]), + + [ ok, JWT ] = authorize_rpc:sign_message( + CompSpec, availability_msg(available, FilteredServices)), + connection:send(FromPid, + term_to_json( + {struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_SIGNATURE, JWT } ]})), + + %% Setup ping interval + gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemotePort, FromPid }), + ok. -handle_availability_msg(Msg, FromPid, IP, Port, TID, CompSpec) -> - {ok, Avail} = rvi_common:get_json_element(["availability"], Msg), - {ok, Svcs} = rvi_common:get_json_element(["services"], Msg), +process_announce(Msg, FromPid, IP, Port, TID, _Vsn, CompSpec) -> + [ Avail, + {array, Svcs} ] = + opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ]), ?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]), ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]), case Avail of - "available" -> + ?DLINK_ARG_AVAILABLE -> add_services(Svcs, FromPid), service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE); - "unavailable" -> + ?DLINK_ARG_UNAVAILABLE -> delete_services(FromPid, Svcs), service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) end, |