summaryrefslogtreecommitdiff
path: root/deps
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
committerUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
commit6cfeffca9f8e93e45dd885702a77896e2a1d0951 (patch)
tree620e2dd9006b52df7129d135fa7256d793571df1 /deps
parent7d098a34b25704dbaa8bea0217ca6b7be37a0e48 (diff)
downloadrvi_core-6cfeffca9f8e93e45dd885702a77896e2a1d0951.tar.gz
new protocol & setup scripts
Diffstat (limited to 'deps')
-rw-r--r--deps/exo/src/exo_http_server.erl4
-rw-r--r--deps/exo/src/exo_socket.erl81
-rw-r--r--deps/exo/src/exo_socket_server.erl45
-rw-r--r--deps/exo/src/exo_socket_session.erl122
4 files changed, 147 insertions, 105 deletions
diff --git a/deps/exo/src/exo_http_server.erl b/deps/exo/src/exo_http_server.erl
index ae19faa..15c8d06 100644
--- a/deps/exo/src/exo_http_server.erl
+++ b/deps/exo/src/exo_http_server.erl
@@ -68,7 +68,9 @@ start(Port, ServerOptions) ->
[{active,once},{reuseaddr,true},
{verify, verify_none},
{keyfile, filename:join(Dir, "host.key")},
- {certfile, filename:join(Dir, "host.cert")}],
+ {certfile, filename:join(Dir, "host.cert")},
+ {upgrade_timeout, 5000},
+ {accept_timeout, 5000}],
?MODULE, ServerOptions).
%%-----------------------------------------------------------------------------
diff --git a/deps/exo/src/exo_socket.erl b/deps/exo/src/exo_socket.erl
index dc961ac..6967a40 100644
--- a/deps/exo/src/exo_socket.erl
+++ b/deps/exo/src/exo_socket.erl
@@ -9,7 +9,7 @@
%%%---- END COPYRIGHT ---------------------------------------------------------
%%% @author Tony Rogvall <tony@rogvall.se>
%%% @doc
-%%% EXO socket
+%%% EXO socket
%%% @end
%%% Created : 15 Dec 2011 by Tony Rogvall <tony@rogvall.se>
@@ -17,7 +17,7 @@
-export([listen/1, listen/2, listen/3]).
--export([accept/1, accept/2]).
+-export([accept/1, accept/2, accept/3]).
-export([async_accept/1, async_accept/2]).
-export([connect/2, connect/3, connect/4, connect/5]).
%% -export([async_connect/2, async_connect/3, async_connect/4]).
@@ -85,7 +85,7 @@ listen(Port, Protos=[tcp|_], Opts0) ->
Error
end.
-%%
+%%
%%
%%
connect(Host, Port) ->
@@ -108,7 +108,7 @@ connect(Host, Port, Protos=[tcp|_], Opts0, Timeout) ->
TcpConnectOpts = [{active,false},{packet,0},{mode,binary}|TcpOpts1],
case gen_tcp:connect(Host, Port, TcpConnectOpts, Timeout) of
{ok, S} ->
- X =
+ X =
#exo_socket { mdata = gen_tcp,
mctl = inet,
protocol = Protos,
@@ -214,11 +214,11 @@ connect_upgrade(X, Protos0, Timeout) ->
{SSLOpts0,Opts1} = split_options(ssl_connect_opts(),Opts),
{_,SSLOpts} = split_options([ssl_imp], SSLOpts0),
?dbg("SSL upgrade, options = ~w\n", [SSLOpts]),
- ?dbg("exo_socket: before ssl:connect opts=~w\n",
+ ?dbg("exo_socket: before ssl:connect opts=~w\n",
[getopts(X, [active,packet,mode])]),
case ssl_connect(X#exo_socket.socket, SSLOpts, Timeout) of
{ok,S1} ->
- ?dbg("exo_socket: ssl:connect opt=~w\n",
+ ?dbg("exo_socket: ssl:connect opt=~w\n",
[ssl:getopts(S1, [active,packet,mode])]),
X1 = X#exo_socket { socket=S1,
mdata = ssl,
@@ -227,25 +227,25 @@ connect_upgrade(X, Protos0, Timeout) ->
tags={ssl,ssl_closed,ssl_error}},
connect_upgrade(X1, Protos1, Timeout);
Error={error,_Reason} ->
- ?dbg("exo_socket: ssl:connect error=~w\n",
+ ?dbg("exo_socket: ssl:connect error=~w\n",
[_Reason]),
Error
end;
[http|Protos1] ->
{_, Close,Error} = X#exo_socket.tags,
- X1 = X#exo_socket { packet = http,
+ X1 = X#exo_socket { packet = http,
tags = {http, Close, Error }},
connect_upgrade(X1, Protos1, Timeout);
[] ->
setopts(X, [{mode,X#exo_socket.mode},
{packet,X#exo_socket.packet},
{active,X#exo_socket.active}]),
- ?dbg("exo_socket: after upgrade opts=~w\n",
+ ?dbg("exo_socket: after upgrade opts=~w\n",
[getopts(X, [active,packet,mode])]),
{ok,X}
end.
-
-ssl_connect(Socket, Options, Timeout) ->
+
+ssl_connect(Socket, Options, Timeout) ->
case ssl:connect(Socket, Options, Timeout) of
{error, ssl_not_started} ->
ssl:start(),
@@ -288,28 +288,39 @@ async_socket(Listen, Socket, AuthOpts)
ok ->
{ok,Mod} = inet_db:lookup_socket(Listen#exo_socket.socket),
inet_db:register_socket(Socket, Mod),
- X = Listen#exo_socket { transport=Socket, socket=Socket },
- maybe_auth(
- accept_upgrade(X, tl(X#exo_socket.protocol), infinity),
- server,
- X#exo_socket.opts ++ AuthOpts);
+ {ok, Listen#exo_socket { transport=Socket, socket=Socket,
+ opts = Listen#exo_socket.opts ++ AuthOpts }};
Error ->
prim_inet:close(Socket),
Error
end;
Error ->
+ ?debug("getopts() -> ~p", [Error]),
prim_inet:close(Socket),
Error
end.
-
accept(X) when is_record(X, exo_socket) ->
- accept_upgrade(X, X#exo_socket.protocol, infinity).
-
-accept(X, Timeout) when
- is_record(X, exo_socket),
+ Timeout = proplists:get_value(accept_timeout, X#exo_socket.opts, infinity),
+ maybe_auth(
+ accept_upgrade(X, X#exo_socket.protocol, Timeout),
+ server,
+ X#exo_socket.opts).
+
+accept(X, Timeout) when
+ is_record(X, exo_socket),
(Timeout =:= infnity orelse (is_integer(Timeout) andalso Timeout >= 0)) ->
- accept_upgrade(X, X#exo_socket.protocol, Timeout).
+ maybe_auth(
+ accept_upgrade(X, X#exo_socket.protocol, Timeout),
+ server,
+ X#exo_socket.opts).
+
+accept(X, Protos, Timeout) when
+ is_record(X, exo_socket) ->
+ maybe_auth(
+ accept_upgrade(X, Protos, Timeout),
+ server,
+ X#exo_socket.opts).
accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
?dbg("exo_socket: accept protos=~w\n", [Protos0]),
@@ -327,11 +338,11 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
{SSLOpts0,Opts1} = split_options(ssl_listen_opts(),Opts),
{_,SSLOpts} = split_options([ssl_imp], SSLOpts0),
?dbg("SSL upgrade, options = ~w\n", [SSLOpts]),
- ?dbg("exo_socket: before ssl_accept opt=~w\n",
+ ?dbg("exo_socket: before ssl_accept opt=~w\n",
[getopts(X, [active,packet,mode])]),
case ssl_accept(X#exo_socket.socket, SSLOpts, Timeout) of
{ok,S1} ->
- ?dbg("exo_socket: ssl_accept opt=~w\n",
+ ?dbg("exo_socket: ssl_accept opt=~w\n",
[ssl:getopts(S1, [active,packet,mode])]),
X1 = X#exo_socket{socket=S1,
mdata = ssl,
@@ -340,7 +351,7 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
tags={ssl,ssl_closed,ssl_error}},
accept_upgrade(X1, Protos1, Timeout);
Error={error,_Reason} ->
- ?dbg("exo_socket: ssl:ssl_accept error=~w\n",
+ ?dbg("exo_socket: ssl:ssl_accept error=~w\n",
[_Reason]),
Error
end;
@@ -348,14 +359,14 @@ accept_upgrade(X=#exo_socket { mdata = M }, Protos0, Timeout) ->
accept_probe_ssl(X,Protos1,Timeout);
[http|Protos1] ->
{_, Close,Error} = X#exo_socket.tags,
- X1 = X#exo_socket { packet = http,
+ X1 = X#exo_socket { packet = http,
tags = {http, Close, Error }},
accept_upgrade(X1,Protos1,Timeout);
[] ->
setopts(X, [{mode,X#exo_socket.mode},
{packet,X#exo_socket.packet},
{active,X#exo_socket.active}]),
- ?dbg("exo_socket: after upgrade opts=~w\n",
+ ?dbg("exo_socket: after upgrade opts=~w\n",
[getopts(X, [active,packet,mode])]),
{ok,X}
end.
@@ -393,7 +404,7 @@ accept_probe_ssl(X=#exo_socket { mdata=M, socket=S,
Error
end.
-ssl_accept(Socket, Options, Timeout) ->
+ssl_accept(Socket, Options, Timeout) ->
case ssl:ssl_accept(Socket, Options, Timeout) of
{error, ssl_not_started} ->
ssl:start(),
@@ -417,7 +428,7 @@ request_type(<<ContentType:8, _Version:16, _Length:16, _/binary>>) ->
end;
request_type(_) ->
undefined.
-
+
%%
%% exo_socket wrapper for socket operations
%%
@@ -426,7 +437,7 @@ close(#exo_socket { mdata = M, socket = S}) ->
shutdown(#exo_socket { mdata = M, socket = S}, How) ->
M:shutdown(S, How).
-
+
send(#exo_socket { mdata = M,socket = S, mauth = A,auth_state = Sa} = X, Data) ->
if A == undefined ->
M:send(S, Data);
@@ -520,11 +531,11 @@ tcp_connect_options() ->
ssl_listen_opts() ->
- [versions, verify, verify_fun,
+ [versions, verify, verify_fun,
fail_if_no_peer_cert, verify_client_once,
- depth, cert, certfile, key, keyfile,
+ depth, cert, certfile, key, keyfile,
password, cacerts, cacertfile, dh, dhfile, cihpers,
- %% deprecated soon
+ %% deprecated soon
ssl_imp, %% always new!
%% server
verify_client_once,
@@ -533,9 +544,9 @@ ssl_listen_opts() ->
debug, hibernate_after, erl_dist ].
ssl_connect_opts() ->
- [versions, verify, verify_fun,
+ [versions, verify, verify_fun,
fail_if_no_peer_cert,
- depth, cert, certfile, key, keyfile,
+ depth, cert, certfile, key, keyfile,
password, cacerts, cacertfile, dh, dhfile, cihpers,
debug].
diff --git a/deps/exo/src/exo_socket_server.erl b/deps/exo/src/exo_socket_server.erl
index 08a3e4c..db67c5a 100644
--- a/deps/exo/src/exo_socket_server.erl
+++ b/deps/exo/src/exo_socket_server.erl
@@ -20,7 +20,7 @@
%%
%% methods
-%% init(Socket, Args) ->
+%% init(Socket, Args) ->
%% {ok, State'}
%% {stop, Reason, State'}
%%
@@ -30,7 +30,7 @@
%%
%% close(Socket, State) ->
%% {ok, State'}
-%%
+%%
%% error(Socket, Error, State) ->
%% {ok, State'}
%% {stop, Reason, State'}
@@ -53,7 +53,7 @@
%% -define(debug(Fmt,Args), ok).
%% -define(error(Fmt,Args), error_logger:format(Fmt, Args)).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(state, {
listen, %% #exo_socket{}
@@ -102,15 +102,15 @@
-spec behaviour_info(callbacks) -> list().
behaviour_info(callbacks) ->
[
- {init, 2}, %% init(Socket::socket(), Args::[term()]
+ {init, 2}, %% init(Socket::socket(), Args::[term()]
%% -> {ok,state()} | {stop,reason(),state()}
- {data, 3}, %% data(Socket::socket(), Data::io_list(), State::state())
+ {data, 3}, %% data(Socket::socket(), Data::io_list(), State::state())
%% -> {ok,state()}|{close,state()}|{stop,reason(),state()}
{close, 2}, %% close(Socket::socket(), State::state())
%% -> {ok,state()}
{error, 3}, %% error(Socket::socket(),Error::error(), State:state())
%% -> {ok,state()} | {stop,reason(),state()}
- {control, 4} %% control(Socket::socket(), Request::term(),
+ {control, 4} %% control(Socket::socket(), Request::term(),
%% From::term(), State:state())
%% -> {reply, Reply::term(),state()} | {noreply, state()} |
%% {ignore, state()} | {send, Bin::binary(), state()} |
@@ -175,15 +175,15 @@ init([Port,Protos,Options,Module,Args] = _X) ->
{ok,Listen} ->
case exo_socket:async_accept(Listen) of
{ok, Ref} ->
- {ok, #state{ listen = Listen,
- active = Active,
+ {ok, #state{ listen = Listen,
+ active = Active,
socket_reuse = Reuse,
ref=Ref,
- module=Module,
+ module=Module,
args=Args
}};
{error, Reason} ->
- {stop,Reason}
+ {stop,Reason}
end;
{error,Reason} ->
{stop,Reason}
@@ -196,8 +196,8 @@ init([Port,Protos,Options,Module,Args] = _X) ->
%%
%% @end
%%--------------------------------------------------------------------
--spec handle_call(Request::term(),
- From::{pid(), Tag::term()},
+-spec handle_call(Request::term(),
+ From::{pid(), Tag::term()},
State::#state{}) ->
{reply, Reply::term(), State::#state{}} |
{noreply, State::#state{}} |
@@ -271,28 +271,34 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when
+handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when
(State#state.listen)#exo_socket.socket =:= LSocket,
Ref =:= State#state.ref ->
?debug("<-- ~p~n", [_Msg]),
Listen = State#state.listen,
+ AcceptTimeout = proplists:get_value(accept_timeout, Listen#exo_socket.opts, infinity),
NewAccept = exo_socket:async_accept(Listen),
- case exo_socket:async_socket(Listen, Socket, [delay_auth]) of
+ case exo_socket:async_socket(Listen, Socket, [{delay_auth, true}]) of
{ok, XSocket} ->
- case exo_socket_session:start(XSocket,
+ F = fun() ->
+ exo_socket:accept(
+ XSocket, tl(XSocket#exo_socket.protocol), AcceptTimeout)
+ end,
+ XSocketFun = {XSocket, F},
+ case exo_socket_session:start(XSocketFun,
State#state.module,
State#state.args) of
- {ok,Pid} ->
+ {ok, Pid} ->
exo_socket:controlling_process(XSocket, Pid),
gen_server:cast(Pid, {activate,State#state.active});
_Error ->
exo_socket:close(XSocket)
end;
- _Error ->
- error
+ _Error ->
+ error
end,
case NewAccept of
- {ok,Ref1} ->
+ {ok, Ref1} ->
{noreply, State#state { ref = Ref1 }};
{error, Reason} ->
{stop, Reason, State}
@@ -436,4 +442,3 @@ send_reuse_message(Host, Port, Args, M, MyPort, XSocket, RUSt) ->
ReuseMsg = exo_socket_session:encode_reuse(
MyPort, ReuseOpts),
exo_socket:send(XSocket, ReuseMsg).
-
diff --git a/deps/exo/src/exo_socket_session.erl b/deps/exo/src/exo_socket_session.erl
index 623b301..f4518db 100644
--- a/deps/exo/src/exo_socket_session.erl
+++ b/deps/exo/src/exo_socket_session.erl
@@ -19,21 +19,21 @@
-behaviour(gen_server).
%% API
--export([start/3,
+-export([start/3,
start_link/3]).
%% gen_server callbacks
--export([init/1,
- handle_call/3,
- handle_cast/2,
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
handle_info/2,
- terminate/2,
+ terminate/2,
code_change/3]).
--export([encode_reuse/2,
+-export([encode_reuse/2,
decode_reuse_config/1]).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
-record(state, {
module,
@@ -90,6 +90,7 @@ start(XSocket, Module, Args) ->
%% @end
%%--------------------------------------------------------------------
init([XSocket, Module, Args]) ->
+ ?debug("init(~p, ~p, ~p)", [XSocket, Module, Args]),
{ok, #state{ socket=XSocket,
module=Module,
args=Args,
@@ -104,23 +105,23 @@ init([XSocket, Module, Args]) ->
%%
%% @end
%%--------------------------------------------------------------------
--spec handle_call(Request::term(),
- From::{pid(), Tag::term()},
+-spec handle_call(Request::term(),
+ From::{pid(), Tag::term()},
State::#state{}) ->
{reply, Reply::term(), State::#state{}} |
{noreply, State::#state{}} |
{stop, Reason::atom(), Reply::term(), State::#state{}}.
%% No 'local' handle_call
-handle_call(Request, From,
+handle_call(Request, From,
State=#state{module = M, state = MSt, socket = Socket}) ->
?dbg("handle_call: ~p", [Request]),
try M:control(Socket, Request, From, MSt) of
- Result ->
+ Result ->
?dbg("handle_call: reply ~p", [Result]),
mod_reply(Result, From, State)
catch
- error:_Error ->
+ error:_Error ->
?dbg("handle_call: catch reason ~p", [_Error]),
ret({reply, {error, unknown_call}, State})
end.
@@ -172,40 +173,24 @@ send_(Bin, From, #state{socket = S, pending = P} = State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({activate,Active}, State0) ->
+handle_cast({activate,Active}, #state{socket = XSocket0} = State0) ->
?dbg("activate~n", []),
- try exo_socket:authenticate(State0#state.socket) of
- {ok, S} ->
- ?dbg("authentication done~n", []),
- State = State0#state{socket = S},
- case apply(State#state.module, init,
- [State#state.socket,State#state.args]) of
- Ok when element(1, Ok) == ok ->
- CSt0 = element(2, Ok),
- %% enable active mode here (if ever wanted) once is handled,
- %% automatically anyway. exit_on_close is default and
- %% allow session statistics retrieval in the close callback
- SessionOpts = [{active,Active},{exit_on_close, false}],
-
- _Res = exo_socket:setopts(State#state.socket, SessionOpts),
- ?dbg("exo_socket:setopts(~w) = ~w\n", [SessionOpts, _Res]),
- State1 = State#state { active = Active, state = CSt0 },
- case Ok of
- {_, _, Timeout} ->
- ret({noreply, State1, Timeout});
- {_, _} ->
- ret({noreply, State1})
- end;
- {stop,Reason,CSt1} ->
- {stop, Reason, State#state { state = CSt1 }}
+ case XSocket0 of
+ {#exo_socket{}, Fun} when is_function(Fun, 0) ->
+ try Fun() of
+ {ok, XSocket} ->
+ activate_(Active, State0#state{socket = XSocket});
+ {error, _} = Error ->
+ ?debug("socket fun -> ~p", [Error]),
+ {stop, Error, State0}
+ catch
+ Cat:Exception ->
+ ?debug("caught ~p:~p from socket fun", [Cat, Exception]),
+ {stop, Exception, State0}
end;
- {error, Reason} ->
- {stop, {auth_failure, Reason}, State0}
- catch
- error:Crash ->
- {stop, {auth_failure, Crash}, State0}
+ #exo_socket{}->
+ activate_(Active, State0)
end;
-
handle_cast(_Msg, State) ->
ret({noreply, State}).
@@ -223,9 +208,9 @@ handle_info(timeout, State) ->
exo_socket:shutdown(State#state.socket, write),
?dbg("exo_socket_session: idle_timeout~p~n", [self()]),
{stop, normal, State};
-handle_info({Tag,Socket,Data0}, State) when
+handle_info({Tag,Socket,Data0}, State) when
%% FIXME: put socket tag in State for correct matching
- (Tag =:= tcp orelse Tag =:= ssl orelse Tag =:= http),
+ (Tag =:= tcp orelse Tag =:= ssl orelse Tag =:= http),
Socket =:= (State#state.socket)#exo_socket.socket ->
?dbg("exo_socket_session: got data ~p\n", [{Tag,Socket,Data0}]),
try exo_socket:auth_incoming(State#state.socket, Data0) of
@@ -247,7 +232,7 @@ handle_info({Tag,Socket}, State) when
{ok,CSt1} ->
{stop, normal, State#state { state = CSt1 }}
end;
-handle_info({Tag,Socket,Error}, State) when
+handle_info({Tag,Socket,Error}, State) when
(Tag =:= tcp_error orelse Tag =:= ssl_error),
Socket =:= (State#state.socket)#exo_socket.socket ->
?dbg("exo_socket_session: got error ~p\n", [{Tag,Socket,Error}]),
@@ -258,7 +243,7 @@ handle_info({Tag,Socket,Error}, State) when
{stop,Reason,CSt1} ->
{stop, Reason, State#state { state = CSt1 }}
end;
-
+
handle_info(_Info, State) ->
?dbg("Got info: ~p\n", [_Info]),
ret({noreply, State}).
@@ -275,9 +260,14 @@ handle_info(_Info, State) ->
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- exo_socket:close(State#state.socket),
+ socket_close(State#state.socket),
ok.
+socket_close({#exo_socket{} = S, _}) ->
+ exo_socket:close(S);
+socket_close(#exo_socket{} = S) ->
+ exo_socket:close(S).
+
%%--------------------------------------------------------------------
%% @private
%% @doc
@@ -293,6 +283,40 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
+activate_(Active, State0) ->
+ try exo_socket:authenticate(State0#state.socket) of
+ {ok, S} ->
+ ?dbg("authentication done~n", []),
+ State = State0#state{socket = S},
+ case apply(State#state.module, init,
+ [State#state.socket,State#state.args]) of
+ Ok when element(1, Ok) == ok ->
+ CSt0 = element(2, Ok),
+ %% enable active mode here (if ever wanted) once is handled,
+ %% automatically anyway. exit_on_close is default and
+ %% allow session statistics retrieval in the close callback
+ SessionOpts = [{active,Active},{exit_on_close, false}],
+
+ _Res = exo_socket:setopts(State#state.socket, SessionOpts),
+ ?dbg("exo_socket:setopts(~w) = ~w\n", [SessionOpts, _Res]),
+ State1 = State#state { active = Active, state = CSt0 },
+ case Ok of
+ {_, _, Timeout} ->
+ ret({noreply, State1, Timeout});
+ {_, _} ->
+ ret({noreply, State1})
+ end;
+ {stop,Reason,CSt1} ->
+ {stop, Reason, State#state { state = CSt1 }}
+ end;
+ {error, Reason} ->
+ {stop, {auth_failure, Reason}, State0}
+ catch
+ error:Crash ->
+ {stop, {auth_failure, Crash}, State0}
+ end.
+
+
ret({noreply, #state{idle_timeout = T} = S}) ->
if T==undefined -> {noreply, S};
true -> {noreply, S, T}
@@ -333,7 +357,7 @@ handle_reuse_data(Rest, #state{module = M, state = MSt} = State) ->
handle_socket_data(Data, State) ->
CSt0 = State#state.state,
- ModResult = apply(State#state.module, data,
+ ModResult = apply(State#state.module, data,
[State#state.socket,Data,CSt0]),
?dbg("handle_socket_data: result ~p", [ModResult]),
handle_module_result(ModResult, State).