diff options
Diffstat (limited to 'components/dlink_tcp/src/connection.erl')
-rw-r--r-- | components/dlink_tcp/src/connection.erl | 110 |
1 files changed, 65 insertions, 45 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 77300d9..b24215c 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -36,6 +36,7 @@ -define(SERVER, ?MODULE). +-define(PACKET_MOD, dlink_data_json). -record(st, { ip = {0,0,0,0}, @@ -44,7 +45,9 @@ mod = undefined, func = undefined, args = undefined, - pst = undefined %% Payload state + packet_mod = ?PACKET_MOD, + packet_st = [], + cs }). %%%=================================================================== @@ -52,8 +55,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, Arg) -> - case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of +setup(IP, Port, Sock, Mod, Fun, CS) -> + ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]), + case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), gen_server:cast(GenSrvPid, {activate_socket, Sock}), @@ -120,7 +124,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, Arg}) -> +init({IP, Port, Sock, Mod, Fun, CompSpec}) -> case IP of undefined -> ok; _ -> connection_manager:add_connection(IP, Port, self()) @@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) -> ?debug("connection:init(): Sock: ~p", [Sock]), ?debug("connection:init(): Module: ~p", [Mod]), ?debug("connection:init(): Function: ~p", [Fun]), - ?debug("connection:init(): Arg: ~p", [Arg]), - %% Grab socket control + {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), + PktSt = PktMod:init(CompSpec), {ok, #st{ ip = IP, port = Port, sock = Sock, mod = Mod, func = Fun, - args = Arg, - pst = undefined + packet_mod = PktMod, + packet_st = PktSt, + cs = CompSpec }}. +get_module_config(Key, Default, CS) -> + rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS). %%-------------------------------------------------------------------- %% @private @@ -182,13 +189,14 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({send, Data}, St) -> +handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, Data]), + {ok, Encoded, PSt1} = PMod:encode(Data, PSt), + ?debug("Encoded = ~p", [Encoded]), + gen_tcp:send(St#st.sock, Encoded), - gen_tcp:send(St#st.sock, Data), - - {noreply, St}; + {noreply, St#st{packet_st = PSt1}}; handle_cast({activate_socket, Sock}, State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), @@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data}, handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, - mod = Mod, - func = Fun, - args = Arg, - pst = PST} = State) -> + packet_mod = PMod, + packet_st = PSt} = State) -> ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), - - case jsx_decode_stream(Data, PST) of - { [], NPST } -> - ?debug("handle_info(data incomplete)", []), + case PMod:decode(Data, fun(Elems) -> + handle_elements(Elems, State) + end, PSt) of + {ok, PSt1} -> inet:setopts(Sock, [{active, once}]), - {noreply, State#st { pst = NPST} }; - - { JSONElements, NPST } -> - ?debug("data complete: Processed: ~p", - [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]), - FromPid = self(), - [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg) - || SingleElem <- JSONElements], - inet:setopts(Sock, [ { active, once } ]), - {noreply, State#st { pst = NPST} } + {noreply, State#st{packet_st = PSt1}}; + {error, Reason} -> + ?error("decode failed, Reason = ~p", [Reason]), + {stop, Reason, State} end; - - handle_info({tcp_closed, Sock}, #st { ip = IP, port = Port, @@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock}, func = Fun, args = Arg } = State) -> ?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]), - Mod:Fun(self(), IP, Port,closed, Arg), + Mod:Fun(self(), IP, Port, closed, Arg), gen_tcp:close(Sock), connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; @@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -jsx_decode_stream(Data, St) -> - jsx_decode_stream(Data, St, []). - -jsx_decode_stream(Data, undefined, Acc) -> - case jsx:decode(Data, [stream, return_tail]) of - {incomplete, Cont} -> - {lists:reverse(Acc), Cont}; - {with_tail, Elems, <<>>} -> - {lists:reverse([Elems|Acc]), undefined}; - {with_tail, Elems, Rest} -> - jsx_decode_stream(Rest, undefined, [Elems|Acc]) - end. +%% jsx_decode_stream(Data, St) -> +%% jsx_decode_stream(Data, St, []). + +%% jsx_decode_stream(Data, undefined, Acc) -> +%% case jsx:decode(Data, [stream, return_tail]) of +%% {incomplete, Cont} -> +%% {lists:reverse(Acc), Cont}; +%% {with_tail, Elems, <<>>} -> +%% {lists:reverse([Elems|Acc]), undefined}; +%% {with_tail, Elems, Rest} -> +%% jsx_decode_stream(Rest, undefined, [Elems|Acc]) +%% end. + +%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) -> +%% case PMod:decode(Data, PSt) of +%% {ok, Elements, PSt1} -> +%% ?debug("data complete: Processed: ~p", +%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]), +%% Mod:Fun(self(), IP, Port, data, Elements, CS), +%% {ok, PSt1}; +%% {more, Elements, Rest, PSt1} -> +%% ?debug("data complete with Rest: Processed: ~p", +%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]), +%% Mod:Fun(self(), IP, Port, data, Elements, CS), +%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS); +%% {more, PSt1} -> +%% {ok, PSt1}; +%% { -> + +handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS, + ip = IP, port = Port}) -> + ?debug("data complete: Processed: ~p", + [authorize_keys:abbrev(Elements)]), + Mod:Fun(self(), IP, Port, data, Elements, CS). |