diff options
author | Ulf Wiger <ulf@wiger.net> | 2015-05-26 20:43:37 +0200 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-06-10 11:28:38 +0200 |
commit | bd4dd9aeec5da35af21b2c996b05a9618ece568d (patch) | |
tree | e31087ea2e63b5e5e16635f6977dc6d656b17714 /components/dlink_tcp | |
parent | 179fbae4c5bc3fa1da7ff6515d0b295fc5de825c (diff) | |
download | rvi_core-bd4dd9aeec5da35af21b2c996b05a9618ece568d.tar.gz |
w.i.p.
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 276 |
1 files changed, 132 insertions, 144 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 52232d5..4d087aa 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -31,9 +31,9 @@ disconnect_data_link/2, send_data/5]). + -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --include_lib("rvi_common/include/rvi_dlink.hrl"). -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(DEFAULT_BERT_RPC_PORT, 9999). @@ -41,7 +41,6 @@ -define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). --define(DLINK_TCP_VERSION, "1.0"). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -92,7 +91,7 @@ start_connection_manager() -> CompSpec = rvi_common:get_component_specification(), {ok, BertOpts } = rvi_common:get_module_config(data_link, ?MODULE, - server_opts, + bert_rpc_server, [], CompSpec), IP = proplists:get_value(ip, BertOpts, ?DEFAULT_BERT_RPC_ADDRESS), @@ -181,6 +180,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) -> %% Connect to a remote RVI node. %% connect_remote(IP, Port, CompSpec) -> + ?info("connect_remote(~p, ~p)~n", [IP, Port]), case connection_manager:find_connection_by_address(IP, Port) of { ok, _Pid } -> already_connected; @@ -202,15 +202,11 @@ connect_remote(IP, Port, CompSpec) -> %% Send authorize { LocalIP, LocalPort} = rvi_common:node_address_tuple(), connection:send(Pid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP}, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATE, "" }, - { ?DLINK_ARG_SIGNATURE, "" } ]})), + { authorize, + 1, LocalIP, LocalPort, rvi_binary, + get_certificate(CompSpec), + get_authorize_jwt(CompSpec) + }), ok; {error, Err } -> @@ -248,18 +244,9 @@ announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - Status = case Availability of - available -> ?DLINK_ARG_AVAILABLE; - unavailable -> ?DLINK_ARG_UNAVAILABLE - end, Res = connection:send(ConnPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_STATUS, Status }, - { ?DLINK_ARG_SERVICES, { array, [ Service ] }}, - { ?DLINK_ARG_SIGNATURE, "" } ]})), + {service_announce, 3, Availability, + [Service], { signature, {}}}), ?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -274,32 +261,31 @@ announce_local_service_(CompSpec, Service, Availability) -> get_connections(), Service, Availability). -process_authorize(FromPid, - PeerIP, - PeerPort, - TransactionID, - RemoteAddress, - RemotePort, - ProtoVersion, - Certificate, - Signature, - CompSpec) -> + +handle_socket(_FromPid, PeerIP, PeerPort, data, ping, [_CompSpec]) -> + ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]), + ok; + +handle_socket(FromPid, PeerIP, PeerPort, data, + { authorize, + TransactionID, + RemoteAddress, + RemotePort, + Protocol, + Cert, + AuthJWT}, [CompSpec]) -> ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), - ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), + ?info( "dlink_tcp:authorize(): Protocol: ~p", [ Protocol ]), ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tcp:authorize(): Certificate: ~p", [ Certificate ]), - ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]), - - - { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), + ?debug("dlink_tcp:authorize(): AuthJWT: ~p", [ AuthJWT ]), %% If the remote address and port are both reported as "0.0.0.0" and 0, %% then the client connects from behind a firewall and cannot %% accept return connections. In these cases, we will tie the %% gonnection to the peer address provided in PeerIP and PeerPort - { NRemoteAddress, NRemotePort} = + { _NRemoteAddress, _NRemotePort} = Conn = case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> @@ -310,68 +296,25 @@ process_authorize(FromPid, _ -> { RemoteAddress, RemotePort} end, - %% If FromPid (the genserver managing the socket) is not yet registered - %% with the conneciton manager, this is an incoming connection - %% from the client. We should respond with our own authorize followed by - %% a service announce - - %% FIXME: Validate certificate and signature before continuing. - case connection_manager:find_connection_by_pid(FromPid) of - not_found -> - ?info("dlink_tcp:authorize(): New connection!"), - connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid), - ?debug("dlink_tcp:authorize(): Sending authorize."), - Res = connection:send(FromPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalAddress}, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATE, "" }, - { ?DLINK_ARG_SIGNATURE, "" } ]})), - ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]), - ok; - _ -> ok - end, - - %% Send our own servide announcement to the remote server - %% that just authorized to us. - [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), - - - %% Send an authorize back to the remote node - ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", - [LocalServices, NRemoteAddress, NRemotePort]), - - connection:send(FromPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_STATUS, ?DLINK_ARG_AVAILABLE }, - { ?DLINK_ARG_SERVICES, { array, LocalServices }}, - { ?DLINK_ARG_SIGNATURE, "" } ]})), - - %% Setup ping interval - gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }), - ok. - + case validate_auth_jwt(AuthJWT, Cert, Conn, CompSpec) of + true -> + connection_authorized(FromPid, Conn, CompSpec); + false -> + %% close connection (how?) + false + end; -process_announce(FromPid, - RemoteIP, - RemotePort, - TransactionID, - ?DLINK_ARG_AVAILABLE, - Services, - Signature, - CompSpec) -> +handle_socket(FromPid, RemoteIP, RemotePort, data, + { service_announce, + TransactionID, + available, + Services, + Signature }, [CompSpec]) -> ?debug("dlink_tcp:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]), ?debug("dlink_tcp:service_announce(available): Remote Port: ~p", [ RemotePort ]), ?debug("dlink_tcp:service_announce(available): TransactionID: ~p", [ TransactionID ]), ?debug("dlink_tcp:service_announce(available): Signature: ~p", [ Signature ]), - ?debug("dlink_tcp:service_announce(available): Services: ~p", [ Services ]), + ?debug("dlink_tcp:service_announce(available): Service: ~p", [ Services ]), add_services(Services, FromPid), @@ -380,20 +323,17 @@ process_announce(FromPid, ok; -process_announce(FromPid, - RemoteIP, - RemotePort, - TransactionID, - ?DLINK_ARG_UNAVAILABLE, - Services, - Signature, - CompSpec) -> - +handle_socket(FromPid, RemoteIP, RemotePort, data, + { service_announce, + TransactionID, + unavailable, + Services, + Signature}, [CompSpec]) -> ?debug("dlink_tcp:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]), ?debug("dlink_tcp:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]), ?debug("dlink_tcp:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]), ?debug("dlink_tcp:service_announce(unavailable): Signature: ~p", [ Signature ]), - ?debug("dlink_tcp:service_announce(unavailable): Services: ~p", [ Services ]), + ?debug("dlink_tcp:service_announce(unavailable): Service: ~p", [ Services ]), %% Register the received services with all relevant components @@ -402,22 +342,21 @@ process_announce(FromPid, delete_services(FromPid, Services), service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE), - ok. + ok; -process_data(_FromPid, - SetupIP, - SetupPort, - ProtocolMod, - Data, - CompSpec) -> +handle_socket(_FromPid, SetupIP, SetupPort, data, + { receive_data, ProtocolMod, Data}, [CompSpec]) -> %% ?info("dlink_tcp:receive_data(): ~p", [ Data ]), ?debug("dlink_tcp:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), - Proto = list_to_atom(ProtocolMod), - Proto:receive_message(CompSpec, base64:decode_to_string(Data)), - ok. + ProtocolMod:receive_message(CompSpec, Data), + ok; +handle_socket(_FromPid, SetupIP, SetupPort, data, Data, [_CompSpec]) -> + ?warning("dlink_tcp:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ?warning("dlink_tcp:unknown_data(): Unknown data: ~p", [ Data]), + ok. %% We lost the socket connection. @@ -468,8 +407,7 @@ handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> ?info("dlink_tcp:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), - ok. - + ok; handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) -> {ok, {struct, Elems}} = exo_json:decode_string(Payload), @@ -529,10 +467,6 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) -> ok end. - - - - %% JSON-RPC entry point %% CAlled by local exo http server handle_notification("service_available", Args) -> @@ -576,10 +510,9 @@ handle_rpc("disconenct_data_link", Args) -> handle_rpc("send_data", Args) -> { ok, ProtoMod } = rvi_common:get_json_element(["proto_mod"], Args), { ok, Service } = rvi_common:get_json_element(["service"], Args), - { ok, Data } = rvi_common:get_json_element([?DLINK_ARG_DATA], Args), + { ok, Data } = rvi_common:get_json_element(["data"], Args), { ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args), - [ Res ] = gen_server:call(?SERVER, { rvi, send_data, - [ProtoMod, Service, Data, DataLinkOpts]}), + [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}), {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; @@ -618,6 +551,7 @@ handle_cast(Other, St) -> handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> %% Do we already have a connection that support service? + ?info("dlink_tcp: setup_data_link (~p, ~p)~n", [Service, Opts]), case get_connections_by_service(Service) of [] -> %% Nop[e case proplists:get_value(target, Opts, undefined) of @@ -661,14 +595,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S %% FIXME: What to do if we have multiple connections to the same service? [ConnPid | _T] -> - Res = connection:send(ConnPid, - term_to_json( - { struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, - { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, - { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } - ]})), + Res = connection:send(ConnPid, {receive_data, ProtoMod, Data}), { reply, [ Res ], St} end; @@ -679,7 +606,7 @@ handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) -> %% Create a timer to handle periodic pings. {ok, ServerOpts } = rvi_common:get_module_config(data_link, ?MODULE, - server_opts, [], + bert_rpc_server, [], St#st.cs), Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), @@ -703,7 +630,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> case connection:is_connection_up(Pid) of true -> ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]), - connection:send(Pid, term_to_json({ struct, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]})), + connection:send(Pid, ping), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }); @@ -714,6 +641,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> %% Setup static nodes handle_info({ rvi_setup_persistent_connection, IP, Port, CompSpec }, St) -> + ?info("rvi_setup_persistent_connection, ~p, ~p~n", [IP, Port]), connect_and_retry_remote(IP, Port, CompSpec), { noreply, St }; @@ -727,6 +655,49 @@ terminate(_Reason, _St) -> code_change(_OldVsn, St, _Extra) -> {ok, St}. + +connection_authorized(FromPid, {NRemoteAddress, NRemotePort}, CompSpec) -> + + { LocalAddress, LocalPort } = rvi_common:node_address_tuple(), + + %% If FromPid (the genserver managing the socket) is not yet registered + %% with the conneciton manager, this is an incoming connection + %% from the client. We should respond with our own authorize followed by + %% a service announce + case connection_manager:find_connection_by_pid(FromPid) of + not_found -> + ?info("dlink_tcp:authorize(): New connection!"), + connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid), + ?debug("dlink_tcp:authorize(): Sending authorize."), + Res = connection:send(FromPid, + { authorize, + 1, LocalAddress, LocalPort, rvi_binary, + get_certificate(CompSpec), + get_authorize_jwt(CompSpec) + }), + ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]), + ok; + _ -> ok + end, + + %% Send our own servide announcement to the remote server + %% that just authorized to us. + [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), + + + %% Send an authorize back to the remote node + ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", + [LocalServices, NRemoteAddress, NRemotePort]), + + connection:send(FromPid, + { service_announce, 2, available, + LocalServices, { signature, {}}}), + + %% Setup ping interval + gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }), + ok. + + setup_reconnect_timer(MSec, IP, Port, CompSpec) -> erlang:send_after(MSec, ?MODULE, { rvi_setup_persistent_connection, @@ -784,6 +755,8 @@ delete_services(ConnPid, SvcNameList) -> }) || SvcName <- SvcNameList ], ok. + + delete_connection(Conn) -> %% Create or replace existing connection table entry %% with the sum of new and old services. @@ -816,14 +789,29 @@ get_connections(Key, Acc) -> get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). -term_to_json(Term) -> - binary_to_list(iolist_to_binary(exo_json:encode(Term))). -opt(K, L, Def) -> - case lists:keyfind(K, 1, L) of - {_, V} -> V; - false -> Def +get_authorize_jwt(CompSpec) -> + case authorize_rpc:get_authorize_jwt(CompSpec) of + [ok, JWT] -> + JWT; + [not_found] -> + ?error("No authorize JWT~n", []), + error(cannot_authorize) end. -opts(Keys, Elems, Def) -> - [ opt(K, Elems, Def) || K <- Keys]. +get_certificate(CompSpec) -> + case authorize_rpc:get_certificate(CompSpec) of + [ok, Cert] -> + Cert; + [not_found] -> + ?error("No certificate found~n", []), + error(no_certificate_found) + end. + +validate_auth_jwt(JWT, Cert, Conn, CompSpec) -> + case authorize_rpc:validate_authorization(CompSpec, JWT, Cert, Conn) of + [ok] -> + true; + [not_found] -> + false + end. |