summaryrefslogtreecommitdiff
path: root/components/dlink_tcp/src/connection.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/dlink_tcp/src/connection.erl')
-rw-r--r--components/dlink_tcp/src/connection.erl99
1 files changed, 78 insertions, 21 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 7229b3c..1d4753e 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -26,13 +26,15 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([setup/6]).
+-export([setup/7]).
-export([send/2]).
-export([send/3]).
+-export([send_data/2]).
-export([is_connection_up/1]).
-export([is_connection_up/2]).
-export([terminate_connection/1]).
-export([terminate_connection/2]).
+-export([get_source_address/1]).
-define(SERVER, ?MODULE).
@@ -47,6 +49,8 @@
args = undefined,
packet_mod = ?PACKET_MOD,
packet_st = [],
+ decode_st = <<>>,
+ frag_opts = [],
cs
}).
@@ -55,9 +59,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, CS) ->
+setup(Role, IP, Port, Sock, Mod, Fun, CS) when Role==client; Role==server ->
?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
+ case gen_server:start_link(connection, {Role, IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -70,6 +74,8 @@ setup(IP, Port, Sock, Mod, Fun, CS) ->
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
+send(Pid, Data, Opts) when is_pid(Pid) ->
+ gen_server:cast(Pid, {send, Data, Opts});
send(IP, Port, Data) ->
case connection_manager:find_connection_by_address(IP, Port) of
{ok, Pid} ->
@@ -82,6 +88,9 @@ send(IP, Port, Data) ->
end.
+send_data(Pid, Data) ->
+ gen_server:cast(Pid, {send_data, Data}).
+
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
@@ -106,6 +115,9 @@ is_connection_up(IP, Port) ->
false
end.
+get_source_address(Pid) ->
+ gen_server:call(Pid, get_source_address).
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -124,11 +136,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
- case IP of
- undefined -> ok;
- _ -> connection_manager:add_connection(IP, Port, self())
- end,
+init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): self(): ~p", [self()]),
?debug("connection:init(): IP: ~p", [IP]),
?debug("connection:init(): Port: ~p", [Port]),
@@ -137,6 +145,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): Function: ~p", [Fun]),
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
PktSt = PktMod:init(CompSpec),
+ {ok, FragOpts} = get_module_config(
+ frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec),
{ok, #st{
ip = IP,
port = Port,
@@ -145,7 +155,9 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
func = Fun,
packet_mod = PktMod,
packet_st = PktSt,
- cs = CompSpec
+ frag_opts = FragOpts,
+ cs = rvi_common:save_source_address(
+ Role, Sock, rvi_common:set_value(role, Role, CompSpec))
}}.
get_module_config(Key, Default, CS) ->
@@ -166,8 +178,9 @@ get_module_config(Key, Default, CS) ->
%% @end
%%--------------------------------------------------------------------
-
-handle_call(terminate_connection, _From, St) ->
+handle_call(get_source_address, _, #st{cs = CS} = St) ->
+ {reply, rvi_common:get_source_address(CS), St};
+handle_call(terminate_connection, _From, St) ->
?debug("~p:handle_call(terminate_connection): Terminating: ~p",
[ ?MODULE, {St#st.ip, St#st.port}]),
@@ -193,10 +206,25 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
- ?debug("Encoded = ~p", [Encoded]),
gen_tcp:send(St#st.sock, Encoded),
{noreply, St#st{packet_st = PSt1}};
+handle_cast({send, Data, Opts}, #st{sock = Socket,
+ packet_mod = PMod,
+ packet_st = PSt,
+ frag_opts = FragOpts} = St) ->
+ ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p",
+ [Opts, FragOpts]),
+ {ok, Bin, PSt1} = PMod:encode(Data, PSt),
+ St1 = St#st{packet_st = PSt1},
+ rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE,
+ fun() ->
+ gen_tcp:send(Socket, Bin)
+ end),
+ {noreply, St1};
+handle_cast({send_data, Data}, #st{sock = Sock} = St) ->
+ gen_tcp:send(Sock, Data),
+ {noreply, St};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -230,18 +258,31 @@ handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
packet_mod = PMod,
- packet_st = PSt} = State) ->
- ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]),
- case PMod:decode(Data, fun(Elems) ->
- handle_elements(Elems, State)
- end, PSt) of
- {ok, PSt1} ->
+ packet_st = PSt,
+ decode_st = DSt,
+ frag_opts = FragOpts} = State) ->
+ ?debug("handle_info(~p, PMod=~p, St=~p): From: ~p:~p ",
+ [Data, PMod, PSt, IP, Port]),
+ case dlink_data:decode(Data, fun(Elems) ->
+ got_msg(Elems, State)
+ end, DSt, ?MODULE, FragOpts) of
+ {ok, DSt1} = Ok ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}};
+ {noreply, State#st{decode_st = DSt1}};
{error, Reason} ->
?error("decode failed, Reason = ~p", [Reason]),
{stop, Reason, State}
end;
+ %% case PMod:decode(Data, fun(Elems) ->
+ %% handle_elements(Elems, State)
+ %% end, PSt) of
+ %% {ok, PSt1} ->
+ %% inet:setopts(Sock, [{active, once}]),
+ %% {noreply, State#st{packet_st = PSt1}};
+ %% {error, Reason} ->
+ %% ?error("decode failed, Reason = ~p", [Reason]),
+ %% {stop, Reason, State}
+ %% end;
handle_info({tcp_closed, Sock},
#st { ip = IP,
@@ -331,8 +372,24 @@ code_change(_OldVsn, State, _Extra) ->
%% {ok, PSt1};
%% { ->
-handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
- ip = IP, port = Port}) ->
+%% handle_elements(Elements, #st{frag_opts = FragOpts} = St) ->
+%% MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts),
+%% ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]),
+%% case MaybeF of
+%% true ->
+%% %% It was a fragment, but not a complete message yet
+%% St;
+%% {true, Msg} ->
+%% #st{packet_mod = PMod, packet_st = PSt} = St,
+%% PMod:decode(Msg, fun(Elems) ->
+%% got_msg(Elems, St)
+%% end, PSt);
+%% false ->
+%% got_msg(Elements, St)
+%% end.
+
+got_msg(Elements, #st{ip = IP, port = Port,
+ mod = Mod, func = Fun, cs = CS}) ->
?debug("data complete: Processed: ~p",
[authorize_keys:abbrev(Elements)]),
Mod:Fun(self(), IP, Port, data, Elements, CS).