summaryrefslogtreecommitdiff
path: root/components/dlink_tcp/src/dlink_tcp_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/dlink_tcp/src/dlink_tcp_rpc.erl')
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl207
1 files changed, 65 insertions, 142 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index b3ec80d..83e0a24 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.0").
+-define(DLINK_TCP_VERSION, "1.1").
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) ->
%% Setup a genserver around the new connection.
{ok, Pid } = connection:setup(IP, Port, Sock,
- ?MODULE, handle_socket, [CompSpec] ),
+ ?MODULE, handle_socket, CompSpec ),
%% Send authorize
- { LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- connection:send(
- Pid,
- term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES,
- get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ send_authorize(Pid, CompSpec),
ok;
{error, Err } ->
@@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
case connect_remote(IP, list_to_integer(Port), CS) of
ok ->
- log("connected", [], CS),
+ log(result, "connected", [], CS),
ok;
Err -> %% Failed to connect. Sleep and try again
@@ -266,16 +253,8 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = connection:send(
- ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = connection:send(ConnPid, Msg),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) ->
handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg);
-handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
-handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
+handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]),
@@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemotePort,
ProtoVersion,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
- C when is_list(C) -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CS);
+ RemoteAddress, RemotePort,
+ ProtoVersion, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- ProtoVersion,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- Conn = {PeerIP, PeerPort},
log("sa from ~s:~w", [PeerIP, PeerPort], CS),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- ?debug("Service Announce~nMsg = ~p~n", [Msg]),
- process_announce(Msg, FromPid, PeerIP, PeerPort,
- TransactionID, ProtoVersion, CompSpec);
- _ ->
- log("sa INVALID", [], CS),
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) ->
{noreply, St};
handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) ->
+ ?debug("handle_socket, Arg (CS) = ~p", [Arg]),
try handle_socket_(FromPid, IP, Port, Event, Arg)
catch
C:E ->
?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]),
+ error("Caught ~p:~p", [C, E]),
ok
end,
{noreply, St};
@@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
Res = connection:send(ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ])),
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
@@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) ->
}) || SvcName <- SvcNameList ],
ok.
-availability_msg(Availability, Services) ->
- [{ ?DLINK_ARG_STATUS, status_string(Availability) },
- { ?DLINK_ARG_SERVICES, Services }].
+availability_msg(Availability, Services, CompSpec) ->
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
+ { ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, Services }
+ | log_id_tail(CompSpec) ].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
- RemotePort, ProtoVersion, Signature, Certificates, CompSpec) ->
+process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
- ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]),
- ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
-
+ ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
{NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
- log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
- case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec) ])).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
+ { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec) ]).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
@@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteIP, RemotePort]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
- connection:send(FromPid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)])),
-
+ connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
ok.
@@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]),
?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
Proto = list_to_existing_atom(ProtocolMod),
- Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
- base64:decode_to_string(Data)).
+ Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data).
-process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
- [ Avail,
- Svcs ] =
- opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined),
+process_announce(Avail, Services, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
- ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]),
- ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]),
+ ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]),
case Avail of
?DLINK_ARG_AVAILABLE ->
- add_services(Svcs, FromPid),
- service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE);
+ add_services(Services, FromPid),
+ service_discovery_rpc:register_services(CompSpec, Services, ?MODULE);
?DLINK_ARG_UNAVAILABLE ->
- delete_services(FromPid, Svcs),
- service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
+ delete_services(FromPid, Services),
+ service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE)
end,
ok.
@@ -837,36 +775,15 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
-
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
-
-term_to_json(Term) ->
- rvi_common:term_to_json(Term).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) ->
rvi_common:set_value(rvi_log_id, LogId, CS).
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).