summaryrefslogtreecommitdiff
path: root/components/dlink_tcp
diff options
context:
space:
mode:
authorUlf Wiger <ulf@wiger.net>2015-05-26 20:43:37 +0200
committerUlf Wiger <ulf@feuerlabs.com>2015-06-10 11:28:38 +0200
commitbd4dd9aeec5da35af21b2c996b05a9618ece568d (patch)
treee31087ea2e63b5e5e16635f6977dc6d656b17714 /components/dlink_tcp
parent179fbae4c5bc3fa1da7ff6515d0b295fc5de825c (diff)
downloadrvi_core-bd4dd9aeec5da35af21b2c996b05a9618ece568d.tar.gz
w.i.p.
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl276
1 files changed, 132 insertions, 144 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 52232d5..4d087aa 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -31,9 +31,9 @@
disconnect_data_link/2,
send_data/5]).
+
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--include_lib("rvi_common/include/rvi_dlink.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BERT_RPC_PORT, 9999).
@@ -41,7 +41,6 @@
-define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.0").
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -92,7 +91,7 @@ start_connection_manager() ->
CompSpec = rvi_common:get_component_specification(),
{ok, BertOpts } = rvi_common:get_module_config(data_link,
?MODULE,
- server_opts,
+ bert_rpc_server,
[],
CompSpec),
IP = proplists:get_value(ip, BertOpts, ?DEFAULT_BERT_RPC_ADDRESS),
@@ -181,6 +180,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) ->
%% Connect to a remote RVI node.
%%
connect_remote(IP, Port, CompSpec) ->
+ ?info("connect_remote(~p, ~p)~n", [IP, Port]),
case connection_manager:find_connection_by_address(IP, Port) of
{ ok, _Pid } ->
already_connected;
@@ -202,15 +202,11 @@ connect_remote(IP, Port, CompSpec) ->
%% Send authorize
{ LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP},
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATE, "" },
- { ?DLINK_ARG_SIGNATURE, "" } ]})),
+ { authorize,
+ 1, LocalIP, LocalPort, rvi_binary,
+ get_certificate(CompSpec),
+ get_authorize_jwt(CompSpec)
+ }),
ok;
{error, Err } ->
@@ -248,18 +244,9 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- Status = case Availability of
- available -> ?DLINK_ARG_AVAILABLE;
- unavailable -> ?DLINK_ARG_UNAVAILABLE
- end,
Res = connection:send(ConnPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_STATUS, Status },
- { ?DLINK_ARG_SERVICES, { array, [ Service ] }},
- { ?DLINK_ARG_SIGNATURE, "" } ]})),
+ {service_announce, 3, Availability,
+ [Service], { signature, {}}}),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -274,32 +261,31 @@ announce_local_service_(CompSpec, Service, Availability) ->
get_connections(),
Service, Availability).
-process_authorize(FromPid,
- PeerIP,
- PeerPort,
- TransactionID,
- RemoteAddress,
- RemotePort,
- ProtoVersion,
- Certificate,
- Signature,
- CompSpec) ->
+
+handle_socket(_FromPid, PeerIP, PeerPort, data, ping, [_CompSpec]) ->
+ ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]),
+ ok;
+
+handle_socket(FromPid, PeerIP, PeerPort, data,
+ { authorize,
+ TransactionID,
+ RemoteAddress,
+ RemotePort,
+ Protocol,
+ Cert,
+ AuthJWT}, [CompSpec]) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
- ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
+ ?info( "dlink_tcp:authorize(): Protocol: ~p", [ Protocol ]),
?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tcp:authorize(): Certificate: ~p", [ Certificate ]),
- ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]),
-
-
- { LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
+ ?debug("dlink_tcp:authorize(): AuthJWT: ~p", [ AuthJWT ]),
%% If the remote address and port are both reported as "0.0.0.0" and 0,
%% then the client connects from behind a firewall and cannot
%% accept return connections. In these cases, we will tie the
%% gonnection to the peer address provided in PeerIP and PeerPort
- { NRemoteAddress, NRemotePort} =
+ { _NRemoteAddress, _NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -310,68 +296,25 @@ process_authorize(FromPid,
_ -> { RemoteAddress, RemotePort}
end,
- %% If FromPid (the genserver managing the socket) is not yet registered
- %% with the conneciton manager, this is an incoming connection
- %% from the client. We should respond with our own authorize followed by
- %% a service announce
-
- %% FIXME: Validate certificate and signature before continuing.
- case connection_manager:find_connection_by_pid(FromPid) of
- not_found ->
- ?info("dlink_tcp:authorize(): New connection!"),
- connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid),
- ?debug("dlink_tcp:authorize(): Sending authorize."),
- Res = connection:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalAddress},
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATE, "" },
- { ?DLINK_ARG_SIGNATURE, "" } ]})),
- ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]),
- ok;
- _ -> ok
- end,
-
- %% Send our own servide announcement to the remote server
- %% that just authorized to us.
- [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local),
-
-
- %% Send an authorize back to the remote node
- ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
- [LocalServices, NRemoteAddress, NRemotePort]),
-
- connection:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_STATUS, ?DLINK_ARG_AVAILABLE },
- { ?DLINK_ARG_SERVICES, { array, LocalServices }},
- { ?DLINK_ARG_SIGNATURE, "" } ]})),
-
- %% Setup ping interval
- gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }),
- ok.
-
+ case validate_auth_jwt(AuthJWT, Cert, Conn, CompSpec) of
+ true ->
+ connection_authorized(FromPid, Conn, CompSpec);
+ false ->
+ %% close connection (how?)
+ false
+ end;
-process_announce(FromPid,
- RemoteIP,
- RemotePort,
- TransactionID,
- ?DLINK_ARG_AVAILABLE,
- Services,
- Signature,
- CompSpec) ->
+handle_socket(FromPid, RemoteIP, RemotePort, data,
+ { service_announce,
+ TransactionID,
+ available,
+ Services,
+ Signature }, [CompSpec]) ->
?debug("dlink_tcp:service_announce(available): Address: ~p:~p", [ RemoteIP, RemotePort ]),
?debug("dlink_tcp:service_announce(available): Remote Port: ~p", [ RemotePort ]),
?debug("dlink_tcp:service_announce(available): TransactionID: ~p", [ TransactionID ]),
?debug("dlink_tcp:service_announce(available): Signature: ~p", [ Signature ]),
- ?debug("dlink_tcp:service_announce(available): Services: ~p", [ Services ]),
+ ?debug("dlink_tcp:service_announce(available): Service: ~p", [ Services ]),
add_services(Services, FromPid),
@@ -380,20 +323,17 @@ process_announce(FromPid,
ok;
-process_announce(FromPid,
- RemoteIP,
- RemotePort,
- TransactionID,
- ?DLINK_ARG_UNAVAILABLE,
- Services,
- Signature,
- CompSpec) ->
-
+handle_socket(FromPid, RemoteIP, RemotePort, data,
+ { service_announce,
+ TransactionID,
+ unavailable,
+ Services,
+ Signature}, [CompSpec]) ->
?debug("dlink_tcp:service_announce(unavailable): Address: ~p:~p", [ RemoteIP, RemotePort ]),
?debug("dlink_tcp:service_announce(unavailable): Remote Port: ~p", [ RemotePort ]),
?debug("dlink_tcp:service_announce(unavailable): TransactionID: ~p", [ TransactionID ]),
?debug("dlink_tcp:service_announce(unavailable): Signature: ~p", [ Signature ]),
- ?debug("dlink_tcp:service_announce(unavailable): Services: ~p", [ Services ]),
+ ?debug("dlink_tcp:service_announce(unavailable): Service: ~p", [ Services ]),
%% Register the received services with all relevant components
@@ -402,22 +342,21 @@ process_announce(FromPid,
delete_services(FromPid, Services),
service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE),
- ok.
+ ok;
-process_data(_FromPid,
- SetupIP,
- SetupPort,
- ProtocolMod,
- Data,
- CompSpec) ->
+handle_socket(_FromPid, SetupIP, SetupPort, data,
+ { receive_data, ProtocolMod, Data}, [CompSpec]) ->
%% ?info("dlink_tcp:receive_data(): ~p", [ Data ]),
?debug("dlink_tcp:receive_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
- Proto = list_to_atom(ProtocolMod),
- Proto:receive_message(CompSpec, base64:decode_to_string(Data)),
- ok.
+ ProtocolMod:receive_message(CompSpec, Data),
+ ok;
+handle_socket(_FromPid, SetupIP, SetupPort, data, Data, [_CompSpec]) ->
+ ?warning("dlink_tcp:unknown_data(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?warning("dlink_tcp:unknown_data(): Unknown data: ~p", [ Data]),
+ ok.
%% We lost the socket connection.
@@ -468,8 +407,7 @@ handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
?info("dlink_tcp:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
- ok.
-
+ ok;
handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) ->
{ok, {struct, Elems}} = exo_json:decode_string(Payload),
@@ -529,10 +467,6 @@ handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) ->
ok
end.
-
-
-
-
%% JSON-RPC entry point
%% CAlled by local exo http server
handle_notification("service_available", Args) ->
@@ -576,10 +510,9 @@ handle_rpc("disconenct_data_link", Args) ->
handle_rpc("send_data", Args) ->
{ ok, ProtoMod } = rvi_common:get_json_element(["proto_mod"], Args),
{ ok, Service } = rvi_common:get_json_element(["service"], Args),
- { ok, Data } = rvi_common:get_json_element([?DLINK_ARG_DATA], Args),
+ { ok, Data } = rvi_common:get_json_element(["data"], Args),
{ ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args),
- [ Res ] = gen_server:call(?SERVER, { rvi, send_data,
- [ProtoMod, Service, Data, DataLinkOpts]}),
+ [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}),
{ok, [ {status, rvi_common:json_rpc_status(Res)} ]};
@@ -618,6 +551,7 @@ handle_cast(Other, St) ->
handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
%% Do we already have a connection that support service?
+ ?info("dlink_tcp: setup_data_link (~p, ~p)~n", [Service, Opts]),
case get_connections_by_service(Service) of
[] -> %% Nop[e
case proplists:get_value(target, Opts, undefined) of
@@ -661,14 +595,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
- Res = connection:send(ConnPid,
- term_to_json(
- { struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ]})),
+ Res = connection:send(ConnPid, {receive_data, ProtoMod, Data}),
{ reply, [ Res ], St}
end;
@@ -679,7 +606,7 @@ handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) ->
%% Create a timer to handle periodic pings.
{ok, ServerOpts } = rvi_common:get_module_config(data_link,
?MODULE,
- server_opts, [],
+ bert_rpc_server, [],
St#st.cs),
Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
@@ -703,7 +630,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
case connection:is_connection_up(Pid) of
true ->
?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]),
- connection:send(Pid, term_to_json({ struct, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]})),
+ connection:send(Pid, ping),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Port, Timeout });
@@ -714,6 +641,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
%% Setup static nodes
handle_info({ rvi_setup_persistent_connection, IP, Port, CompSpec }, St) ->
+ ?info("rvi_setup_persistent_connection, ~p, ~p~n", [IP, Port]),
connect_and_retry_remote(IP, Port, CompSpec),
{ noreply, St };
@@ -727,6 +655,49 @@ terminate(_Reason, _St) ->
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
+
+connection_authorized(FromPid, {NRemoteAddress, NRemotePort}, CompSpec) ->
+
+ { LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
+
+ %% If FromPid (the genserver managing the socket) is not yet registered
+ %% with the conneciton manager, this is an incoming connection
+ %% from the client. We should respond with our own authorize followed by
+ %% a service announce
+ case connection_manager:find_connection_by_pid(FromPid) of
+ not_found ->
+ ?info("dlink_tcp:authorize(): New connection!"),
+ connection_manager:add_connection(NRemoteAddress, NRemotePort, FromPid),
+ ?debug("dlink_tcp:authorize(): Sending authorize."),
+ Res = connection:send(FromPid,
+ { authorize,
+ 1, LocalAddress, LocalPort, rvi_binary,
+ get_certificate(CompSpec),
+ get_authorize_jwt(CompSpec)
+ }),
+ ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]),
+ ok;
+ _ -> ok
+ end,
+
+ %% Send our own servide announcement to the remote server
+ %% that just authorized to us.
+ [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local),
+
+
+ %% Send an authorize back to the remote node
+ ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
+ [LocalServices, NRemoteAddress, NRemotePort]),
+
+ connection:send(FromPid,
+ { service_announce, 2, available,
+ LocalServices, { signature, {}}}),
+
+ %% Setup ping interval
+ gen_server:call(?SERVER, { setup_initial_ping, NRemoteAddress, NRemotePort, FromPid }),
+ ok.
+
+
setup_reconnect_timer(MSec, IP, Port, CompSpec) ->
erlang:send_after(MSec, ?MODULE,
{ rvi_setup_persistent_connection,
@@ -784,6 +755,8 @@ delete_services(ConnPid, SvcNameList) ->
}) || SvcName <- SvcNameList ],
ok.
+
+
delete_connection(Conn) ->
%% Create or replace existing connection table entry
%% with the sum of new and old services.
@@ -816,14 +789,29 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-term_to_json(Term) ->
- binary_to_list(iolist_to_binary(exo_json:encode(Term))).
-opt(K, L, Def) ->
- case lists:keyfind(K, 1, L) of
- {_, V} -> V;
- false -> Def
+get_authorize_jwt(CompSpec) ->
+ case authorize_rpc:get_authorize_jwt(CompSpec) of
+ [ok, JWT] ->
+ JWT;
+ [not_found] ->
+ ?error("No authorize JWT~n", []),
+ error(cannot_authorize)
end.
-opts(Keys, Elems, Def) ->
- [ opt(K, Elems, Def) || K <- Keys].
+get_certificate(CompSpec) ->
+ case authorize_rpc:get_certificate(CompSpec) of
+ [ok, Cert] ->
+ Cert;
+ [not_found] ->
+ ?error("No certificate found~n", []),
+ error(no_certificate_found)
+ end.
+
+validate_auth_jwt(JWT, Cert, Conn, CompSpec) ->
+ case authorize_rpc:validate_authorization(CompSpec, JWT, Cert, Conn) of
+ [ok] ->
+ true;
+ [not_found] ->
+ false
+ end.