summaryrefslogtreecommitdiff
path: root/components/dlink_tcp
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-06-07 08:44:04 +0200
committerUlf Wiger <ulf@feuerlabs.com>2015-06-10 11:37:42 +0200
commit46547382cbc92765c51f8d581d6f65855024850f (patch)
tree69f98931fbe438fc2110870321fe603af4038bb3 /components/dlink_tcp
parentc434076126c43d1a6c8d28890169d08910fd324b (diff)
downloadrvi_core-46547382cbc92765c51f8d581d6f65855024850f.tar.gz
w.i.p. fixing auth for dlink_tcp+json
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl235
1 files changed, 167 insertions, 68 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 479e093..0df6d89 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -34,13 +34,15 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
+-include_lib("rvi_common/include/rvi_dlink.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BERT_RPC_PORT, 9999).
-define(DEFAULT_RECONNECT_INTERVAL, 5000).
-define(DEFAULT_BERT_RPC_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
+-define(DLINK_TCP_VERSION, "1.0").
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -91,7 +93,7 @@ start_connection_manager() ->
CompSpec = rvi_common:get_component_specification(),
{ok, BertOpts } = rvi_common:get_module_config(data_link,
?MODULE,
- bert_rpc_server,
+ server_opts,
[],
CompSpec),
IP = proplists:get_value(ip, BertOpts, ?DEFAULT_BERT_RPC_ADDRESS),
@@ -201,12 +203,18 @@ connect_remote(IP, Port, CompSpec) ->
%% Send authorize
{ LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- connection:send(Pid,
- { authorize,
- 1, LocalIP, LocalPort, rvi_binary,
- get_certificates(CompSpec),
- get_authorize_jwt(CompSpec)
- }),
+ connection:send(
+ Pid,
+ term_to_json(
+ {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_PORT, LocalPort },
+ { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
+ { ?DLINK_ARG_CERTIFICATES,
+ {array, get_certificates(CompSpec)} },
+ { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
+ ]})),
ok;
{error, Err } ->
@@ -245,7 +253,14 @@ announce_local_service_(CompSpec,
[ ok, JWT ] = authorize_rpc:sign_message(
CompSpec, availability_msg(Availability, [Service])),
- Res = connection:send(ConnPid, {service_announce, 3, JWT}),
+ Res = connection:send(
+ ConnPid,
+ term_to_json(
+ {struct,
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
+ { ?DLINK_ARG_SIGNATURE, JWT }
+ ]})),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -390,61 +405,62 @@ handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
ok;
handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) ->
- {ok, {struct, Elems}} = exo_json:decode_string(Payload),
+ {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)),
?debug("dlink_tcp:data(): Got ~p", [ Elems ]),
case opt(?DLINK_ARG_CMD, Elems, undefined) of
- ?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
- RemotePort,
- ProtoVersion,
- Certificate,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
- ?DLINK_ARG_PORT,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATE,
- ?DLINK_ARG_SIGNATURE],
- Elems, undefined),
-
- process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Certificate, Signature, CompSpec);
-
- ?DLINK_CMD_SERVICE_ANNOUNCE ->
- [ TransactionID,
- Status,
- { array, Services },
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_STATUS,
- ?DLINK_ARG_SERVICES,
- ?DLINK_ARG_SIGNATURE],
- Elems, undefined),
-
- process_announce(FromPid, PeerIP, PeerPort,
- TransactionID, Status, Services,
- Signature, CompSpec);
- ?DLINK_CMD_RECEIVE ->
- [ _TransactionID,
- ProtoMod,
- Data ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_MODULE,
- ?DLINK_ARG_DATA],
- Elems, undefined),
- process_data(FromPid, PeerIP, PeerPort,
- ProtoMod, Data, CompSpec);
-
- ?DLINK_CMD_PING ->
- ?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort]),
- ok;
-
- undefined ->
- ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
- ok
+ ?DLINK_CMD_AUTHORIZE ->
+ [ TransactionID,
+ RemoteAddress,
+ RemotePort,
+ ProtoVersion,
+ Signature ] =
+ opts([?DLINK_ARG_TRANSACTION_ID,
+ ?DLINK_ARG_ADDRESS,
+ ?DLINK_ARG_PORT,
+ ?DLINK_ARG_VERSION,
+ ?DLINK_ARG_SIGNATURE],
+ Elems, undefined),
+
+ process_authorize(FromPid, PeerIP, PeerPort,
+ TransactionID, RemoteAddress, RemotePort,
+ ProtoVersion, Signature, CompSpec);
+
+ ?DLINK_CMD_SERVICE_ANNOUNCE ->
+ [ TransactionID,
+ ProtoVersion,
+ Signature ] =
+ opts([?DLINK_ARG_TRANSACTION_ID,
+ ?DLINK_ARG_VERSION,
+ ?DLINK_ARG_SIGNATURE],
+ Elems, undefined),
+
+ case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
+ [ok, Msg] ->
+ process_announce(Msg, FromPid, PeerIP, PeerPort,
+ TransactionID, ProtoVersion, CompSpec);
+ _ ->
+ ?debug("Couldn't validate availability msg from ~p", [Conn])
+ end;
+
+ ?DLINK_CMD_RECEIVE ->
+ [ _TransactionID,
+ ProtoMod,
+ Data ] =
+ opts([?DLINK_ARG_TRANSACTION_ID,
+ ?DLINK_ARG_MODULE,
+ ?DLINK_ARG_DATA],
+ Elems, undefined),
+ process_data(FromPid, PeerIP, PeerPort,
+ ProtoMod, Data, CompSpec);
+
+ ?DLINK_CMD_PING ->
+ ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort ]),
+ ok;
+
+ undefined ->
+ ?warning("dlink_tcp:data() cmd undefined, ~p", [ Elems ]),
+ ok
end.
%% JSON-RPC entry point
@@ -739,20 +755,103 @@ delete_services(ConnPid, SvcNameList) ->
ok.
availability_msg(Availability, Services) ->
- {struct, [{"availability", atom_to_list(Availability)},
- {"services", {array, Services}}]}.
+ {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, {array, Services} }]}.
+
+status_string(available ) -> ?DLINK_ARG_AVAILABLE;
+status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
+
+process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, RemotePort,
+ Protocol, Signature, CompSpec) ->
+
+ ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
+ ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
+ ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
+ ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
+ ?debug("dlink_tcp:authorize(): Certificate: ~p", [ Certificate ]),
+ ?debug("dlink_tcp:authorize(): Signature: ~p", [ Signature ]),
+
+ { LocalAddress, LocalPort } = rvi_common:node_address_tuple(),
+
+ { NRemoteAddress, NRemotePort} = Conn =
+ case { RemoteAddress, RemotePort } of
+ { "0.0.0.0", 0 } ->
+
+ ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
+ [ PeerIP, PeerPort]),
+ { PeerIP, PeerPort };
+ _ -> { RemoteAddress, RemotePort}
+ end,
+
+ case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of
+ true ->
+ connection_authorized(FromPid, Conn, CompSpec);
+ false ->
+ %% close connection (how?)
+ false
+ end.
+
+send_authorize(Pid, SetupChannel, CompSpec) ->
+ {ok,[{address, Address }]} = bt_drv:local_info([address]),
+ bt_connection:send(Pid,
+ term_to_json(
+ {struct,
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
+ { ?DLINK_ARG_PORT, SetupChannel },
+ { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
+ { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
+ { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
+
+connection_authorized(FromPid, {RemoteAddress, RemotePort} = Conn, CompSpec) ->
+ case connection_manager:find_connection_by_pid(FromPid) of
+ not_found ->
+ ?info("dlink_tcp:authorize(): New connection!"),
+ connection_manager:add_connection(RemoteAddress, RemoteChannel, FromPid),
+ ?debug("dlink_tcp:authorize(): Sending authorize."),
+ Res = send_authorize(FromPid, RemoteChannel, CompSpec),
+ ?debug("dlink_tcp:authorize(): Sending authorize: ~p", [ Res]),
+ ok;
+ _ -> ok
+ end,
+
+ %% Send our own servide announcement to the remote server
+ %% that just authorized to us.
+ [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local),
+
+ [ ok, FilteredServices ] = authorize_rpc:filter_by_destination(
+ CompSpec, LocalServices, Conn),
+
+ %% Send an authorize back to the remote node
+ ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
+ [FilteredServices, RemoteAddress, RemotePort]),
+
+ [ ok, JWT ] = authorize_rpc:sign_message(
+ CompSpec, availability_msg(available, FilteredServices)),
+ connection:send(FromPid,
+ term_to_json(
+ {struct,
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
+ { ?DLINK_ARG_SIGNATURE, JWT } ]})),
+
+ %% Setup ping interval
+ gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemotePort, FromPid }),
+ ok.
-handle_availability_msg(Msg, FromPid, IP, Port, TID, CompSpec) ->
- {ok, Avail} = rvi_common:get_json_element(["availability"], Msg),
- {ok, Svcs} = rvi_common:get_json_element(["services"], Msg),
+process_announce(Msg, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
+ [ Avail,
+ {array, Svcs} ] =
+ opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ]),
?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]),
?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]),
case Avail of
- "available" ->
+ ?DLINK_ARG_AVAILABLE ->
add_services(Svcs, FromPid),
service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE);
- "unavailable" ->
+ ?DLINK_ARG_UNAVAILABLE ->
delete_services(FromPid, Svcs),
service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
end,