diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2016-01-12 08:17:30 -0800 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2016-01-12 08:22:36 -0800 |
commit | 37dd6cef3e8abdee0829aabf121a2ca5dd35b14e (patch) | |
tree | b7174d0d66f2db6f9a0d724213ad19e427e22847 /components/dlink_tcp | |
parent | 7922125aba23033945e3b55a4bf78ef8e84521d0 (diff) | |
download | rvi_core-37dd6cef3e8abdee0829aabf121a2ca5dd35b14e.tar.gz |
fragmentation tests
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r-- | components/dlink_tcp/src/connection.erl | 99 | ||||
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 61 | ||||
-rw-r--r-- | components/dlink_tcp/src/gen_nb_server.erl | 11 | ||||
-rw-r--r-- | components/dlink_tcp/src/listener.erl | 2 |
4 files changed, 135 insertions, 38 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 7229b3c..1d4753e 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -26,13 +26,15 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([setup/6]). +-export([setup/7]). -export([send/2]). -export([send/3]). +-export([send_data/2]). -export([is_connection_up/1]). -export([is_connection_up/2]). -export([terminate_connection/1]). -export([terminate_connection/2]). +-export([get_source_address/1]). -define(SERVER, ?MODULE). @@ -47,6 +49,8 @@ args = undefined, packet_mod = ?PACKET_MOD, packet_st = [], + decode_st = <<>>, + frag_opts = [], cs }). @@ -55,9 +59,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, CS) -> +setup(Role, IP, Port, Sock, Mod, Fun, CS) when Role==client; Role==server -> ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]), - case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of + case gen_server:start_link(connection, {Role, IP, Port, Sock, Mod, Fun, CS},[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), gen_server:cast(GenSrvPid, {activate_socket, Sock}), @@ -70,6 +74,8 @@ setup(IP, Port, Sock, Mod, Fun, CS) -> send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). +send(Pid, Data, Opts) when is_pid(Pid) -> + gen_server:cast(Pid, {send, Data, Opts}); send(IP, Port, Data) -> case connection_manager:find_connection_by_address(IP, Port) of {ok, Pid} -> @@ -82,6 +88,9 @@ send(IP, Port, Data) -> end. +send_data(Pid, Data) -> + gen_server:cast(Pid, {send_data, Data}). + terminate_connection(Pid) when is_pid(Pid) -> gen_server:call(Pid, terminate_connection). @@ -106,6 +115,9 @@ is_connection_up(IP, Port) -> false end. +get_source_address(Pid) -> + gen_server:call(Pid, get_source_address). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -124,11 +136,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, CompSpec}) -> - case IP of - undefined -> ok; - _ -> connection_manager:add_connection(IP, Port, self()) - end, +init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): self(): ~p", [self()]), ?debug("connection:init(): IP: ~p", [IP]), ?debug("connection:init(): Port: ~p", [Port]), @@ -137,6 +145,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): Function: ~p", [Fun]), {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), PktSt = PktMod:init(CompSpec), + {ok, FragOpts} = get_module_config( + frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec), {ok, #st{ ip = IP, port = Port, @@ -145,7 +155,9 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> func = Fun, packet_mod = PktMod, packet_st = PktSt, - cs = CompSpec + frag_opts = FragOpts, + cs = rvi_common:save_source_address( + Role, Sock, rvi_common:set_value(role, Role, CompSpec)) }}. get_module_config(Key, Default, CS) -> @@ -166,8 +178,9 @@ get_module_config(Key, Default, CS) -> %% @end %%-------------------------------------------------------------------- - -handle_call(terminate_connection, _From, St) -> +handle_call(get_source_address, _, #st{cs = CS} = St) -> + {reply, rvi_common:get_source_address(CS), St}; +handle_call(terminate_connection, _From, St) -> ?debug("~p:handle_call(terminate_connection): Terminating: ~p", [ ?MODULE, {St#st.ip, St#st.port}]), @@ -193,10 +206,25 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, Data]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), - ?debug("Encoded = ~p", [Encoded]), gen_tcp:send(St#st.sock, Encoded), {noreply, St#st{packet_st = PSt1}}; +handle_cast({send, Data, Opts}, #st{sock = Socket, + packet_mod = PMod, + packet_st = PSt, + frag_opts = FragOpts} = St) -> + ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p", + [Opts, FragOpts]), + {ok, Bin, PSt1} = PMod:encode(Data, PSt), + St1 = St#st{packet_st = PSt1}, + rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, + fun() -> + gen_tcp:send(Socket, Bin) + end), + {noreply, St1}; +handle_cast({send_data, Data}, #st{sock = Sock} = St) -> + gen_tcp:send(Sock, Data), + {noreply, St}; handle_cast({activate_socket, Sock}, State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), @@ -230,18 +258,31 @@ handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, packet_mod = PMod, - packet_st = PSt} = State) -> - ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]), - case PMod:decode(Data, fun(Elems) -> - handle_elements(Elems, State) - end, PSt) of - {ok, PSt1} -> + packet_st = PSt, + decode_st = DSt, + frag_opts = FragOpts} = State) -> + ?debug("handle_info(~p, PMod=~p, St=~p): From: ~p:~p ", + [Data, PMod, PSt, IP, Port]), + case dlink_data:decode(Data, fun(Elems) -> + got_msg(Elems, State) + end, DSt, ?MODULE, FragOpts) of + {ok, DSt1} = Ok -> inet:setopts(Sock, [{active, once}]), - {noreply, State#st{packet_st = PSt1}}; + {noreply, State#st{decode_st = DSt1}}; {error, Reason} -> ?error("decode failed, Reason = ~p", [Reason]), {stop, Reason, State} end; + %% case PMod:decode(Data, fun(Elems) -> + %% handle_elements(Elems, State) + %% end, PSt) of + %% {ok, PSt1} -> + %% inet:setopts(Sock, [{active, once}]), + %% {noreply, State#st{packet_st = PSt1}}; + %% {error, Reason} -> + %% ?error("decode failed, Reason = ~p", [Reason]), + %% {stop, Reason, State} + %% end; handle_info({tcp_closed, Sock}, #st { ip = IP, @@ -331,8 +372,24 @@ code_change(_OldVsn, State, _Extra) -> %% {ok, PSt1}; %% { -> -handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS, - ip = IP, port = Port}) -> +%% handle_elements(Elements, #st{frag_opts = FragOpts} = St) -> +%% MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts), +%% ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]), +%% case MaybeF of +%% true -> +%% %% It was a fragment, but not a complete message yet +%% St; +%% {true, Msg} -> +%% #st{packet_mod = PMod, packet_st = PSt} = St, +%% PMod:decode(Msg, fun(Elems) -> +%% got_msg(Elems, St) +%% end, PSt); +%% false -> +%% got_msg(Elements, St) +%% end. + +got_msg(Elements, #st{ip = IP, port = Port, + mod = Mod, func = Fun, cs = CS}) -> ?debug("data complete: Processed: ~p", [authorize_keys:abbrev(Elements)]), Mod:Fun(self(), IP, Port, data, Elements, CS). diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 330985a..78de04a 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -44,7 +44,7 @@ -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). --define(DLINK_TCP_VERSION, "1.1"). +-define(DLINK_TCP_VERSION, <<"1.1">>). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -208,11 +208,12 @@ connect_remote(IP, Port, CompSpec) -> [IP, Port]), %% Setup a genserver around the new connection. - {ok, Pid } = connection:setup(IP, Port, Sock, + {ok, Pid } = connection:setup(client, IP, Port, Sock, ?MODULE, handle_socket, CompSpec ), %% Send authorize send_authorize(Pid, CompSpec), + connection_manager:add_connection(IP, Port, Pid), ok; {error, Err } -> @@ -283,6 +284,7 @@ handle_socket_(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ?debug("CompSpec = ~p", [CompSpec]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -538,7 +540,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> { reply, [ Res ], St }; -handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) -> +handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) -> %% Resolve connection pid from service case get_connections_by_service(Service) of @@ -552,7 +554,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, { ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) }, { ?DLINK_ARG_DATA, Data } - ]), + ], DataLinkOpts), { reply, [ Res ], St} end; @@ -569,7 +571,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> case connection:is_connection_up(Pid) of true -> ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]), - connection:send(Pid, jsx:encode([{?DLINK_ARG_CMD, ?DLINK_CMD_PING}])), + connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }); @@ -659,22 +661,53 @@ availability_msg(Availability, Services, CompSpec) -> status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. +bin(S) -> + iolist_to_binary(S). + process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, ProtoVersion, Credentials, CompSpec) -> ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]), - {NRemoteAddress, NRemotePort} = Conn = - case { RemoteAddress, RemotePort } of - { <<"0.0.0.0">>, 0 } -> - ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p", - [ PeerIP, PeerPort]), - { PeerIP, PeerPort }; - _ -> { RemoteAddress, RemotePort} - end, + F = fun() -> + process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, ProtoVersion, Credentials, CompSpec) + end, + case connection_manager:find_connection_by_address(PeerIP, PeerPort) of + not_found -> F(); + BPid -> + deconflict_conns(FromPid, BPid, CompSpec, F) + end. +deconflict_conns(APid, BPid, CsA, F) -> + {_, _} = ASrc = rvi_common:get_value(source_address, undefined, CsA), + case connection:get_source_address(BPid) of + undefined -> + ?debug("Deconflict - BSrc = undefined, kill BPid (~p)", [BPid]), + exit(BPid, deconflict), + F(); + {_,_} = BSrc when BSrc > ASrc -> + ?debug("Deconflict - kill BPid (~p): ASrc = ~p, BSrc = ~p", [BPid, ASrc, BSrc]), + exit(BPid, deconflict), + F(); + BSrc -> + ?debug("Deconflict - kill APid (~p - self): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]), + exit(deconflict) + end. + + +process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, + _ProtoVersion, Credentials, CompSpec) -> + {NRemoteAddress, NRemotePort} = Conn = + case { RemoteAddress, RemotePort } of + { "0.0.0.0", 0 } -> + ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p", + [ PeerIP, PeerPort]), + { PeerIP, PeerPort }; + _ -> { RemoteAddress, RemotePort} + end, log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), authorize_rpc:store_creds(CompSpec, Credentials, Conn), connection_authorized(FromPid, Conn, CompSpec). @@ -683,7 +716,7 @@ send_authorize(Pid, CompSpec) -> {LocalIP, LocalPort} = rvi_common:node_address_tuple(), connection:send(Pid, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_ADDRESS, bin(LocalIP) }, { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) }, { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl index ae0a605..16693ae 100644 --- a/components/dlink_tcp/src/gen_nb_server.erl +++ b/components/dlink_tcp/src/gen_nb_server.erl @@ -90,7 +90,8 @@ store_cb_state(CBState, State) when is_record(State, state) -> %% @doc Adds a new listener socket to be managed by gen_nb_server %% NOTE: Should only be called by the submodule -spec add_listen_socket({string(), integer()}, #state{}) -> {ok, #state{}} | {error, any()}. -add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) -> +add_listen_socket({IpAddr0, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) -> + IpAddr = str(IpAddr0), Key = {IpAddr, Port}, case dict:find(Key, Socks) of {ok, _} -> @@ -108,7 +109,8 @@ add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}= %% @doc Removes a new listener socket to be managed by gen_nb_server %% NOTE: Should only be called by the submodule -spec remove_listen_socket({string(), integer()}, #state{}) -> {error, not_listening} | {ok, #state{}}. -remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) -> +remove_listen_socket({IpAddr0, Port}, #state{socks=Socks, addrs=Addrs}=State) -> + IpAddr = str(IpAddr0), Key = {IpAddr, Port}, case dict:find(Key, Socks) of error -> @@ -119,6 +121,11 @@ remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) -> addrs=dict:erase(Sock, Addrs)}} end. +str(Addr) when is_list(Addr) -> + Addr; +str(Addr) when is_binary(Addr) -> + binary_to_list(Addr). + %% @doc Returns the callback module's state -spec init([atom()|any()]) -> {ok, #state{}} | {error, bad_init_state} | {error, any()}. init([CallbackModule, InitParams]) -> diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index 6def59a..88c3d23 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -101,7 +101,7 @@ new_connection(IP, Port, Sock, State) -> %% IP and Port are garbage. We'll grab peername when we get our %% first data. %% Provide component spec as extra arg. - {ok, _P} = connection:setup(undefined, 0, Sock, + {ok, _P} = connection:setup(server, undefined, 0, Sock, dlink_tcp_rpc, handle_socket, gen_nb_server:get_cb_state(State)), {ok, State}. |