path: root/components/dlink_tcp/src/connection.erl
diff options
Diffstat (limited to 'components/dlink_tcp/src/connection.erl')
1 files changed, 65 insertions, 45 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 77300d9..b24215c 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -36,6 +36,7 @@
-define(SERVER, ?MODULE).
+-define(PACKET_MOD, dlink_data_json).
-record(st, {
ip = {0,0,0,0},
@@ -44,7 +45,9 @@
mod = undefined,
func = undefined,
args = undefined,
- pst = undefined %% Payload state
+ packet_mod = ?PACKET_MOD,
+ packet_st = [],
+ cs
@@ -52,8 +55,9 @@
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, Arg) ->
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of
+setup(IP, Port, Sock, Mod, Fun, CS) ->
+ ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
+ case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -120,7 +124,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, Arg}) ->
+init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> connection_manager:add_connection(IP, Port, self())
@@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- ?debug("connection:init(): Arg: ~p", [Arg]),
- %% Grab socket control
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
+ PktSt = PktMod:init(CompSpec),
{ok, #st{
ip = IP,
port = Port,
sock = Sock,
mod = Mod,
func = Fun,
- args = Arg,
- pst = undefined
+ packet_mod = PktMod,
+ packet_st = PktSt,
+ cs = CompSpec
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
%% @private
@@ -182,13 +189,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
-handle_cast({send, Data}, St) ->
+handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
+ {ok, Encoded, PSt1} = PMod:encode(Data, PSt),
+ ?debug("Encoded = ~p", [Encoded]),
+ gen_tcp:send(St#st.sock, Encoded),
- gen_tcp:send(St#st.sock, Data),
- {noreply, St};
+ {noreply, St#st{packet_st = PSt1}};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data},
handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
- mod = Mod,
- func = Fun,
- args = Arg,
- pst = PST} = State) ->
+ packet_mod = PMod,
+ packet_st = PSt} = State) ->
?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
- case jsx_decode_stream(Data, PST) of
- { [], NPST } ->
- ?debug("handle_info(data incomplete)", []),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st { pst = NPST} };
- { JSONElements, NPST } ->
- ?debug("data complete: Processed: ~p",
- [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]),
- FromPid = self(),
- [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg)
- || SingleElem <- JSONElements],
- inet:setopts(Sock, [ { active, once } ]),
- {noreply, State#st { pst = NPST} }
+ {noreply, State#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, State}
handle_info({tcp_closed, Sock},
#st { ip = IP,
port = Port,
@@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock},
func = Fun,
args = Arg } = State) ->
?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]),
- Mod:Fun(self(), IP, Port,closed, Arg),
+ Mod:Fun(self(), IP, Port, closed, Arg),
{stop, normal, State};
@@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
-jsx_decode_stream(Data, St) ->
- jsx_decode_stream(Data, St, []).
-jsx_decode_stream(Data, undefined, Acc) ->
- case jsx:decode(Data, [stream, return_tail]) of
- {incomplete, Cont} ->
- {lists:reverse(Acc), Cont};
- {with_tail, Elems, <<>>} ->
- {lists:reverse([Elems|Acc]), undefined};
- {with_tail, Elems, Rest} ->
- jsx_decode_stream(Rest, undefined, [Elems|Acc])
- end.
+%% jsx_decode_stream(Data, St) ->
+%% jsx_decode_stream(Data, St, []).
+%% jsx_decode_stream(Data, undefined, Acc) ->
+%% case jsx:decode(Data, [stream, return_tail]) of
+%% {incomplete, Cont} ->
+%% {lists:reverse(Acc), Cont};
+%% {with_tail, Elems, <<>>} ->
+%% {lists:reverse([Elems|Acc]), undefined};
+%% {with_tail, Elems, Rest} ->
+%% jsx_decode_stream(Rest, undefined, [Elems|Acc])
+%% end.
+%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) ->
+%% case PMod:decode(Data, PSt) of
+%% {ok, Elements, PSt1} ->
+%% ?debug("data complete: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% {ok, PSt1};
+%% {more, Elements, Rest, PSt1} ->
+%% ?debug("data complete with Rest: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS);
+%% {more, PSt1} ->
+%% {ok, PSt1};
+%% { ->
+handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
+ ip = IP, port = Port}) ->
+ ?debug("data complete: Processed: ~p",
+ [authorize_keys:abbrev(Elements)]),
+ Mod:Fun(self(), IP, Port, data, Elements, CS).