summaryrefslogtreecommitdiff
path: root/components/dlink_tls/src/dlink_tls_conn.erl
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:17:30 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:22:36 -0800
commit37dd6cef3e8abdee0829aabf121a2ca5dd35b14e (patch)
treeb7174d0d66f2db6f9a0d724213ad19e427e22847 /components/dlink_tls/src/dlink_tls_conn.erl
parent7922125aba23033945e3b55a4bf78ef8e84521d0 (diff)
downloadrvi_core-37dd6cef3e8abdee0829aabf121a2ca5dd35b14e.tar.gz
fragmentation tests
Diffstat (limited to 'components/dlink_tls/src/dlink_tls_conn.erl')
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl217
1 files changed, 158 insertions, 59 deletions
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index c051afc..447581d 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -27,11 +27,12 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([setup/6]).
--export([upgrade/3,
- async_upgrade/3]).
+-export([setup/7]).
+-export([upgrade/2,
+ async_upgrade/2]).
-export([send/2]).
-export([send/3]).
+-export([send_data/2]).
-export([is_connection_up/1]).
-export([is_connection_up/2]).
-export([terminate_connection/1]).
@@ -40,7 +41,6 @@
-define(SERVER, ?MODULE).
-define(PACKET_MOD, dlink_data_msgpack).
--define(MAX_MSG_SIZE, infinity).
-record(st, {
ip = {0,0,0,0},
@@ -49,11 +49,11 @@
mode = tcp :: tcp | tls,
packet_mod = ?PACKET_MOD,
packet_st = [],
+ frag_opts = [],
mod = undefined,
func = undefined,
cs,
- role = server :: client | server,
- msg_size = ?MAX_MSG_SIZE :: infinity | pos_integer()
+ role = server :: client | server
}).
%%%===================================================================
@@ -61,8 +61,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, CompSpec) ->
- Params = {IP, Port, Sock, Mod, Fun, CompSpec},
+setup(Role, IP, Port, Sock, Mod, Fun, CompSpec) when Role==client;
+ Role==server ->
+ Params = {Role, IP, Port, Sock, Mod, Fun, CompSpec},
?debug("setup() IP = ~p; Port = ~p; Mod = ~p; Fun = ~p", [IP, Port, Mod, Fun]),
?debug("CompSpec = ~p", [CompSpec]),
case gen_server:start_link(?MODULE, Params ,[]) of
@@ -75,16 +76,18 @@ setup(IP, Port, Sock, Mod, Fun, CompSpec) ->
Err
end.
-upgrade(Pid, Role, CompSpec) when Role==client; Role==server ->
- gen_server:call(Pid, {upgrade, Role, CompSpec}).
+upgrade(Pid, Role) when Role==client; Role==server ->
+ gen_server:call(Pid, {upgrade, Role}).
-async_upgrade(Pid, Role, CompSpec) when Role==client;
- Role==server ->
- gen_server:cast(Pid, {upgrade, Role, CompSpec}).
+async_upgrade(Pid, Role) when Role==client;
+ Role==server ->
+ gen_server:cast(Pid, {upgrade, Role}).
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
+send(Pid, Data, Opts) when is_pid(Pid) ->
+ gen_server:cast(Pid, {send, Data, Opts});
send(IP, Port, Data) ->
case dlink_tls_connmgr:find_connection_by_address(IP, Port) of
{ok, Pid} ->
@@ -97,6 +100,9 @@ send(IP, Port, Data) ->
end.
+send_data(Pid, Data) ->
+ gen_server:cast(Pid, {send_data, Data}).
+
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
@@ -139,7 +145,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
+init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> dlink_tls_connmgr:add_connection(IP, Port, self())
@@ -153,6 +159,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
?debug("packet_mod = ~p", [PktMod]),
PktSt = PktMod:init(CompSpec),
+ {ok, FragOpts} = get_module_config(
+ frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec),
{ok, #st{
ip = IP,
port = Port,
@@ -160,12 +168,27 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
mod = Mod,
packet_mod = PktMod,
packet_st = PktSt,
+ frag_opts = FragOpts,
func = Fun,
- cs = CompSpec
+ cs = rvi_common:set_value(role, Role, CompSpec)
}}.
get_module_config(Key, Default, CS) ->
- rvi_common:get_module_config(data_link, dlink_tls_rpc, Key, Default, CS).
+ ModConf = fun() ->
+ rvi_common:get_module_config(
+ data_link, dlink_tls_rpc, Key, Default, CS)
+ end,
+ case rvi_common:get_value(tls_opts, undefined, CS) of
+ undefined -> ModConf();
+ Opts ->
+ case lists:keyfind(Key, 1, Opts) of
+ false ->
+ ModConf();
+ {_, Val} ->
+ Val
+ end
+ end.
+
%%--------------------------------------------------------------------
%% @private
@@ -183,18 +206,18 @@ get_module_config(Key, Default, CS) ->
%%--------------------------------------------------------------------
-handle_call(terminate_connection, _From, St) ->
+handle_call(terminate_connection, _From, #st{} = St) ->
?debug("~p:handle_call(terminate_connection): Terminating: ~p",
[ ?MODULE, {St#st.ip, St#st.port}]),
{stop, Reason, NSt} = handle_info({tcp_closed, St#st.sock}, St),
{stop, Reason, ok, NSt};
-handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) ->
+handle_call({upgrade, Role} = Req, _From, #st{cs = CS} = St) ->
?debug("~p:handle_call(~p)~n", [?MODULE, Req]),
%% deliberately crash (for now) if upgrade fails.
- {Reply, St1} = handle_upgrade(Role, CompSpec, St),
+ {Reply, #st{} = St1} = handle_upgrade(Role, CS, St),
{reply, Reply, St1};
-handle_call(_Request, _From, State) ->
+handle_call(_Request, _From, #st{} = State) ->
?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]),
Reply = ok,
{reply, Reply, State}.
@@ -209,11 +232,11 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({upgrade, Role, CompSpec}, St) ->
- {_, St1} = handle_upgrade(Role, CompSpec, St),
+handle_cast({upgrade, Role}, #st{cs = CS} = St) ->
+ {_, #st{} = St1} = handle_upgrade(Role, CS, St),
{noreply, St1};
handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
- ?debug("~p:handle_call(send): Sending: ~p",
+ ?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, abbrev(Data)]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
?debug("Encoded~n~s", [Encoded]),
@@ -222,15 +245,29 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
tls -> ssl:send(St#st.sock, Encoded)
end,
{noreply, St#st{packet_st = PSt1}};
-
-handle_cast({activate_socket, Sock}, State) ->
+handle_cast({send, Data, Opts} = Req, #st{packet_mod = PMod,
+ packet_st = PSt,
+ frag_opts = FragOpts} = St) ->
+ ?debug("handle_cast(~p, ...), FragOpts = ~p", [Req, FragOpts]),
+ {ok, Bin, PSt1} = PMod:encode(Data, PSt),
+ St1 = St#st{packet_st = PSt1},
+ rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, fun() ->
+ do_send(Bin, St1)
+ end),
+ {noreply, St1};
+handle_cast({send_data, Data}, #st{} = St) ->
+ %% don't encode; just send
+ ?debug("send_data, ~w", [authorize_keys:abbrev_bin(Data)]),
+ do_send(Data, St),
+ {noreply, St};
+handle_cast({activate_socket, Sock}, #st{} = State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
{noreply, State};
-handle_cast(_Msg, State) ->
- ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]),
+handle_cast(_Msg, #st{} = State) ->
+ ?warning("~p:handle_cast(): Unknown cast: ~p~nSt=~p", [ ?MODULE, _Msg, State]),
{noreply, State}.
%%--------------------------------------------------------------------
@@ -255,6 +292,7 @@ handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port,
packet_mod = PMod, packet_st = PSt} = State) ->
?debug("handle_info(data): Data: ~p", [abbrev(Data)]),
?debug("handle_info(data): From: ~p:~p ", [ IP, Port]),
+ ?debug("handle_info(data): PMod: ~p", [PMod]),
case PMod:decode(Data, fun(Elems) ->
handle_elems(Elems, State)
end, PSt) of
@@ -283,15 +321,18 @@ handle_info({tcp, Sock, Data},
{stop, Reason, State}
end;
-handle_info({tcp_closed, Sock},
+handle_info({Evt, Sock},
#st { ip = IP,
port = Port,
mod = Mod,
func = Fun,
- cs = CS} = State) ->
- ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]),
+ cs = CS} = State) when Evt==tcp_closed; Evt==ssl_closed ->
+ ?debug("~p:handle_info(~w): Address: ~p:~p ", [ ?MODULE, Evt, IP, Port]),
Mod:Fun(self(), IP, Port,closed, CS),
- gen_tcp:close(Sock),
+ case Evt of
+ tcp_closed -> gen_tcp:close(Sock);
+ ssl_closed -> ssl:close(Sock)
+ end,
dlink_tls_connmgr:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -308,7 +349,7 @@ handle_info({tcp_error, _Sock},
{stop, normal, State};
-handle_info(_Info, State) ->
+handle_info(_Info, #st{} = State) ->
?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]),
{noreply, State}.
@@ -341,15 +382,20 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+
+do_send(Bin, #st{sock = Sock, mode = tcp}) ->
+ gen_tcp:send(Sock, Bin);
+do_send(Bin, #st{sock = Sock, mode = tls}) ->
+ ssl:send(Sock, Bin).
+
handle_upgrade(Role, CompSpec, #st{sock = S} = St) ->
- %% {ok, [{active, Last}]} = inet:getopts(S, [active]),
inet:setopts(S, [{active, false}]),
case do_upgrade(S, Role, CompSpec) of
- {ok, NewS} ->
+ {DoVerify, {ok, NewS}} ->
?debug("upgrade to TLS succcessful~n", []),
ssl:setopts(NewS, [{active, once}]),
{ok, {IP, Port}} = ssl:peername(NewS),
- {ok, PeerCert} = ssl:peercert(NewS),
+ PeerCert = get_peercert(DoVerify, NewS),
?debug("SSL PeerCert=~w", [abbrev(PeerCert)]),
NewCS = rvi_common:set_value(
dlink_tls_role, Role,
@@ -357,35 +403,73 @@ handle_upgrade(Role, CompSpec, #st{sock = S} = St) ->
{ok, St#st{sock = NewS, mode = tls, role = Role,
ip = inet_parse:ntoa(IP), port = Port,
cs = NewCS}};
- Error ->
+ {_, Error} ->
?error("Cannot upgrade to TLS: ~p~n", [Error]),
error({cannot_upgrade, Error})
end.
+get_peercert(DoVerify, S) ->
+ case ssl:peercert(S) of
+ {ok, PeerCert} ->
+ PeerCert;
+ {error, _} when DoVerify == false ->
+ undefined
+ end.
+
do_upgrade(Sock, client, CompSpec) ->
- Opts = tls_opts(client, CompSpec),
+ {DoVerify, Opts} = tls_opts(client, CompSpec),
?debug("TLS Opts = ~p", [Opts]),
- ssl:connect(Sock, Opts);
+ {DoVerify, ssl:connect(Sock, Opts)};
do_upgrade(Sock, server, CompSpec) ->
- Opts = tls_opts(client, CompSpec),
+ {DoVerify, Opts} = tls_opts(client, CompSpec),
?debug("TLS Opts = ~p", [Opts]),
- ssl:ssl_accept(Sock, Opts).
-
-%% FIXME: For now, use the example certs delivered with the OTP SSL appl.
-tls_opts(Role, _CompSpec) ->
- {ok, DevCert} = setup:get_env(rvi_core, device_cert),
- {ok, DevKey} = setup:get_env(rvi_core, device_key),
- {ok, CACert} = setup:get_env(rvi_core, root_cert),
- [
- {verify, verify_peer},
- {certfile, DevCert},
- {keyfile, DevKey},
- {cacertfile, CACert},
- {verify_fun, {fun verify_fun/3, public_root_key()}},
- {partial_chain, fun(X) ->
- partial_chain(Role, X)
- end}
- ].
+ {DoVerify, ssl:ssl_accept(Sock, Opts)}.
+
+tls_opts(Role, CompSpec) ->
+ TlsOpts = rvi_common:get_value(tls_opts, [], CompSpec),
+ Opt = fun(K) -> opt(K, TlsOpts,
+ fun() ->
+ ok(setup:get_env(rvi_core, K))
+ end)
+ end,
+ case VOpt = lists:keyfind(verify, 1, TlsOpts) of
+ {verify, false} when Role == server ->
+ {false, [
+ {verify, verify_none},
+ {certfile, Opt(device_cert)},
+ {keyfile, Opt(device_key)},
+ {cacertfile, Opt(root_cert)}
+ ]};
+ {verify, false} ->
+ {false, [
+ {verify, verify_none}
+ ]};
+ _ when VOpt==false; VOpt == {verify, true} -> % {verify,true} default
+ {true, [
+ {verify, verify_peer},
+ {certfile, Opt(device_cert)},
+ {keyfile, Opt(device_key)},
+ {cacertfile, Opt(root_cert)},
+ {verify_fun, opt(verify_fun, TlsOpts,
+ {fun verify_fun/3, public_root_key()})},
+ {partial_chain, opt(partial_chain, TlsOpts,
+ fun(X) ->
+ partial_chain(Role, X)
+ end)}
+ ]}
+ end.
+
+opt(Key, Opts, Def) ->
+ case lists:keyfind(Key, 1, Opts) of
+ false when is_function(Def, 0) -> Def();
+ false -> Def;
+ {_, V} -> V
+ end.
+
+ok({ok, V}) ->
+ V;
+ok(Other) ->
+ error({badmatch, Other}).
public_root_key() ->
authorize_keys:provisioning_key().
@@ -425,11 +509,26 @@ partial_chain(_, Certs) ->
?debug("partial_chain: ~p", [[lager:pr(Dec) || Dec <- Decoded]]),
{trusted_ca, hd(Certs)}.
-handle_elems(Elements, #st{mod = Mod, func = Fun, cs = CS,
- ip = IP, port = Port}) ->
+handle_elems(Elements, #st{frag_opts = FragOpts} = St) ->
+ MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts),
+ ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]),
+ case MaybeF of
+ true ->
+ %% It was a fragment, but not a complete message yet
+ St;
+ {true, Msg} ->
+ #st{packet_mod = PMod, packet_st = PSt} = St,
+ PMod:decode(Msg, fun(Elems) ->
+ got_msg(Elems, St)
+ end, PSt);
+ false ->
+ got_msg(Elements, St)
+ end.
+
+got_msg(Elements, #st{ip = IP, port = Port, mod = Mod, func = Fun, cs = CS} = St) ->
?debug("handle_info(data complete): Processed: ~p", [abbrev(Elements)]),
Mod:Fun(self(), IP, Port, data, Elements, CS),
- ok.
+ St.
verify_cert_sig(#'OTPCertificate'{tbsCertificate = TBS,
signature = Sig}, PubKey) ->