diff options
Diffstat (limited to 'components/dlink_tcp/src/connection.erl')
-rw-r--r-- | components/dlink_tcp/src/connection.erl | 99 |
1 files changed, 78 insertions, 21 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 7229b3c..1d4753e 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -26,13 +26,15 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([setup/6]). +-export([setup/7]). -export([send/2]). -export([send/3]). +-export([send_data/2]). -export([is_connection_up/1]). -export([is_connection_up/2]). -export([terminate_connection/1]). -export([terminate_connection/2]). +-export([get_source_address/1]). -define(SERVER, ?MODULE). @@ -47,6 +49,8 @@ args = undefined, packet_mod = ?PACKET_MOD, packet_st = [], + decode_st = <<>>, + frag_opts = [], cs }). @@ -55,9 +59,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, CS) -> +setup(Role, IP, Port, Sock, Mod, Fun, CS) when Role==client; Role==server -> ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]), - case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of + case gen_server:start_link(connection, {Role, IP, Port, Sock, Mod, Fun, CS},[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), gen_server:cast(GenSrvPid, {activate_socket, Sock}), @@ -70,6 +74,8 @@ setup(IP, Port, Sock, Mod, Fun, CS) -> send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). +send(Pid, Data, Opts) when is_pid(Pid) -> + gen_server:cast(Pid, {send, Data, Opts}); send(IP, Port, Data) -> case connection_manager:find_connection_by_address(IP, Port) of {ok, Pid} -> @@ -82,6 +88,9 @@ send(IP, Port, Data) -> end. +send_data(Pid, Data) -> + gen_server:cast(Pid, {send_data, Data}). + terminate_connection(Pid) when is_pid(Pid) -> gen_server:call(Pid, terminate_connection). @@ -106,6 +115,9 @@ is_connection_up(IP, Port) -> false end. +get_source_address(Pid) -> + gen_server:call(Pid, get_source_address). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -124,11 +136,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, CompSpec}) -> - case IP of - undefined -> ok; - _ -> connection_manager:add_connection(IP, Port, self()) - end, +init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): self(): ~p", [self()]), ?debug("connection:init(): IP: ~p", [IP]), ?debug("connection:init(): Port: ~p", [Port]), @@ -137,6 +145,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): Function: ~p", [Fun]), {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), PktSt = PktMod:init(CompSpec), + {ok, FragOpts} = get_module_config( + frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec), {ok, #st{ ip = IP, port = Port, @@ -145,7 +155,9 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> func = Fun, packet_mod = PktMod, packet_st = PktSt, - cs = CompSpec + frag_opts = FragOpts, + cs = rvi_common:save_source_address( + Role, Sock, rvi_common:set_value(role, Role, CompSpec)) }}. get_module_config(Key, Default, CS) -> @@ -166,8 +178,9 @@ get_module_config(Key, Default, CS) -> %% @end %%-------------------------------------------------------------------- - -handle_call(terminate_connection, _From, St) -> +handle_call(get_source_address, _, #st{cs = CS} = St) -> + {reply, rvi_common:get_source_address(CS), St}; +handle_call(terminate_connection, _From, St) -> ?debug("~p:handle_call(terminate_connection): Terminating: ~p", [ ?MODULE, {St#st.ip, St#st.port}]), @@ -193,10 +206,25 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, Data]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), - ?debug("Encoded = ~p", [Encoded]), gen_tcp:send(St#st.sock, Encoded), {noreply, St#st{packet_st = PSt1}}; +handle_cast({send, Data, Opts}, #st{sock = Socket, + packet_mod = PMod, + packet_st = PSt, + frag_opts = FragOpts} = St) -> + ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p", + [Opts, FragOpts]), + {ok, Bin, PSt1} = PMod:encode(Data, PSt), + St1 = St#st{packet_st = PSt1}, + rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, + fun() -> + gen_tcp:send(Socket, Bin) + end), + {noreply, St1}; +handle_cast({send_data, Data}, #st{sock = Sock} = St) -> + gen_tcp:send(Sock, Data), + {noreply, St}; handle_cast({activate_socket, Sock}, State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), @@ -230,18 +258,31 @@ handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, packet_mod = PMod, - packet_st = PSt} = State) -> - ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]), - case PMod:decode(Data, fun(Elems) -> - handle_elements(Elems, State) - end, PSt) of - {ok, PSt1} -> + packet_st = PSt, + decode_st = DSt, + frag_opts = FragOpts} = State) -> + ?debug("handle_info(~p, PMod=~p, St=~p): From: ~p:~p ", + [Data, PMod, PSt, IP, Port]), + case dlink_data:decode(Data, fun(Elems) -> + got_msg(Elems, State) + end, DSt, ?MODULE, FragOpts) of + {ok, DSt1} = Ok -> inet:setopts(Sock, [{active, once}]), - {noreply, State#st{packet_st = PSt1}}; + {noreply, State#st{decode_st = DSt1}}; {error, Reason} -> ?error("decode failed, Reason = ~p", [Reason]), {stop, Reason, State} end; + %% case PMod:decode(Data, fun(Elems) -> + %% handle_elements(Elems, State) + %% end, PSt) of + %% {ok, PSt1} -> + %% inet:setopts(Sock, [{active, once}]), + %% {noreply, State#st{packet_st = PSt1}}; + %% {error, Reason} -> + %% ?error("decode failed, Reason = ~p", [Reason]), + %% {stop, Reason, State} + %% end; handle_info({tcp_closed, Sock}, #st { ip = IP, @@ -331,8 +372,24 @@ code_change(_OldVsn, State, _Extra) -> %% {ok, PSt1}; %% { -> -handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS, - ip = IP, port = Port}) -> +%% handle_elements(Elements, #st{frag_opts = FragOpts} = St) -> +%% MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts), +%% ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]), +%% case MaybeF of +%% true -> +%% %% It was a fragment, but not a complete message yet +%% St; +%% {true, Msg} -> +%% #st{packet_mod = PMod, packet_st = PSt} = St, +%% PMod:decode(Msg, fun(Elems) -> +%% got_msg(Elems, St) +%% end, PSt); +%% false -> +%% got_msg(Elements, St) +%% end. + +got_msg(Elements, #st{ip = IP, port = Port, + mod = Mod, func = Fun, cs = CS}) -> ?debug("data complete: Processed: ~p", [authorize_keys:abbrev(Elements)]), Mod:Fun(self(), IP, Port, data, Elements, CS). |