diff options
Diffstat (limited to 'deps')
-rw-r--r-- | deps/exo/src/exo_http_server.erl | 4 | ||||
-rw-r--r-- | deps/exo/src/exo_socket.erl | 81 | ||||
-rw-r--r-- | deps/exo/src/exo_socket_server.erl | 45 | ||||
-rw-r--r-- | deps/exo/src/exo_socket_session.erl | 122 |
4 files changed, 147 insertions, 105 deletions
diff --git a/deps/exo/src/exo_http_server.erl b/deps/exo/src/exo_http_server.erl index ae19faa..15c8d06 100644 --- a/deps/exo/src/exo_http_server.erl +++ b/deps/exo/src/exo_http_server.erl @@ -68,7 +68,9 @@ start(Port, ServerOptions) -> [{active,once},{reuseaddr,true}, {verify, verify_none}, {keyfile, filename:join(Dir, "host.key")}, - {certfile, filename:join(Dir, "host.cert")}], + {certfile, filename:join(Dir, "host.cert")}, + {upgrade_timeout, 5000}, + {accept_timeout, 5000}], ?MODULE, ServerOptions). %%----------------------------------------------------------------------------- diff --git a/deps/exo/src/exo_socket.erl b/deps/exo/src/exo_socket.erl index dc961ac..6967a40 100644 --- a/deps/exo/src/exo_socket.erl +++ b/deps/exo/src/exo_socket.erl @@ -9,7 +9,7 @@ %%%---- END COPYRIGHT --------------------------------------------------------- %%% @author Tony Rogvall <tony@rogvall.se> %%% @doc -%%% EXO socket +%%% EXO socket %%% @end %%% Created : 15 Dec 2011 by Tony Rogvall <tony@rogvall.se> @@ -17,7 +17,7 @@ -export([listen/1, listen/2, listen/3]). --export([accept/1, accept/2]). +-export([accept/1, accept/2, accept/3]). -export([async_accept/1, async_accept/2]). -export([connect/2, connect/3, connect/4, connect/5]). %% -export([async_connect/2, async_connect/3, async_connect/4]). @@ -85,7 +85,7 @@ listen(Port, Protos=[tcp|_], Opts0) -> Error end. -%% +%% %% %% connect(Host, Port) -> @@ -108,7 +108,7 @@ connect(Host, Port, Protos=[tcp|_], Opts0, Timeout) -> TcpConnectOpts = [{active,false},{packet,0},{mode,binary}|TcpOpts1], case gen_tcp:connect(Host, Port, TcpConnectOpts, Timeout) of {ok, S} -> - X = + X = #exo_socket { mdata = gen_tcp, mctl = inet, protocol = Protos, @@ -214,11 +214,11 @@ connect_upgrade(X, Protos0, Timeout) -> {SSLOpts0,Opts1} = split_options(ssl_connect_opts(),Opts), {_,SSLOpts} = split_options([ssl_imp], SSLOpts0), ?dbg("SSL upgrade, options = ~w\n", [SSLOpts]), - ?dbg("exo_socket: before ssl:connect opts=~w\n", + ?dbg("exo_socket: before ssl:connect opts=~w\n", [getopts(X, [active,packet,mode])]), case ssl_connect(X#exo_socket.socket, SSLOpts, Timeout) of {ok,S1} -> - ?dbg("exo_socket: ssl:connect opt=~w\n", + ?dbg("exo_socket: ssl:connect opt=~w\n", [ssl:getopts(S1, [active,packet,mode])]), X1 = X#exo_socket { socket=S1, mdata = ssl, @@ -227,25 +227,25 @@ connect_upgrade(X, Protos0, Timeout) -> tags={ssl,ssl_closed,ssl_error}}, connect_upgrade(X1, Protos1, Timeout); Error={error,_Reason} -> - ?dbg("exo_socket: ssl:connect error=~w\n", + ?dbg("exo_socket: ssl:connect error=~w\n", [_Reason]), Error end; [http|Protos1] -> {_, Close,Error} = X#exo_socket.tags, - X1 = X#exo_socket { packet = http, + X1 = X#exo_socket { packet = http, tags = {http, Close, Error }}, connect_upgrade(X1, Protos1, Timeout); [] -> setopts(X, [{mode,X#exo_socket.mode}, {packet,X#exo_socket.packet}, {active,X#exo_socket.active}]), - ?dbg("exo_socket: after upgrade opts=~w\n", + ?dbg("exo_socket: after upgrade opts=~w\n", [getopts(X, [active,packet,mode])]), {ok,X} end. - -ssl_connect(Socket, Options, Timeout) -> + +ssl_connect(Socket, Options, Timeout) -> case ssl:connect(Socket, Options, Timeout) of {error, ssl_not_started} -> ssl:start(), @@ -288,28 +288,39 @@ async_socket(Listen, Socket, AuthOpts) ok -> {ok,Mod} = inet_db:lookup_socket(Listen#exo_socket.socket), inet_db:register_socket(Socket, Mod), - X = Listen#exo_socket { transport=Socket, socket=Socket }, - maybe_auth( - accept_upgrade(X, tl(X#exo_socket.protocol), infinity), - server, - X#exo_socket.opts ++ AuthOpts); + {ok, Listen#exo_socket { transport=Socket, socket=Socket, + opts = Listen#exo_socket.opts ++ AuthOpts }}; Error -> prim_inet:close(Socket), Error end; Error -> + ?debug("getopts() -> ~p", [Error]), prim_inet:close(Socket), Error end. - accept(X) when is_record(X, exo_socket) -> - accept_upgrade(X, X#exo_socket.protocol, infinity). - -accept(X, Timeout) when - is_record(X, exo_socket), + Timeout = proplists:get_value(accept_timeout, X#exo_socket.opts, infinity), + maybe_auth( + accept_upgrade(X, X#exo_socket.protocol, Timeout), + server, + X#exo_socket.opts). + +accept(X, Timeout) when + is_record(X, exo_socket), (Timeout =:= infnity orelse (is_integer(Timeout) andalso Timeout >= 0)) -> - accept_upgrade(X, X#exo_socket.protocol, Timeout). + maybe_auth( + accept_upgrade(X, X#exo_socket.protocol, Timeout), + server, + X#exo_socket.opts). + +accept(X, Protos, Timeout) when + is_record(X, exo_socket) -> + maybe_auth( + accept_upgrade(X, Protos, Timeout), + server, + X#exo_socket.opts). accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) -> ?dbg("exo_socket: accept protos=~w\n", [Protos0]), @@ -327,11 +338,11 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) -> {SSLOpts0,Opts1} = split_options(ssl_listen_opts(),Opts), {_,SSLOpts} = split_options([ssl_imp], SSLOpts0), ?dbg("SSL upgrade, options = ~w\n", [SSLOpts]), - ?dbg("exo_socket: before ssl_accept opt=~w\n", + ?dbg("exo_socket: before ssl_accept opt=~w\n", [getopts(X, [active,packet,mode])]), case ssl_accept(X#exo_socket.socket, SSLOpts, Timeout) of {ok,S1} -> - ?dbg("exo_socket: ssl_accept opt=~w\n", + ?dbg("exo_socket: ssl_accept opt=~w\n", [ssl:getopts(S1, [active,packet,mode])]), X1 = X#exo_socket{socket=S1, mdata = ssl, @@ -340,7 +351,7 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) -> tags={ssl,ssl_closed,ssl_error}}, accept_upgrade(X1, Protos1, Timeout); Error={error,_Reason} -> - ?dbg("exo_socket: ssl:ssl_accept error=~w\n", + ?dbg("exo_socket: ssl:ssl_accept error=~w\n", [_Reason]), Error end; @@ -348,14 +359,14 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) -> accept_probe_ssl(X,Protos1,Timeout); [http|Protos1] -> {_, Close,Error} = X#exo_socket.tags, - X1 = X#exo_socket { packet = http, + X1 = X#exo_socket { packet = http, tags = {http, Close, Error }}, accept_upgrade(X1,Protos1,Timeout); [] -> setopts(X, [{mode,X#exo_socket.mode}, {packet,X#exo_socket.packet}, {active,X#exo_socket.active}]), - ?dbg("exo_socket: after upgrade opts=~w\n", + ?dbg("exo_socket: after upgrade opts=~w\n", [getopts(X, [active,packet,mode])]), {ok,X} end. @@ -393,7 +404,7 @@ accept_probe_ssl(X=#exo_socket { mdata=M, socket=S, Error end. -ssl_accept(Socket, Options, Timeout) -> +ssl_accept(Socket, Options, Timeout) -> case ssl:ssl_accept(Socket, Options, Timeout) of {error, ssl_not_started} -> ssl:start(), @@ -417,7 +428,7 @@ request_type(<<ContentType:8, _Version:16, _Length:16, _/binary>>) -> end; request_type(_) -> undefined. - + %% %% exo_socket wrapper for socket operations %% @@ -426,7 +437,7 @@ close(#exo_socket { mdata = M, socket = S}) -> shutdown(#exo_socket { mdata = M, socket = S}, How) -> M:shutdown(S, How). - + send(#exo_socket { mdata = M,socket = S, mauth = A,auth_state = Sa} = X, Data) -> if A == undefined -> M:send(S, Data); @@ -520,11 +531,11 @@ tcp_connect_options() -> ssl_listen_opts() -> - [versions, verify, verify_fun, + [versions, verify, verify_fun, fail_if_no_peer_cert, verify_client_once, - depth, cert, certfile, key, keyfile, + depth, cert, certfile, key, keyfile, password, cacerts, cacertfile, dh, dhfile, cihpers, - %% deprecated soon + %% deprecated soon ssl_imp, %% always new! %% server verify_client_once, @@ -533,9 +544,9 @@ ssl_listen_opts() -> debug, hibernate_after, erl_dist ]. ssl_connect_opts() -> - [versions, verify, verify_fun, + [versions, verify, verify_fun, fail_if_no_peer_cert, - depth, cert, certfile, key, keyfile, + depth, cert, certfile, key, keyfile, password, cacerts, cacertfile, dh, dhfile, cihpers, debug]. diff --git a/deps/exo/src/exo_socket_server.erl b/deps/exo/src/exo_socket_server.erl index 08a3e4c..db67c5a 100644 --- a/deps/exo/src/exo_socket_server.erl +++ b/deps/exo/src/exo_socket_server.erl @@ -20,7 +20,7 @@ %% %% methods -%% init(Socket, Args) -> +%% init(Socket, Args) -> %% {ok, State'} %% {stop, Reason, State'} %% @@ -30,7 +30,7 @@ %% %% close(Socket, State) -> %% {ok, State'} -%% +%% %% error(Socket, Error, State) -> %% {ok, State'} %% {stop, Reason, State'} @@ -53,7 +53,7 @@ %% -define(debug(Fmt,Args), ok). %% -define(error(Fmt,Args), error_logger:format(Fmt, Args)). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -record(state, { listen, %% #exo_socket{} @@ -102,15 +102,15 @@ -spec behaviour_info(callbacks) -> list(). behaviour_info(callbacks) -> [ - {init, 2}, %% init(Socket::socket(), Args::[term()] + {init, 2}, %% init(Socket::socket(), Args::[term()] %% -> {ok,state()} | {stop,reason(),state()} - {data, 3}, %% data(Socket::socket(), Data::io_list(), State::state()) + {data, 3}, %% data(Socket::socket(), Data::io_list(), State::state()) %% -> {ok,state()}|{close,state()}|{stop,reason(),state()} {close, 2}, %% close(Socket::socket(), State::state()) %% -> {ok,state()} {error, 3}, %% error(Socket::socket(),Error::error(), State:state()) %% -> {ok,state()} | {stop,reason(),state()} - {control, 4} %% control(Socket::socket(), Request::term(), + {control, 4} %% control(Socket::socket(), Request::term(), %% From::term(), State:state()) %% -> {reply, Reply::term(),state()} | {noreply, state()} | %% {ignore, state()} | {send, Bin::binary(), state()} | @@ -175,15 +175,15 @@ init([Port,Protos,Options,Module,Args] = _X) -> {ok,Listen} -> case exo_socket:async_accept(Listen) of {ok, Ref} -> - {ok, #state{ listen = Listen, - active = Active, + {ok, #state{ listen = Listen, + active = Active, socket_reuse = Reuse, ref=Ref, - module=Module, + module=Module, args=Args }}; {error, Reason} -> - {stop,Reason} + {stop,Reason} end; {error,Reason} -> {stop,Reason} @@ -196,8 +196,8 @@ init([Port,Protos,Options,Module,Args] = _X) -> %% %% @end %%-------------------------------------------------------------------- --spec handle_call(Request::term(), - From::{pid(), Tag::term()}, +-spec handle_call(Request::term(), + From::{pid(), Tag::term()}, State::#state{}) -> {reply, Reply::term(), State::#state{}} | {noreply, State::#state{}} | @@ -271,28 +271,34 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when +handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when (State#state.listen)#exo_socket.socket =:= LSocket, Ref =:= State#state.ref -> ?debug("<-- ~p~n", [_Msg]), Listen = State#state.listen, + AcceptTimeout = proplists:get_value(accept_timeout, Listen#exo_socket.opts, infinity), NewAccept = exo_socket:async_accept(Listen), - case exo_socket:async_socket(Listen, Socket, [delay_auth]) of + case exo_socket:async_socket(Listen, Socket, [{delay_auth, true}]) of {ok, XSocket} -> - case exo_socket_session:start(XSocket, + F = fun() -> + exo_socket:accept( + XSocket, tl(XSocket#exo_socket.protocol), AcceptTimeout) + end, + XSocketFun = {XSocket, F}, + case exo_socket_session:start(XSocketFun, State#state.module, State#state.args) of - {ok,Pid} -> + {ok, Pid} -> exo_socket:controlling_process(XSocket, Pid), gen_server:cast(Pid, {activate,State#state.active}); _Error -> exo_socket:close(XSocket) end; - _Error -> - error + _Error -> + error end, case NewAccept of - {ok,Ref1} -> + {ok, Ref1} -> {noreply, State#state { ref = Ref1 }}; {error, Reason} -> {stop, Reason, State} @@ -436,4 +442,3 @@ send_reuse_message(Host, Port, Args, M, MyPort, XSocket, RUSt) -> ReuseMsg = exo_socket_session:encode_reuse( MyPort, ReuseOpts), exo_socket:send(XSocket, ReuseMsg). - diff --git a/deps/exo/src/exo_socket_session.erl b/deps/exo/src/exo_socket_session.erl index 623b301..f4518db 100644 --- a/deps/exo/src/exo_socket_session.erl +++ b/deps/exo/src/exo_socket_session.erl @@ -19,21 +19,21 @@ -behaviour(gen_server). %% API --export([start/3, +-export([start/3, start_link/3]). %% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, +-export([init/1, + handle_call/3, + handle_cast/2, handle_info/2, - terminate/2, + terminate/2, code_change/3]). --export([encode_reuse/2, +-export([encode_reuse/2, decode_reuse_config/1]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). -record(state, { module, @@ -90,6 +90,7 @@ start(XSocket, Module, Args) -> %% @end %%-------------------------------------------------------------------- init([XSocket, Module, Args]) -> + ?debug("init(~p, ~p, ~p)", [XSocket, Module, Args]), {ok, #state{ socket=XSocket, module=Module, args=Args, @@ -104,23 +105,23 @@ init([XSocket, Module, Args]) -> %% %% @end %%-------------------------------------------------------------------- --spec handle_call(Request::term(), - From::{pid(), Tag::term()}, +-spec handle_call(Request::term(), + From::{pid(), Tag::term()}, State::#state{}) -> {reply, Reply::term(), State::#state{}} | {noreply, State::#state{}} | {stop, Reason::atom(), Reply::term(), State::#state{}}. %% No 'local' handle_call -handle_call(Request, From, +handle_call(Request, From, State=#state{module = M, state = MSt, socket = Socket}) -> ?dbg("handle_call: ~p", [Request]), try M:control(Socket, Request, From, MSt) of - Result -> + Result -> ?dbg("handle_call: reply ~p", [Result]), mod_reply(Result, From, State) catch - error:_Error -> + error:_Error -> ?dbg("handle_call: catch reason ~p", [_Error]), ret({reply, {error, unknown_call}, State}) end. @@ -172,40 +173,24 @@ send_(Bin, From, #state{socket = S, pending = P} = State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({activate,Active}, State0) -> +handle_cast({activate,Active}, #state{socket = XSocket0} = State0) -> ?dbg("activate~n", []), - try exo_socket:authenticate(State0#state.socket) of - {ok, S} -> - ?dbg("authentication done~n", []), - State = State0#state{socket = S}, - case apply(State#state.module, init, - [State#state.socket,State#state.args]) of - Ok when element(1, Ok) == ok -> - CSt0 = element(2, Ok), - %% enable active mode here (if ever wanted) once is handled, - %% automatically anyway. exit_on_close is default and - %% allow session statistics retrieval in the close callback - SessionOpts = [{active,Active},{exit_on_close, false}], - - _Res = exo_socket:setopts(State#state.socket, SessionOpts), - ?dbg("exo_socket:setopts(~w) = ~w\n", [SessionOpts, _Res]), - State1 = State#state { active = Active, state = CSt0 }, - case Ok of - {_, _, Timeout} -> - ret({noreply, State1, Timeout}); - {_, _} -> - ret({noreply, State1}) - end; - {stop,Reason,CSt1} -> - {stop, Reason, State#state { state = CSt1 }} + case XSocket0 of + {#exo_socket{}, Fun} when is_function(Fun, 0) -> + try Fun() of + {ok, XSocket} -> + activate_(Active, State0#state{socket = XSocket}); + {error, _} = Error -> + ?debug("socket fun -> ~p", [Error]), + {stop, Error, State0} + catch + Cat:Exception -> + ?debug("caught ~p:~p from socket fun", [Cat, Exception]), + {stop, Exception, State0} end; - {error, Reason} -> - {stop, {auth_failure, Reason}, State0} - catch - error:Crash -> - {stop, {auth_failure, Crash}, State0} + #exo_socket{}-> + activate_(Active, State0) end; - handle_cast(_Msg, State) -> ret({noreply, State}). @@ -223,9 +208,9 @@ handle_info(timeout, State) -> exo_socket:shutdown(State#state.socket, write), ?dbg("exo_socket_session: idle_timeout~p~n", [self()]), {stop, normal, State}; -handle_info({Tag,Socket,Data0}, State) when +handle_info({Tag,Socket,Data0}, State) when %% FIXME: put socket tag in State for correct matching - (Tag =:= tcp orelse Tag =:= ssl orelse Tag =:= http), + (Tag =:= tcp orelse Tag =:= ssl orelse Tag =:= http), Socket =:= (State#state.socket)#exo_socket.socket -> ?dbg("exo_socket_session: got data ~p\n", [{Tag,Socket,Data0}]), try exo_socket:auth_incoming(State#state.socket, Data0) of @@ -247,7 +232,7 @@ handle_info({Tag,Socket}, State) when {ok,CSt1} -> {stop, normal, State#state { state = CSt1 }} end; -handle_info({Tag,Socket,Error}, State) when +handle_info({Tag,Socket,Error}, State) when (Tag =:= tcp_error orelse Tag =:= ssl_error), Socket =:= (State#state.socket)#exo_socket.socket -> ?dbg("exo_socket_session: got error ~p\n", [{Tag,Socket,Error}]), @@ -258,7 +243,7 @@ handle_info({Tag,Socket,Error}, State) when {stop,Reason,CSt1} -> {stop, Reason, State#state { state = CSt1 }} end; - + handle_info(_Info, State) -> ?dbg("Got info: ~p\n", [_Info]), ret({noreply, State}). @@ -275,9 +260,14 @@ handle_info(_Info, State) -> %% @end %%-------------------------------------------------------------------- terminate(_Reason, State) -> - exo_socket:close(State#state.socket), + socket_close(State#state.socket), ok. +socket_close({#exo_socket{} = S, _}) -> + exo_socket:close(S); +socket_close(#exo_socket{} = S) -> + exo_socket:close(S). + %%-------------------------------------------------------------------- %% @private %% @doc @@ -293,6 +283,40 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== +activate_(Active, State0) -> + try exo_socket:authenticate(State0#state.socket) of + {ok, S} -> + ?dbg("authentication done~n", []), + State = State0#state{socket = S}, + case apply(State#state.module, init, + [State#state.socket,State#state.args]) of + Ok when element(1, Ok) == ok -> + CSt0 = element(2, Ok), + %% enable active mode here (if ever wanted) once is handled, + %% automatically anyway. exit_on_close is default and + %% allow session statistics retrieval in the close callback + SessionOpts = [{active,Active},{exit_on_close, false}], + + _Res = exo_socket:setopts(State#state.socket, SessionOpts), + ?dbg("exo_socket:setopts(~w) = ~w\n", [SessionOpts, _Res]), + State1 = State#state { active = Active, state = CSt0 }, + case Ok of + {_, _, Timeout} -> + ret({noreply, State1, Timeout}); + {_, _} -> + ret({noreply, State1}) + end; + {stop,Reason,CSt1} -> + {stop, Reason, State#state { state = CSt1 }} + end; + {error, Reason} -> + {stop, {auth_failure, Reason}, State0} + catch + error:Crash -> + {stop, {auth_failure, Crash}, State0} + end. + + ret({noreply, #state{idle_timeout = T} = S}) -> if T==undefined -> {noreply, S}; true -> {noreply, S, T} @@ -333,7 +357,7 @@ handle_reuse_data(Rest, #state{module = M, state = MSt} = State) -> handle_socket_data(Data, State) -> CSt0 = State#state.state, - ModResult = apply(State#state.module, data, + ModResult = apply(State#state.module, data, [State#state.socket,Data,CSt0]), ?dbg("handle_socket_data: result ~p", [ModResult]), handle_module_result(ModResult, State). |