summaryrefslogtreecommitdiff
path: root/components/dlink_tcp/src/dlink_tcp_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/dlink_tcp/src/dlink_tcp_rpc.erl')
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl61
1 files changed, 47 insertions, 14 deletions
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 330985a..78de04a 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.1").
+-define(DLINK_TCP_VERSION, <<"1.1">>).
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -208,11 +208,12 @@ connect_remote(IP, Port, CompSpec) ->
[IP, Port]),
%% Setup a genserver around the new connection.
- {ok, Pid } = connection:setup(IP, Port, Sock,
+ {ok, Pid } = connection:setup(client, IP, Port, Sock,
?MODULE, handle_socket, CompSpec ),
%% Send authorize
send_authorize(Pid, CompSpec),
+ connection_manager:add_connection(IP, Port, Pid),
ok;
{error, Err } ->
@@ -283,6 +284,7 @@ handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?debug("CompSpec = ~p", [CompSpec]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -538,7 +540,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
{ reply, [ Res ], St };
-handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) ->
+handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) ->
%% Resolve connection pid from service
case get_connections_by_service(Service) of
@@ -552,7 +554,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
{ ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
{ ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) },
{ ?DLINK_ARG_DATA, Data }
- ]),
+ ], DataLinkOpts),
{ reply, [ Res ], St}
end;
@@ -569,7 +571,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
case connection:is_connection_up(Pid) of
true ->
?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]),
- connection:send(Pid, jsx:encode([{?DLINK_ARG_CMD, ?DLINK_CMD_PING}])),
+ connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Port, Timeout });
@@ -659,22 +661,53 @@ availability_msg(Availability, Services, CompSpec) ->
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
+bin(S) ->
+ iolist_to_binary(S).
+
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
- {NRemoteAddress, NRemotePort} = Conn =
- case { RemoteAddress, RemotePort } of
- { <<"0.0.0.0">>, 0 } ->
- ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
- [ PeerIP, PeerPort]),
- { PeerIP, PeerPort };
- _ -> { RemoteAddress, RemotePort}
- end,
+ F = fun() ->
+ process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec)
+ end,
+ case connection_manager:find_connection_by_address(PeerIP, PeerPort) of
+ not_found -> F();
+ BPid ->
+ deconflict_conns(FromPid, BPid, CompSpec, F)
+ end.
+deconflict_conns(APid, BPid, CsA, F) ->
+ {_, _} = ASrc = rvi_common:get_value(source_address, undefined, CsA),
+ case connection:get_source_address(BPid) of
+ undefined ->
+ ?debug("Deconflict - BSrc = undefined, kill BPid (~p)", [BPid]),
+ exit(BPid, deconflict),
+ F();
+ {_,_} = BSrc when BSrc > ASrc ->
+ ?debug("Deconflict - kill BPid (~p): ASrc = ~p, BSrc = ~p", [BPid, ASrc, BSrc]),
+ exit(BPid, deconflict),
+ F();
+ BSrc ->
+ ?debug("Deconflict - kill APid (~p - self): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]),
+ exit(deconflict)
+ end.
+
+
+process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
+ _ProtoVersion, Credentials, CompSpec) ->
+ {NRemoteAddress, NRemotePort} = Conn =
+ case { RemoteAddress, RemotePort } of
+ { "0.0.0.0", 0 } ->
+ ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
+ [ PeerIP, PeerPort]),
+ { PeerIP, PeerPort };
+ _ -> { RemoteAddress, RemotePort}
+ end,
log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
authorize_rpc:store_creds(CompSpec, Credentials, Conn),
connection_authorized(FromPid, Conn, CompSpec).
@@ -683,7 +716,7 @@ send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
[{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_ADDRESS, bin(LocalIP) },
{ ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
{ ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
{ ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }