summaryrefslogtreecommitdiff
path: root/components/dlink_tcp
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:17:30 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:22:36 -0800
commit37dd6cef3e8abdee0829aabf121a2ca5dd35b14e (patch)
treeb7174d0d66f2db6f9a0d724213ad19e427e22847 /components/dlink_tcp
parent7922125aba23033945e3b55a4bf78ef8e84521d0 (diff)
downloadrvi_core-37dd6cef3e8abdee0829aabf121a2ca5dd35b14e.tar.gz
fragmentation tests
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r--components/dlink_tcp/src/connection.erl99
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl61
-rw-r--r--components/dlink_tcp/src/gen_nb_server.erl11
-rw-r--r--components/dlink_tcp/src/listener.erl2
4 files changed, 135 insertions, 38 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 7229b3c..1d4753e 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -26,13 +26,15 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([setup/6]).
+-export([setup/7]).
-export([send/2]).
-export([send/3]).
+-export([send_data/2]).
-export([is_connection_up/1]).
-export([is_connection_up/2]).
-export([terminate_connection/1]).
-export([terminate_connection/2]).
+-export([get_source_address/1]).
-define(SERVER, ?MODULE).
@@ -47,6 +49,8 @@
args = undefined,
packet_mod = ?PACKET_MOD,
packet_st = [],
+ decode_st = <<>>,
+ frag_opts = [],
cs
}).
@@ -55,9 +59,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, CS) ->
+setup(Role, IP, Port, Sock, Mod, Fun, CS) when Role==client; Role==server ->
?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
+ case gen_server:start_link(connection, {Role, IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -70,6 +74,8 @@ setup(IP, Port, Sock, Mod, Fun, CS) ->
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
+send(Pid, Data, Opts) when is_pid(Pid) ->
+ gen_server:cast(Pid, {send, Data, Opts});
send(IP, Port, Data) ->
case connection_manager:find_connection_by_address(IP, Port) of
{ok, Pid} ->
@@ -82,6 +88,9 @@ send(IP, Port, Data) ->
end.
+send_data(Pid, Data) ->
+ gen_server:cast(Pid, {send_data, Data}).
+
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
@@ -106,6 +115,9 @@ is_connection_up(IP, Port) ->
false
end.
+get_source_address(Pid) ->
+ gen_server:call(Pid, get_source_address).
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -124,11 +136,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
- case IP of
- undefined -> ok;
- _ -> connection_manager:add_connection(IP, Port, self())
- end,
+init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): self(): ~p", [self()]),
?debug("connection:init(): IP: ~p", [IP]),
?debug("connection:init(): Port: ~p", [Port]),
@@ -137,6 +145,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): Function: ~p", [Fun]),
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
PktSt = PktMod:init(CompSpec),
+ {ok, FragOpts} = get_module_config(
+ frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec),
{ok, #st{
ip = IP,
port = Port,
@@ -145,7 +155,9 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
func = Fun,
packet_mod = PktMod,
packet_st = PktSt,
- cs = CompSpec
+ frag_opts = FragOpts,
+ cs = rvi_common:save_source_address(
+ Role, Sock, rvi_common:set_value(role, Role, CompSpec))
}}.
get_module_config(Key, Default, CS) ->
@@ -166,8 +178,9 @@ get_module_config(Key, Default, CS) ->
%% @end
%%--------------------------------------------------------------------
-
-handle_call(terminate_connection, _From, St) ->
+handle_call(get_source_address, _, #st{cs = CS} = St) ->
+ {reply, rvi_common:get_source_address(CS), St};
+handle_call(terminate_connection, _From, St) ->
?debug("~p:handle_call(terminate_connection): Terminating: ~p",
[ ?MODULE, {St#st.ip, St#st.port}]),
@@ -193,10 +206,25 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
- ?debug("Encoded = ~p", [Encoded]),
gen_tcp:send(St#st.sock, Encoded),
{noreply, St#st{packet_st = PSt1}};
+handle_cast({send, Data, Opts}, #st{sock = Socket,
+ packet_mod = PMod,
+ packet_st = PSt,
+ frag_opts = FragOpts} = St) ->
+ ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p",
+ [Opts, FragOpts]),
+ {ok, Bin, PSt1} = PMod:encode(Data, PSt),
+ St1 = St#st{packet_st = PSt1},
+ rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE,
+ fun() ->
+ gen_tcp:send(Socket, Bin)
+ end),
+ {noreply, St1};
+handle_cast({send_data, Data}, #st{sock = Sock} = St) ->
+ gen_tcp:send(Sock, Data),
+ {noreply, St};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -230,18 +258,31 @@ handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
packet_mod = PMod,
- packet_st = PSt} = State) ->
- ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]),
- case PMod:decode(Data, fun(Elems) ->
- handle_elements(Elems, State)
- end, PSt) of
- {ok, PSt1} ->
+ packet_st = PSt,
+ decode_st = DSt,
+ frag_opts = FragOpts} = State) ->
+ ?debug("handle_info(~p, PMod=~p, St=~p): From: ~p:~p ",
+ [Data, PMod, PSt, IP, Port]),
+ case dlink_data:decode(Data, fun(Elems) ->
+ got_msg(Elems, State)
+ end, DSt, ?MODULE, FragOpts) of
+ {ok, DSt1} = Ok ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}};
+ {noreply, State#st{decode_st = DSt1}};
{error, Reason} ->
?error("decode failed, Reason = ~p", [Reason]),
{stop, Reason, State}
end;
+ %% case PMod:decode(Data, fun(Elems) ->
+ %% handle_elements(Elems, State)
+ %% end, PSt) of
+ %% {ok, PSt1} ->
+ %% inet:setopts(Sock, [{active, once}]),
+ %% {noreply, State#st{packet_st = PSt1}};
+ %% {error, Reason} ->
+ %% ?error("decode failed, Reason = ~p", [Reason]),
+ %% {stop, Reason, State}
+ %% end;
handle_info({tcp_closed, Sock},
#st { ip = IP,
@@ -331,8 +372,24 @@ code_change(_OldVsn, State, _Extra) ->
%% {ok, PSt1};
%% { ->
-handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
- ip = IP, port = Port}) ->
+%% handle_elements(Elements, #st{frag_opts = FragOpts} = St) ->
+%% MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts),
+%% ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]),
+%% case MaybeF of
+%% true ->
+%% %% It was a fragment, but not a complete message yet
+%% St;
+%% {true, Msg} ->
+%% #st{packet_mod = PMod, packet_st = PSt} = St,
+%% PMod:decode(Msg, fun(Elems) ->
+%% got_msg(Elems, St)
+%% end, PSt);
+%% false ->
+%% got_msg(Elements, St)
+%% end.
+
+got_msg(Elements, #st{ip = IP, port = Port,
+ mod = Mod, func = Fun, cs = CS}) ->
?debug("data complete: Processed: ~p",
[authorize_keys:abbrev(Elements)]),
Mod:Fun(self(), IP, Port, data, Elements, CS).
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 330985a..78de04a 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.1").
+-define(DLINK_TCP_VERSION, <<"1.1">>).
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -208,11 +208,12 @@ connect_remote(IP, Port, CompSpec) ->
[IP, Port]),
%% Setup a genserver around the new connection.
- {ok, Pid } = connection:setup(IP, Port, Sock,
+ {ok, Pid } = connection:setup(client, IP, Port, Sock,
?MODULE, handle_socket, CompSpec ),
%% Send authorize
send_authorize(Pid, CompSpec),
+ connection_manager:add_connection(IP, Port, Pid),
ok;
{error, Err } ->
@@ -283,6 +284,7 @@ handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?debug("CompSpec = ~p", [CompSpec]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -538,7 +540,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
{ reply, [ Res ], St };
-handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) ->
+handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) ->
%% Resolve connection pid from service
case get_connections_by_service(Service) of
@@ -552,7 +554,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
{ ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
{ ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) },
{ ?DLINK_ARG_DATA, Data }
- ]),
+ ], DataLinkOpts),
{ reply, [ Res ], St}
end;
@@ -569,7 +571,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
case connection:is_connection_up(Pid) of
true ->
?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]),
- connection:send(Pid, jsx:encode([{?DLINK_ARG_CMD, ?DLINK_CMD_PING}])),
+ connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Port, Timeout });
@@ -659,22 +661,53 @@ availability_msg(Availability, Services, CompSpec) ->
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
+bin(S) ->
+ iolist_to_binary(S).
+
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
- {NRemoteAddress, NRemotePort} = Conn =
- case { RemoteAddress, RemotePort } of
- { <<"0.0.0.0">>, 0 } ->
- ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
- [ PeerIP, PeerPort]),
- { PeerIP, PeerPort };
- _ -> { RemoteAddress, RemotePort}
- end,
+ F = fun() ->
+ process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec)
+ end,
+ case connection_manager:find_connection_by_address(PeerIP, PeerPort) of
+ not_found -> F();
+ BPid ->
+ deconflict_conns(FromPid, BPid, CompSpec, F)
+ end.
+deconflict_conns(APid, BPid, CsA, F) ->
+ {_, _} = ASrc = rvi_common:get_value(source_address, undefined, CsA),
+ case connection:get_source_address(BPid) of
+ undefined ->
+ ?debug("Deconflict - BSrc = undefined, kill BPid (~p)", [BPid]),
+ exit(BPid, deconflict),
+ F();
+ {_,_} = BSrc when BSrc > ASrc ->
+ ?debug("Deconflict - kill BPid (~p): ASrc = ~p, BSrc = ~p", [BPid, ASrc, BSrc]),
+ exit(BPid, deconflict),
+ F();
+ BSrc ->
+ ?debug("Deconflict - kill APid (~p - self): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]),
+ exit(deconflict)
+ end.
+
+
+process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
+ _ProtoVersion, Credentials, CompSpec) ->
+ {NRemoteAddress, NRemotePort} = Conn =
+ case { RemoteAddress, RemotePort } of
+ { "0.0.0.0", 0 } ->
+ ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
+ [ PeerIP, PeerPort]),
+ { PeerIP, PeerPort };
+ _ -> { RemoteAddress, RemotePort}
+ end,
log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
authorize_rpc:store_creds(CompSpec, Credentials, Conn),
connection_authorized(FromPid, Conn, CompSpec).
@@ -683,7 +716,7 @@ send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
[{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_ADDRESS, bin(LocalIP) },
{ ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
{ ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
{ ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl
index ae0a605..16693ae 100644
--- a/components/dlink_tcp/src/gen_nb_server.erl
+++ b/components/dlink_tcp/src/gen_nb_server.erl
@@ -90,7 +90,8 @@ store_cb_state(CBState, State) when is_record(State, state) ->
%% @doc Adds a new listener socket to be managed by gen_nb_server
%% NOTE: Should only be called by the submodule
-spec add_listen_socket({string(), integer()}, #state{}) -> {ok, #state{}} | {error, any()}.
-add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) ->
+add_listen_socket({IpAddr0, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) ->
+ IpAddr = str(IpAddr0),
Key = {IpAddr, Port},
case dict:find(Key, Socks) of
{ok, _} ->
@@ -108,7 +109,8 @@ add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=
%% @doc Removes a new listener socket to be managed by gen_nb_server
%% NOTE: Should only be called by the submodule
-spec remove_listen_socket({string(), integer()}, #state{}) -> {error, not_listening} | {ok, #state{}}.
-remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
+remove_listen_socket({IpAddr0, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
+ IpAddr = str(IpAddr0),
Key = {IpAddr, Port},
case dict:find(Key, Socks) of
error ->
@@ -119,6 +121,11 @@ remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
addrs=dict:erase(Sock, Addrs)}}
end.
+str(Addr) when is_list(Addr) ->
+ Addr;
+str(Addr) when is_binary(Addr) ->
+ binary_to_list(Addr).
+
%% @doc Returns the callback module's state
-spec init([atom()|any()]) -> {ok, #state{}} | {error, bad_init_state} | {error, any()}.
init([CallbackModule, InitParams]) ->
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index 6def59a..88c3d23 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -101,7 +101,7 @@ new_connection(IP, Port, Sock, State) ->
%% IP and Port are garbage. We'll grab peername when we get our
%% first data.
%% Provide component spec as extra arg.
- {ok, _P} = connection:setup(undefined, 0, Sock,
+ {ok, _P} = connection:setup(server, undefined, 0, Sock,
dlink_tcp_rpc,
handle_socket, gen_nb_server:get_cb_state(State)),
{ok, State}.