summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--components/authorize/src/authorize_keys.erl22
-rw-r--r--components/dlink_bt/src/bt_connection.erl12
-rw-r--r--components/dlink_bt/src/bt_listener.erl2
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl14
-rw-r--r--components/dlink_tcp/src/connection.erl2
-rw-r--r--components/dlink_tcp/src/connection_manager.erl145
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl10
-rw-r--r--components/dlink_tcp/src/dlink_tcp_sup.erl5
-rw-r--r--components/dlink_tcp/src/gen_nb_server.erl3
-rw-r--r--components/dlink_tcp/src/listener.erl53
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl53
-rw-r--r--components/dlink_tls/src/dlink_tls_connmgr.erl132
-rw-r--r--components/dlink_tls/src/dlink_tls_listener.erl57
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl30
-rw-r--r--components/dlink_tls/src/dlink_tls_sup.erl2
-rw-r--r--components/proto_msgpack/src/proto_msgpack_rpc.erl15
-rw-r--r--components/rvi_common/src/rvi_log.erl2
-rw-r--r--components/service_edge/src/wse_server.erl30
-rw-r--r--deps/exo/src/exo_socket_server.erl4
-rw-r--r--deps/exo/src/exo_socket_session.erl4
-rw-r--r--priv/test_config/bt_backend.config2
-rw-r--r--priv/test_config/bt_sample.config2
-rw-r--r--priv/test_config/tls_backend.config2
-rw-r--r--priv/test_config/tls_sample.config2
-rw-r--r--priv/test_config/tlsj_backend.config2
-rw-r--r--priv/test_config/tlsj_sample.config2
-rw-r--r--test/rvi_core_SUITE.erl124
27 files changed, 413 insertions, 320 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index a362c7b..f651a60 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -197,14 +197,16 @@ handle_call_({validate_message, JWT, Conn}, _, S) ->
{reply, validate_message_(JWT, Conn), S};
handle_call_({validate_service_call, Svc, Conn}, _, S) ->
{reply, validate_service_call_(Svc, Conn), S};
-handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn, PeerCert, LogId}, _, S) ->
+handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn0, PeerCert, LogId}, _, S) ->
+ Conn = normalize_conn(Conn0),
+ ?debug("save_cred: ~p (Conn=~p, PeerCert=~p)", [Cred, Conn, abbrev(PeerCert)]),
case process_cred_struct(Cred, JWT, PeerCert) of
invalid ->
log(LogId, warning, "cred INVALID Conn=~s:~w", [IP, Port]),
{reply, {error, invalid}, S};
#cred{} = C ->
ets:insert(?CREDS, {{Conn, C#cred.id}, C}),
- log(LogId, result, "cred stored ~s Conn=~s:~w", [abbrev_bin(C#cred.id), IP, Port]),
+ log(LogId, result, "cred stored ~s Conn=~p", [abbrev_bin(C#cred.id), Conn]),
{reply, ok, S}
end;
handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
@@ -244,14 +246,26 @@ creds_by_conn(Conn) ->
?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Creds]]),
[C || {C,V} <- Creds, check_validity(V, UTC)].
-cred_recs_by_conn(Conn) ->
- ?debug("cred_recs_by_conn(~p)~n", [Conn]),
+cred_recs_by_conn(Conn0) ->
+ Conn = normalize_conn(Conn0),
+ ?debug("cred_recs_by_conn(~p)~nAll = ~p", [Conn, abbrev(ets:tab2list(?CREDS))]),
UTC = rvi_common:utc_timestamp(),
Creds = ets:select(?CREDS, [{ {{Conn,'_'}, '$1'},
[], ['$1'] }]),
?debug("rough selection: ~p~n", [[abbrev_bin(C#cred.id) || C <- Creds]]),
[C || C <- Creds, check_validity(C#cred.validity, UTC)].
+normalize_conn(local) ->
+ local;
+normalize_conn({IP, Port} = Conn) when is_binary(IP), is_binary(Port) ->
+ Conn;
+normalize_conn({IP, Port}) ->
+ {to_bin(IP), to_bin(Port)}.
+
+to_bin(B) when is_binary(B) -> B;
+to_bin(L) when is_list(L) -> iolist_to_binary(L);
+to_bin(I) when is_integer(I) -> integer_to_binary(I).
+
filter_by_service_(Services, Conn) ->
?debug("Filter: creds = ~p", [[{K,abbrev_payload(V)} || {K,V} <- ets:tab2list(?CREDS)]]),
Invoke = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{right_to_invoke = '$1',
diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl
index 6b3a64e..bcfa199 100644
--- a/components/dlink_bt/src/bt_connection.erl
+++ b/components/dlink_bt/src/bt_connection.erl
@@ -138,7 +138,7 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) ->
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS),
PktSt = PktMod:init(CS),
{ok, #st{
- remote_addr = BTAddr,
+ remote_addr = bt_addr(Mode, BTAddr),
channel = Channel,
rfcomm_ref = undefined,
mode = Mode,
@@ -394,9 +394,10 @@ handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod,
inet_db:register_socket(Sock, inet_tcp),
inet:setopts(Sock, [{active, once}]),
{ok, {BTAddr, Channel}} = inet:peername(Sock),
+ ?debug("peername (tcp): ~p:~p", [BTAddr, Channel]),
Mod:Fun(self(), BTAddr, Channel, accepted, Arg),
{noreply, St#st{rfcomm_ref = Sock,
- remote_addr = BTAddr}};
+ remote_addr = bt_addr(tcp, BTAddr)}};
handle_info(_Info, State) ->
?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]),
@@ -442,3 +443,10 @@ handle_elements(Elements, #st{remote_addr = BTAddr,
?debug("data complete; processed: ~p",
[authorize_keys:abbrev(Elements)]),
Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg).
+
+
+bt_addr(tcp, Addr) ->
+ {ok, IP} = inet:ip(Addr),
+ inet_parse:ntoa(IP);
+bt_addr(bt, Addr) ->
+ Addr.
diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl
index a1f1a49..06efd3d 100644
--- a/components/dlink_bt/src/bt_listener.erl
+++ b/components/dlink_bt/src/bt_listener.erl
@@ -42,7 +42,7 @@ accept_ack(Result, LRef, Addr, Chan) ->
ok.
sock_opts() ->
- [binary, {active, once}, {packet, 0}].
+ [{reuseaddr, true}, binary, {active, once}, {packet, 0}].
init(Mode) ->
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index 69310a7..c675387 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -651,11 +651,19 @@ code_change(_OldVsn, St, _Extra) ->
send_authorize(Pid, SetupChannel, CompSpec) ->
- {ok,[{address, Address }]} = bt_drv:local_info([address]),
+ {Address, Channel} =
+ case Mode = get_mode(CompSpec) of
+ bt ->
+ {ok,[{address, Addr}]} = bt_drv:local_info([address]),
+ {bt_address_to_string(Addr), SetupChannel};
+ tcp ->
+ {IP, Port} = rvi_common:node_address_tuple(),
+ {IP, integer_to_binary(Port)}
+ end,
bt_connection:send(Pid,
[{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
- { ?DLINK_ARG_PORT, SetupChannel },
+ { ?DLINK_ARG_ADDRESS, Address },
+ { ?DLINK_ARG_PORT, Channel },
{ ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
{ ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
| log_id_tail(CompSpec)]).
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index b24215c..7229b3c 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -231,7 +231,7 @@ handle_info({tcp, Sock, Data},
port = Port,
packet_mod = PMod,
packet_st = PSt} = State) ->
- ?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
+ ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]),
case PMod:decode(Data, fun(Elems) ->
handle_elements(Elems, State)
end, PSt) of
diff --git a/components/dlink_tcp/src/connection_manager.erl b/components/dlink_tcp/src/connection_manager.erl
index 6a9f1e0..e16f789 100644
--- a/components/dlink_tcp/src/connection_manager.erl
+++ b/components/dlink_tcp/src/connection_manager.erl
@@ -2,10 +2,10 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
-%%
+%%
%%%-------------------------------------------------------------------
%%% @author magnus <magnus@t520.home>
%%% @copyright (C) 2014, magnus
@@ -33,7 +33,10 @@
-export([find_connection_by_address/2]).
-export([connections/0]).
--define(SERVER, ?MODULE).
+-define(SERVER, ?MODULE).
+
+-define(PID_TAB, dlink_tcp_conn_by_pid).
+-define(ADDR_TAB, dlink_tcp_conn_by_addr).
-record(st, {
conn_by_pid = undefined,
@@ -70,6 +73,7 @@ connections() ->
%% @end
%%--------------------------------------------------------------------
start_link() ->
+ create_ets(),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
@@ -93,6 +97,18 @@ init([]) ->
conn_by_addr = dict:new() %% All managed connection stored by address
}}.
+create_ets() ->
+ maybe_create(?PID_TAB),
+ maybe_create(?ADDR_TAB).
+
+maybe_create(Tab) ->
+ case ets:info(Tab, name) of
+ undefined ->
+ ets:new(Tab, [public, named_table, set]);
+ _ ->
+ Tab
+ end.
+
%%--------------------------------------------------------------------
%% @private
%% @doc
@@ -107,106 +123,84 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_call({add_connection, IP, Port, Pid}, _From,
- #st { conn_by_pid = ConPid,
- conn_by_addr = ConAddr} = St) ->
-
- ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p",
+handle_call({add_connection, IP, Port, Pid}, _From, St) ->
+ ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p",
[ ?MODULE, Pid, { IP, Port }]),
%% Store so that we can find connection both by pid and by address
- NConPid = dict:store(Pid, { IP, Port }, ConPid),
- NConAddr = dict:store({ IP, Port }, Pid, ConAddr),
-
- NSt = St#st { conn_by_pid = NConPid,
- conn_by_addr = NConAddr },
- {reply, ok, NSt};
+ ets_insert(?PID_TAB, {Pid, {IP, Port}}),
+ ets_insert(?ADDR_TAB, {{IP, Port}, Pid}),
+ {reply, ok, St};
%% Delete connection by pid
-handle_call({delete_connection_by_pid, Pid}, _From,
- #st { conn_by_pid = ConPid,
- conn_by_addr = ConAddr} = St) when is_pid(Pid)->
-
+handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid)->
%% Find address associated with Pid
- case dict:find(Pid, ConPid) of
- error ->
- ?debug("~p:handle_call(del_by_pid): not found: ~p",
+ case ets_lookup(?PID_TAB, Pid) of
+ [] ->
+ ?debug("~p:handle_call(del_by_pid): not found: ~p",
[ ?MODULE, Pid]),
{ reply, not_found, St};
-
- {ok, Addr } ->
- ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p",
- [ ?MODULE, Pid, Addr]),
-
- NConPid = dict:erase(Pid, ConPid),
- NConAddr = dict:erase(Addr, ConAddr),
- NSt = St#st { conn_by_pid = NConPid,
- conn_by_addr = NConAddr },
+ [{_, Addr}] ->
+ ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p",
+ [ ?MODULE, Pid, Addr]),
- {reply, ok, NSt}
+ ets_delete(?PID_TAB, Pid),
+ ets_delete(?ADDR_TAB, Addr),
+ {reply, ok, St}
end;
%% Delete connection by address
-handle_call({ delete_connection_by_address, IP, Port}, _From,
- #st { conn_by_pid = ConPid,
- conn_by_addr = ConAddr} = St) ->
-
+handle_call({ delete_connection_by_address, IP, Port}, _From, St) ->
%% Find Pid associated with Address
- case dict:find({IP, Port}, ConAddr) of
- error ->
- ?debug("~p:handle_call(del_by_addr): not found: ~p",
- [ ?MODULE, {IP, Port}]),
+ Addr = {IP, Port},
+ case ets_lookup(?ADDR_TAB, Addr) of
+ [] ->
+ ?debug("~p:handle_call(del_by_addr): not found: ~p",
+ [ ?MODULE, Addr]),
{ reply, not_found, St};
-
- {ok, Pid } ->
- ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p",
- [ ?MODULE, Pid, {IP, Port}]),
- NConPid = dict:erase(Pid, ConPid),
- NConAddr = dict:erase({ IP, Port }, ConAddr),
- NSt = St#st { conn_by_pid = NConPid,
- conn_by_addr = NConAddr },
- {reply, ok, NSt}
- end;
+ [{_, Pid}] ->
+ ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p",
+ [ ?MODULE, Pid, Addr]),
+ ets_delete(?PID_TAB, Pid),
+ ets_delete(?ADDR_TAB, Addr),
+ {reply, ok, St}
+ end;
%% Find connection by pid
-handle_call({ find_connection_by_pid, Pid}, _From,
- #st { conn_by_pid = ConPid} = St) when is_pid(Pid)->
-
+handle_call({ find_connection_by_pid, Pid}, _From, St) when is_pid(Pid)->
%% Find address associated with Pid
- case dict:find(Pid, ConPid) of
- error ->
- ?debug("~p:handle_call(find_by_pid): not found: ~p",
+ case ets_lookup(?PID_TAB, Pid) of
+ [] ->
+ ?debug("~p:handle_call(find_by_pid): not found: ~p",
[ ?MODULE, Pid]),
{ reply, not_found, St};
-
- {ok, {IP, Port} } ->
- ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p",
+
+ [{_, {IP, Port}}] ->
+ ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p",
[ ?MODULE, Pid, {IP, Port}]),
{reply, {ok, IP, Port}, St}
end;
%% Find connection by address
-handle_call({find_connection_by_address, IP, Port}, _From,
- #st { conn_by_addr = ConAddr} = St) ->
-
+handle_call({find_connection_by_address, IP, Port}, _From, St) ->
%% Find address associated with Pid
- case dict:find({IP, Port}, ConAddr) of
- error ->
- ?debug("~p:handle_call(find_by_addr): not found: ~p",
+ case ets_lookup(?ADDR_TAB, {IP, Port}) of
+ [] ->
+ ?debug("~p:handle_call(find_by_addr): not found: ~p",
[ ?MODULE, {IP, Port}]),
{ reply, not_found, St};
-
- {ok, Pid } ->
- ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p",
+
+ [{_, Pid}] ->
+ ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p",
[ ?MODULE, {IP, Port}, Pid]),
{reply, {ok, Pid}, St}
end;
-handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) ->
- {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St};
+handle_call(connections, _From, St) ->
+ {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1']}]), St};
handle_call(_Request, _From, State) ->
?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]),
@@ -269,3 +263,16 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+
+%% Ets wrapper functions to simplify tracing.
+ets_lookup(Tab, Key) ->
+ ets:lookup(Tab, Key).
+
+ets_insert(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
+ets_delete(Tab, Key) ->
+ ets:delete(Tab, Key).
+
+ets_select(Tab, Pattern) ->
+ ets:select(Tab, Pattern).
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 83e0a24..68d32d7 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -94,11 +94,11 @@ start_json_server() ->
start_connection_manager() ->
%% Fire up listener
CompSpec = rvi_common:get_component_specification(),
- connection_manager:start_link(),
+ %% connection_manager:start_link(),
?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]),
- {ok,Pid} = listener:start_link(),
+ %% {ok,Pid} = listener:start_link(),
%%
- setup_initial_listeners(Pid, CompSpec),
+ setup_initial_listeners(CompSpec),
?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."),
@@ -112,7 +112,7 @@ start_connection_manager() ->
ok.
-setup_initial_listeners(Pid, CompSpec) ->
+setup_initial_listeners(CompSpec) ->
case rvi_common:get_module_config(data_link,
?MODULE,
?SERVER_OPTS,
@@ -123,7 +123,7 @@ setup_initial_listeners(Pid, CompSpec) ->
?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
%%
%% Add listener port.
- case listener:add_listener(Pid, IP, Port, CompSpec) of
+ case listener:add_listener(IP, Port, CompSpec) of
ok ->
?notice("---- RVI Node External Address: ~s",
[ application:get_env(rvi_core, node_address, undefined)]);
diff --git a/components/dlink_tcp/src/dlink_tcp_sup.erl b/components/dlink_tcp/src/dlink_tcp_sup.erl
index edb9c82..04f5255 100644
--- a/components/dlink_tcp/src/dlink_tcp_sup.erl
+++ b/components/dlink_tcp/src/dlink_tcp_sup.erl
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -34,6 +34,7 @@ start_link() ->
init([]) ->
{ok, { {one_for_one, 5, 10},
[
+ ?CHILD(connection_manager, worker),
+ ?CHILD(listener, worker),
?CHILD(dlink_tcp_rpc, worker)
]} }.
-
diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl
index af72189..ae0a605 100644
--- a/components/dlink_tcp/src/gen_nb_server.erl
+++ b/components/dlink_tcp/src/gen_nb_server.erl
@@ -43,6 +43,7 @@
terminate/2,
code_change/3]).
+-include_lib("lager/include/log.hrl").
-define(SERVER, ?MODULE).
-record(state, {cb,
@@ -203,11 +204,13 @@ code_change(_OldVsn, State, _Extra) ->
%% Result = {ok, port()} | {error, any()}
listen_on(CallbackModule, IpAddr, Port) ->
SockOpts = [{reuseaddr, true}, {ip, convert(IpAddr)}] ++ CallbackModule:sock_opts(),
+ ?debug("listen on ~p:~p, Opts = ~p", [IpAddr, Port, SockOpts]),
case gen_tcp:listen(Port, SockOpts) of
{ok, LSock} ->
{ok, _Ref} = prim_inet:async_accept(LSock, -1),
{ok, LSock};
Err ->
+ ?debug("listen error: ~p", [Err]),
Err
end.
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index 4512a59..6def59a 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -13,29 +13,56 @@
-include_lib("lager/include/log.hrl").
-export([start_link/0,
- add_listener/4,
- remove_listener/3]).
+ add_listener/3,
+ remove_listener/2]).
-export([init/2, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, sock_opts/0, new_connection/4]).
-behavior(gen_nb_server).
+-define(TAB, dlink_tcp_listener_tab).
+
start_link() ->
- gen_nb_server:start_link(?MODULE, []).
+ create_ets(),
+ gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []).
+
+create_ets() ->
+ case ets:info(?TAB, name) of
+ undefined -> ets:new(?TAB, [public, named_table, set]);
+ _ -> ?TAB
+ end.
-add_listener(Pid, IpAddr, Port, CompSpec) ->
- gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}).
+add_listener(IpAddr, Port, CompSpec) ->
+ gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}).
-remove_listener(Pid, IpAddr, Port) ->
- gen_server:call(Pid, {remove_listener, IpAddr, Port}).
+remove_listener(IpAddr, Port) ->
+ gen_server:call(?MODULE, {remove_listener, IpAddr, Port}).
init([], State) ->
- {ok, State}.
+ case ets_select(?TAB, [{ '_', [], ['$_'] }]) of
+ [] ->
+ {ok, State};
+ Addrs ->
+ lists:foldl(
+ fun({{_, _} = Addr}, Acc) ->
+ case gen_nb_server:add_listen_socket(Addr, Acc) of
+ {ok, Acc1} ->
+ Acc1;
+ _Error ->
+ ets_delete(?TAB, Addr),
+ Acc
+ end;
+ ({cs, CS}, Acc) ->
+ gen_nb_server:store_cb_state(CS, Acc)
+ end, State, Addrs)
+ end.
handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) ->
+ ets_insert(?TAB, {cs, CompSpec}),
case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of
{ok, State1} ->
+ ets_insert(?TAB, {{IpAddr,Port}}),
{reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )};
Error ->
@@ -45,6 +72,7 @@ handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) ->
handle_call({remove_listener, IpAddr, Port}, _From, State) ->
case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of
{ok, State1} ->
+ ets_delete(?TAB, {IpAddr, Port}),
{reply, ok, State1};
Error ->
{reply, Error, State}
@@ -77,3 +105,12 @@ new_connection(IP, Port, Sock, State) ->
dlink_tcp_rpc,
handle_socket, gen_nb_server:get_cb_state(State)),
{ok, State}.
+
+ets_insert(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
+ets_delete(Tab, Key) ->
+ ets:delete(Tab, Key).
+
+ets_select(Tab, Pat) ->
+ ets:select(Tab, Pat).
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index 93266b1..be55dc6 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -28,7 +28,8 @@
terminate/2, code_change/3]).
-export([setup/6]).
--export([upgrade/3]).
+-export([upgrade/3,
+ async_upgrade/3]).
-export([send/2]).
-export([send/3]).
-export([is_connection_up/1]).
@@ -77,6 +78,10 @@ setup(IP, Port, Sock, Mod, Fun, CompSpec) ->
upgrade(Pid, Role, CompSpec) when Role==client; Role==server ->
gen_server:call(Pid, {upgrade, Role, CompSpec}).
+async_upgrade(Pid, Role, CompSpec) when Role==client;
+ Role==server ->
+ gen_server:cast(Pid, {upgrade, Role, CompSpec}).
+
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
@@ -186,26 +191,9 @@ handle_call(terminate_connection, _From, St) ->
{stop, Reason, ok, NSt};
handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) ->
?debug("~p:handle_call(~p)~n", [?MODULE, Req]),
-
- {ok, [{active, Last}]} = inet:getopts(S, [active]),
- inet:setopts(S, [{active, false}]),
- case do_upgrade(S, Role, CompSpec) of
- {ok, NewS} ->
- ?debug("upgrade to TLS succcessful~n", []),
- ssl:setopts(NewS, [{active, Last}]),
- {ok, {IP, Port}} = ssl:peername(NewS),
- {ok, PeerCert} = ssl:peercert(NewS),
- ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]),
- NewCS = rvi_common:set_value(
- dlink_tls_role, Role,
- rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)),
- {reply, ok, St#st{sock = NewS, mode = tls, role = Role,
- ip = inet_parse:ntoa(IP), port = Port,
- cs = NewCS}};
- Error ->
- ?error("Cannot upgrade to TLS: ~p~n", [Error]),
- {stop, Error, Error, St}
- end;
+ %% deliberately crash (for now) if upgrade fails.
+ {Reply, St1} = handle_upgrade(Role, CompSpec, St),
+ {reply, Reply, St1};
handle_call(_Request, _From, State) ->
?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]),
Reply = ok,
@@ -221,6 +209,9 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
+handle_cast({upgrade, Role, CompSpec}, St) ->
+ {_, St1} = handle_upgrade(Role, CompSpec, St),
+ {noreply, St1};
handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_call(send): Sending: ~p",
[ ?MODULE, abbrev(Data)]),
@@ -350,6 +341,26 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+handle_upgrade(Role, CompSpec, #st{sock = S} = St) ->
+ {ok, [{active, Last}]} = inet:getopts(S, [active]),
+ inet:setopts(S, [{active, false}]),
+ case do_upgrade(S, Role, CompSpec) of
+ {ok, NewS} ->
+ ?debug("upgrade to TLS succcessful~n", []),
+ ssl:setopts(NewS, [{active, Last}]),
+ {ok, {IP, Port}} = ssl:peername(NewS),
+ {ok, PeerCert} = ssl:peercert(NewS),
+ ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]),
+ NewCS = rvi_common:set_value(
+ dlink_tls_role, Role,
+ rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)),
+ {ok, St#st{sock = NewS, mode = tls, role = Role,
+ ip = inet_parse:ntoa(IP), port = Port,
+ cs = NewCS}};
+ Error ->
+ ?error("Cannot upgrade to TLS: ~p~n", [Error]),
+ error({cannot_upgrade, Error})
+ end.
do_upgrade(Sock, client, CompSpec) ->
Opts = tls_opts(client, CompSpec),
diff --git a/components/dlink_tls/src/dlink_tls_connmgr.erl b/components/dlink_tls/src/dlink_tls_connmgr.erl
index 4947ee6..31e51bd 100644
--- a/components/dlink_tls/src/dlink_tls_connmgr.erl
+++ b/components/dlink_tls/src/dlink_tls_connmgr.erl
@@ -35,11 +35,10 @@
-export([connections/0]).
-define(SERVER, ?MODULE).
+-define(PID_TAB, dlink_tls_pid_tab).
+-define(ADDR_TAB, dlink_tls_addr_tab).
--record(st, {
- conn_by_pid = undefined,
- conn_by_addr = undefined
- }).
+-record(st, {}).
%%%===================================================================
%%% API
@@ -71,8 +70,21 @@ connections() ->
%% @end
%%--------------------------------------------------------------------
start_link() ->
+ create_ets(),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+create_ets() ->
+ maybe_create(?PID_TAB),
+ maybe_create(?ADDR_TAB).
+
+maybe_create(Tab) ->
+ case ets:info(Tab, name) of
+ undefined ->
+ ets:new(Tab, [public, named_table, set]);
+ _ ->
+ Tab
+ end.
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -89,10 +101,7 @@ start_link() ->
%% @end
%%--------------------------------------------------------------------
init([]) ->
- {ok, #st{
- conn_by_pid = dict:new(), %% All managed connection stored by pid
- conn_by_addr = dict:new() %% All managed connection stored by address
- }}.
+ {ok, #st{}}.
%%--------------------------------------------------------------------
%% @private
@@ -108,106 +117,83 @@ init([]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_call({add_connection, IP, Port, Pid}, _From,
- #st { conn_by_pid = ConPid,
- conn_by_addr = ConAddr} = St) ->
-
+handle_call({add_connection, IP, Port, Pid}, _From, St) ->
+ Addr = {IP, Port},
?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p",
- [ ?MODULE, Pid, { IP, Port }]),
+ [ ?MODULE, Pid, Addr]),
%% Store so that we can find connection both by pid and by address
- NConPid = dict:store(Pid, { IP, Port }, ConPid),
- NConAddr = dict:store({ IP, Port }, Pid, ConAddr),
-
- NSt = St#st { conn_by_pid = NConPid,
- conn_by_addr = NConAddr },
- {reply, ok, NSt};
+ ets_insert(?PID_TAB, {Pid, Addr}),
+ ets_insert(?ADDR_TAB, {Addr, Pid}),
+ {reply, ok, St};
%% Delete connection by pid
-handle_call({delete_connection_by_pid, Pid}, _From,
- #st { conn_by_pid = ConPid,
- conn_by_addr = ConAddr} = St) when is_pid(Pid)->
-
+handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid) ->
%% Find address associated with Pid
- case dict:find(Pid, ConPid) of
- error ->
+ case ets_lookup(?PID_TAB, Pid) of
+ [] ->
?debug("~p:handle_call(del_by_pid): not found: ~p",
[ ?MODULE, Pid]),
{ reply, not_found, St};
- {ok, Addr } ->
+ [{_, Addr}] ->
?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p",
[ ?MODULE, Pid, Addr]),
- NConPid = dict:erase(Pid, ConPid),
- NConAddr = dict:erase(Addr, ConAddr),
-
- NSt = St#st { conn_by_pid = NConPid,
- conn_by_addr = NConAddr },
-
- {reply, ok, NSt}
+ ets_delete(?PID_TAB, Pid),
+ ets_delete(?ADDR_TAB, Addr),
+ {reply, ok, St}
end;
%% Delete connection by address
-handle_call({ delete_connection_by_address, IP, Port}, _From,
- #st { conn_by_pid = ConPid,
- conn_by_addr = ConAddr} = St) ->
-
+handle_call({ delete_connection_by_address, IP, Port}, _From, St) ->
%% Find Pid associated with Address
- case dict:find({IP, Port}, ConAddr) of
- error ->
+ Addr = {IP, Port},
+ case ets_lookup(?ADDR_TAB, Addr) of
+ [] ->
?debug("~p:handle_call(del_by_addr): not found: ~p",
[ ?MODULE, {IP, Port}]),
{ reply, not_found, St};
-
- {ok, Pid } ->
+ [{_, Pid}] ->
?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p",
[ ?MODULE, Pid, {IP, Port}]),
- NConPid = dict:erase(Pid, ConPid),
- NConAddr = dict:erase({ IP, Port }, ConAddr),
- NSt = St#st { conn_by_pid = NConPid,
- conn_by_addr = NConAddr },
- {reply, ok, NSt}
+ ets_delete(?PID_TAB, Pid),
+ ets_delete(?ADDR_TAB, Addr),
+ {reply, ok, St}
end;
%% Find connection by pid
-handle_call({ find_connection_by_pid, Pid}, _From,
- #st { conn_by_pid = ConPid} = St) when is_pid(Pid)->
-
+handle_call({ find_connection_by_pid, Pid}, _From, St) when is_pid(Pid)->
%% Find address associated with Pid
- case dict:find(Pid, ConPid) of
- error ->
- ?debug("~p:handle_call(find_by_pid): not found: ~p",
- [ ?MODULE, Pid]),
+ case ets_lookup(?PID_TAB, Pid) of
+ [] ->
+ ?debug("~p:handle_call(find_by_pid): not found: ~p~n~p",
+ [ ?MODULE, Pid, ets:tab2list(?PID_TAB)]),
{ reply, not_found, St};
-
- {ok, {IP, Port} } ->
+ [{_, {IP, Port}}] ->
?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p",
[ ?MODULE, Pid, {IP, Port}]),
{reply, {ok, IP, Port}, St}
end;
%% Find connection by address
-handle_call({find_connection_by_address, IP, Port}, _From,
- #st { conn_by_addr = ConAddr} = St) ->
-
+handle_call({find_connection_by_address, IP, Port}, _From, St) ->
%% Find address associated with Pid
- case dict:find({IP, Port}, ConAddr) of
- error ->
+ Addr = {IP, Port},
+ case ets_lookup(?ADDR_TAB, Addr) of
+ [] ->
?debug("~p:handle_call(find_by_addr): not found: ~p",
- [ ?MODULE, {IP, Port}]),
-
+ [ ?MODULE, Addr]),
{ reply, not_found, St};
-
- {ok, Pid } ->
+ [{_, Pid}] ->
?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p",
- [ ?MODULE, {IP, Port}, Pid]),
+ [ ?MODULE, Addr, Pid]),
{reply, {ok, Pid}, St}
end;
-handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) ->
- {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St};
+handle_call(connections, _From, St) ->
+ {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1'] }]), St};
handle_call(_Request, _From, State) ->
?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]),
@@ -270,3 +256,15 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+
+ets_lookup(Tab, Key) ->
+ ets:lookup(Tab, Key).
+
+ets_insert(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
+ets_delete(Tab, Key) ->
+ ets:delete(Tab, Key).
+
+ets_select(Tab, Pat) ->
+ ets:select(Tab, Pat).
diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl
index 0effc66..7fdc4d2 100644
--- a/components/dlink_tls/src/dlink_tls_listener.erl
+++ b/components/dlink_tls/src/dlink_tls_listener.erl
@@ -14,29 +14,57 @@
-include_lib("lager/include/log.hrl").
-export([start_link/0,
- add_listener/4,
- remove_listener/3]).
+ add_listener/3,
+ remove_listener/2]).
-export([init/2, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, sock_opts/0, new_connection/4]).
-behavior(gen_nb_server).
+-define(TAB, dlink_tls_listener_tab).
+
start_link() ->
- gen_nb_server:start_link(?MODULE, []).
+ create_tabs(),
+ gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []).
-add_listener(Pid, IpAddr, Port, CompSpec) ->
- gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}).
+add_listener(IpAddr, Port, CompSpec) ->
+ gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}).
-remove_listener(Pid, IpAddr, Port) ->
- gen_server:call(Pid, {remove_listener, IpAddr, Port}).
+remove_listener(IpAddr, Port) ->
+ gen_server:call(?MODULE, {remove_listener, IpAddr, Port}).
init([], State) ->
- {ok, State}.
+ State1 =
+ lists:foldl(
+ fun({{_,_}} = Addr, Acc) ->
+ case gen_nb_server:add_listen_socket(Addr, Acc) of
+ {ok, Acc1} ->
+ ets_insert(?TAB, {Addr}),
+ Acc1;
+ _Error ->
+ ets_delete(?TAB, Addr),
+ Acc
+ end;
+ ({cs, CS}, Acc) ->
+ ets_insert(?TAB, {cs, CS}),
+ gen_nb_server:store_cb_state(CS, Acc)
+ end, State, ets_select(?TAB, [{ '_', [], ['$_'] }])),
+ {ok, State1}.
+
+create_tabs() ->
+ case ets:info(?TAB, name) of
+ undefined ->
+ ets:new(?TAB, [public, named_table, set]);
+ _ ->
+ ?TAB
+ end.
handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) ->
+ ets_insert(?TAB, {cs, CompSpec}),
case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of
{ok, State1} ->
+ ets_insert(?TAB, {{IpAddr, Port}}),
{reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )};
Error ->
@@ -46,6 +74,7 @@ handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) ->
handle_call({remove_listener, IpAddr, Port}, _From, State) ->
case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of
{ok, State1} ->
+ ets_delete(?TAB, {IpAddr, Port}),
{reply, ok, State1};
Error ->
{reply, Error, State}
@@ -79,5 +108,15 @@ new_connection(IP, Port, Sock, State) ->
undefined, 0, Sock,
dlink_tls_rpc,
handle_socket, CompSpec),
- dlink_tls_conn:upgrade(P, server, CompSpec),
+ dlink_tls_conn:async_upgrade(P, server, CompSpec),
{ok, State}.
+
+
+ets_insert(Tab, Obj) ->
+ ets:insert(Tab, Obj).
+
+ets_delete(Tab, Key) ->
+ ets:delete(Tab, Key).
+
+ets_select(Tab, Pat) ->
+ ets:select(Tab, Pat).
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index 2f47792..c0fb3a5 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -103,10 +103,10 @@ start_connection_manager() ->
?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]),
%% Fire up listener
- dlink_tls_connmgr:start_link(),
- {ok,Pid} = dlink_tls_listener:start_link(),
+ %% dlink_tls_connmgr:start_link(),
+ %% {ok,Pid} = dlink_tls_listener:start_link(),
- setup_initial_listeners(Pid, TlsOpts, CompSpec),
+ setup_initial_listeners(TlsOpts, CompSpec),
?info("dlink_tls:init_rvi_component(): Setting up persistent connections."),
@@ -118,14 +118,14 @@ start_connection_manager() ->
setup_persistent_connections_(PersistentConnections, CompSpec),
ok.
-setup_initial_listeners(Pid, [], CompSpec) ->
+setup_initial_listeners([], CompSpec) ->
?debug("no initial listeners", []);
-setup_initial_listeners(Pid, [_|_] = TlsOpts, CompSpec) ->
+setup_initial_listeners([_|_] = TlsOpts, CompSpec) ->
IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
%% Add listener port.
?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
- case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of
+ case dlink_tls_listener:add_listener(IP, Port, CompSpec) of
ok ->
?notice("---- RVI Node External Address: ~s",
[ application:get_env(rvi_core, node_address, undefined)]);
@@ -212,12 +212,18 @@ connect_remote(IP, Port, CompSpec) ->
%% Setup a genserver around the new connection.
{ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock,
?MODULE, handle_socket, CompSpec),
- UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec),
- ?debug("Upgrade result = ~p", [UgRes]),
- %% Send authorize
- send_authorize(Pid, CompSpec),
- ok;
-
+ try dlink_tls_conn:upgrade(Pid, client, CompSpec) of
+ ok ->
+ ?debug("Upgrade result = ~p", [ok]),
+ %% Send authorize
+ send_authorize(Pid, CompSpec),
+ ok
+ catch
+ error:Error ->
+ ?error("TLS upgrade (~p,~p) failed ~p",
+ [IP, Port, Error]),
+ not_available
+ end;
{error, Err } ->
?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p",
[IP, Port, Err]),
diff --git a/components/dlink_tls/src/dlink_tls_sup.erl b/components/dlink_tls/src/dlink_tls_sup.erl
index cd59434..2ede068 100644
--- a/components/dlink_tls/src/dlink_tls_sup.erl
+++ b/components/dlink_tls/src/dlink_tls_sup.erl
@@ -35,5 +35,7 @@ start_link() ->
init([]) ->
{ok, { {one_for_one, 5, 10},
[
+ ?CHILD(dlink_tls_connmgr, worker),
+ ?CHILD(dlink_tls_listener, worker),
?CHILD(dlink_tls_rpc, worker)
]} }.
diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl
index 2b1f59c..e718e4d 100644
--- a/components/proto_msgpack/src/proto_msgpack_rpc.erl
+++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl
@@ -75,6 +75,7 @@ receive_message(CompSpec, {IP, Port}, Data) ->
%% CAlled by local exo http server
handle_rpc("send_message", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, TID} = rvi_common:get_json_element(["transaction_id"], Args),
{ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args),
{ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
@@ -89,7 +90,8 @@ handle_rpc("send_message", Args) ->
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Parameters]}),
+ Parameters,
+ LogId]}),
{ok, [ {status, rvi_common:json_rpc_status(ok)} ]};
@@ -99,10 +101,14 @@ handle_rpc(Other, _Args) ->
handle_notification("receive_message", Args) ->
+ LogId = rvi_common:get_json_log_id(Args),
{ok, Data} = rvi_common:get_json_element(["data"], Args),
{ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args),
{ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args),
- gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}),
+ gen_server:cast(?SERVER, { rvi, receive_message, [Data,
+ RemoteIP,
+ RemotePort,
+ LogId]}),
ok;
handle_notification(Other, _Args) ->
@@ -117,7 +123,8 @@ handle_call({rvi, send_message,
ProtoOpts,
DataLinkMod,
DataLinkOpts,
- Parameters]}, _From, St) ->
+ Parameters
+ | LogId]}, _From, St) ->
?debug(" protocol:send(): transaction id: ~p~n", [TID]),
?debug(" protocol:send(): service name: ~p~n", [ServiceName]),
?debug(" protocol:send(): timeout: ~p~n", [Timeout]),
@@ -139,7 +146,7 @@ handle_call(Other, _From, St) ->
%% Convert list-based data to binary.
-handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) ->
+handle_cast({rvi, receive_message, [Payload, IP, Port | LogId]} = Msg, St) ->
?debug("~p:handle_cast(~p)", [?MODULE, Msg]),
{ok, Elems} = msgpack:unpack(Payload, St#st.pack_opts),
diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl
index 962252c..067485c 100644
--- a/components/rvi_common/src/rvi_log.erl
+++ b/components/rvi_common/src/rvi_log.erl
@@ -357,7 +357,7 @@ purge_id() ->
'$end_of_table' ->
%% Should not be possible ...
ok;
- {Tid} ->
+ Tid ->
ets:delete(?IDS, Tid),
ets:match_delete(?EVENTS, #evt{id = {Tid,'_'}, _ = '_'})
end.
diff --git a/components/service_edge/src/wse_server.erl b/components/service_edge/src/wse_server.erl
index e9897a7..3ce1598 100644
--- a/components/service_edge/src/wse_server.erl
+++ b/components/service_edge/src/wse_server.erl
@@ -47,7 +47,7 @@
connection, %% 'Connection'
key, %% "Sec-WebSocket-Key"
protocol, %% "Sec-WebSocket-Protocol"
- origin, %%
+ origin, %%
version, %% "Sec-WebSocket-Version"
cookie, %% 'Cookie'
hs = []
@@ -77,10 +77,10 @@
%% but is included here to make the example self-contained
-start(Port, M, F, A) when is_integer(Port) ->
+start(Port, M, F, A) when is_integer(Port) ->
start_([{cb, {M,F,A}}, {port,Port}]).
-start(Port,M,F,A, Opts) when is_integer(Port) ->
+start(Port,M,F,A, Opts) when is_integer(Port) ->
start_([{port,Port}, {cb, {M,F,A}}] ++ Opts).
start_(Opts) -> spawn(fun() -> init(Opts) end).
@@ -88,7 +88,7 @@ start_(Opts) -> spawn(fun() -> init(Opts) end).
stop(RegName) when is_atom(RegName) ->
RegName ! stop.
-
+
init(Opts) ->
Port = proplists:get_value(port, Opts, ?WSE_DEFAULT_PORT),
@@ -130,7 +130,7 @@ accept_loop(Listen,Opts,Pid) ->
gen_tcp:close(Listen),
exit(stopped)
end.
-
+
accept(Parent, Listen, Opts) ->
?debug("Accept ~p\n", [Listen]),
case gen_tcp:accept(Listen) of
@@ -146,7 +146,7 @@ accept(Parent, Listen, Opts) ->
send(Pid, Data) ->
- try
+ try
Pid ! { send, Data },
ok
catch
@@ -157,7 +157,7 @@ send(Pid, Data) ->
close(Pid) ->
- try
+ try
Pid ! close,
ok
catch
@@ -284,7 +284,6 @@ ws_loop(Buf, Socket, S) ->
receive
%% WebSocket stuff
{tcp, Socket, Data} ->
- ?debug("tcp ~w: ~p", [Socket, Data]),
ws_data(Buf, Data, Socket, S);
{tcp_closed, Socket} ->
@@ -293,14 +292,14 @@ ws_loop(Buf, Socket, S) ->
{'EXIT',Pid,Reason} ->
case get(parent) of
- Pid ->
+ Pid ->
?debug("exit from parent ~w reason=~p\n", [Pid, Reason]),
exit(Reason);
_ ->
?debug("exit from ~w reason=~p\n", [Pid, Reason]),
ws_loop(Buf, Socket, S)
end;
-
+
Message ->
?debug("handle_local: ~p - ~p", [Message, S]),
case handle_local(Message, Socket, S) of
@@ -313,23 +312,20 @@ ws_loop(Buf, Socket, S) ->
end
end.
-
+
ws_data(Buf, Data, Socket, S) ->
case <<Buf/binary, Data/binary>> of
%% masked data
<<Fin:1,_Rsv:3,Op:4,1:1,126:7,L:16,M:4/binary,Frag:L/binary,Buf1/binary>> ->
- ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]),
Frag1 = ws_mask(M, Frag),
S1 = ws_fragment(Socket, Fin, Op, Frag1, S),
ws_data(Buf1, <<>>, Socket, S1);
<<Fin:1,_Rsv:3,Op:4,1:1,127:7,L:64,M:4/binary,Frag:L/binary,Buf1/binary>> ->
- ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]),
Frag1 = ws_mask(M, Frag),
S1 = ws_fragment(Socket,Fin, Op, Frag1, S),
ws_data(Buf1, <<>>, Socket, S1);
<<Fin:1,_Rsv:3,Op:4,1:1,L:7,M:4/binary,Frag:L/binary,Buf1/binary>> ->
- ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]),
Frag1 = ws_mask(M, Frag),
S1 = ws_fragment(Socket,Fin, Op, Frag1, S),
ws_data(Buf1, <<>>, Socket, S1);
@@ -364,9 +360,7 @@ ws_mask(<<M:32>>, Frag) ->
ws_fragment(Socket,1, Op, Frag, S) ->
Payload = iolist_to_binary(lists:reverse([Frag|S#s.fs])),
- ?debug("op=~w, unmasked payload = ~p", [ws_opcode(Op),Payload]),
Message = ws_decode(Payload,Op),
- ?debug("handle_remote: ~p", [Message]),
handle_remote(Message, Socket, S#s { fs=[] });
ws_fragment(_Socket, 0, _Op, Frag, S) ->
@@ -405,7 +399,7 @@ ws_make_frame(Fin, Op, Mask, Data) ->
<<Fin:1,0:3,Op:4,M:1,127:7,L:64,Mask/binary,Data/binary>>
end.
-
+
handle_local({ send,Data},Socket,S0) ->
?debug("wse_server:send(): ~p", [ Data]),
gen_tcp:send(Socket, ws_make_server_frame(Data, S0#s.type)),
@@ -492,7 +486,7 @@ stop_pong_timer(S0) ->
if is_reference(Tmr) ->
erlang:cancel_timer(Tmr),
receive
- {timeout,Tmr,pong} ->
+ {timeout,Tmr,pong} ->
ok
after 0 ->
ok
diff --git a/deps/exo/src/exo_socket_server.erl b/deps/exo/src/exo_socket_server.erl
index db67c5a..d134a45 100644
--- a/deps/exo/src/exo_socket_server.erl
+++ b/deps/exo/src/exo_socket_server.erl
@@ -280,9 +280,9 @@ handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when
NewAccept = exo_socket:async_accept(Listen),
case exo_socket:async_socket(Listen, Socket, [{delay_auth, true}]) of
{ok, XSocket} ->
- F = fun() ->
+ F = fun(X) ->
exo_socket:accept(
- XSocket, tl(XSocket#exo_socket.protocol), AcceptTimeout)
+ X, tl(X#exo_socket.protocol), AcceptTimeout)
end,
XSocketFun = {XSocket, F},
case exo_socket_session:start(XSocketFun,
diff --git a/deps/exo/src/exo_socket_session.erl b/deps/exo/src/exo_socket_session.erl
index f4518db..189be61 100644
--- a/deps/exo/src/exo_socket_session.erl
+++ b/deps/exo/src/exo_socket_session.erl
@@ -176,8 +176,8 @@ send_(Bin, From, #state{socket = S, pending = P} = State) ->
handle_cast({activate,Active}, #state{socket = XSocket0} = State0) ->
?dbg("activate~n", []),
case XSocket0 of
- {#exo_socket{}, Fun} when is_function(Fun, 0) ->
- try Fun() of
+ {#exo_socket{} = X, Fun} when is_function(Fun, 1) ->
+ try Fun(X) of
{ok, XSocket} ->
activate_(Active, State0#state{socket = XSocket});
{error, _} = Error ->
diff --git a/priv/test_config/bt_backend.config b/priv/test_config/bt_backend.config
index e5d207f..45cc5db 100644
--- a/priv/test_config/bt_backend.config
+++ b/priv/test_config/bt_backend.config
@@ -10,7 +10,7 @@
{ [routing_rules, ""], [{proto_json_rpc, dlink_bt_rpc}] },
{ [components, data_link], [{dlink_bt_rpc, gen_server,
[{server_opts, [{test_mode, tcp},
- {port, 8007}]}]}]}
+ {port, 8807}]}]}]}
]}
]}
].
diff --git a/priv/test_config/bt_sample.config b/priv/test_config/bt_sample.config
index 962f8a3..59ae564 100644
--- a/priv/test_config/bt_sample.config
+++ b/priv/test_config/bt_sample.config
@@ -11,7 +11,7 @@
[{server_opts, [{test_mode, tcp},
{port, 9007}]},
{persistent_connections,
- ["localhost:8007"]}]}]}
+ ["localhost:8807"]}]}]}
]}
]}
].
diff --git a/priv/test_config/tls_backend.config b/priv/test_config/tls_backend.config
index c5709ed..ad0d498 100644
--- a/priv/test_config/tls_backend.config
+++ b/priv/test_config/tls_backend.config
@@ -7,7 +7,7 @@
[
{ [routing_rules, ""], [{proto_msgpack_rpc, dlink_tls_rpc}] },
{ [components, data_link], [{dlink_tls_rpc, gen_server,
- [{server_opts, [{port, 8007}]}]}]},
+ [{server_opts, [{port, 8807}]}]}]},
{ [components, protocol], [{proto_msgpack_rpc, gen_server, []}] }
]}
]}
diff --git a/priv/test_config/tls_sample.config b/priv/test_config/tls_sample.config
index 1917309..be7f0f1 100644
--- a/priv/test_config/tls_sample.config
+++ b/priv/test_config/tls_sample.config
@@ -9,7 +9,7 @@
{ [components, data_link], [{dlink_tls_rpc, gen_server,
[{server_opts, [{port, 9007}]},
{persistent_connections,
- ["localhost:8007"]}]}]},
+ ["localhost:8807"]}]}]},
{ [components, protocol], [{ proto_msgpack_rpc, gen_server, [] }] }
]}
]}
diff --git a/priv/test_config/tlsj_backend.config b/priv/test_config/tlsj_backend.config
index cd0694f..69b5126 100644
--- a/priv/test_config/tlsj_backend.config
+++ b/priv/test_config/tlsj_backend.config
@@ -7,7 +7,7 @@
[
{ [routing_rules, ""], [{proto_json_rpc, dlink_tls_rpc}] },
{ [components, data_link], [{dlink_tls_rpc, gen_server,
- [{server_opts, [{port, 8007}]},
+ [{server_opts, [{port, 8807}]},
{packet_mod, dlink_data_json}
]}]},
{ [components, protocol], [{proto_json_rpc, gen_server, []}] }
diff --git a/priv/test_config/tlsj_sample.config b/priv/test_config/tlsj_sample.config
index db2abc5..829c13c 100644
--- a/priv/test_config/tlsj_sample.config
+++ b/priv/test_config/tlsj_sample.config
@@ -10,7 +10,7 @@
[{server_opts, [{port, 9007}]},
{packet_mod, dlink_data_json},
{persistent_connections,
- ["localhost:8007"]}]}]},
+ ["localhost:8807"]}]}]},
{ [components, protocol], [{ proto_json_rpc, gen_server, [] }] }
]}
]}
diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl
index 9846ae3..79281e2 100644
--- a/test/rvi_core_SUITE.erl
+++ b/test/rvi_core_SUITE.erl
@@ -39,6 +39,7 @@
]).
-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
-define(DATA, rvi_core_data).
@@ -46,9 +47,9 @@ all() ->
[
{group, test_install},
{group, test_run},
- {group, test_run_bt},
{group, test_run_tls},
- {group, test_run_tlsj}
+ {group, test_run_tlsj},
+ {group, test_run_bt}
].
groups() ->
@@ -79,15 +80,6 @@ groups() ->
t_remote_call_lock_service,
t_no_errors
]},
- {test_run_bt, [],
- [
- t_start_bt_backend,
- t_start_bt_sample,
- t_register_lock_service,
- t_call_lock_service,
- t_remote_call_lock_service,
- t_no_errors
- ]},
{test_run_tls, [],
[
t_start_tls_backend,
@@ -105,6 +97,15 @@ groups() ->
t_call_lock_service,
t_remote_call_lock_service,
t_no_errors
+ ]},
+ {test_run_bt, [],
+ [
+ t_start_bt_backend,
+ t_start_bt_sample,
+ t_register_lock_service,
+ t_call_lock_service,
+ t_remote_call_lock_service,
+ t_no_errors
]}
].
@@ -133,6 +134,7 @@ init_per_group(Grp, Config) ->
test_run -> ["basic_backend", "basic_sample"];
test_run_bt -> ["bt_backend", "bt_sample"];
test_run_tls -> ["tls_backend", "tls_sample"];
+ test_run_tlsj -> ["tlsj_backend", "tlsj_sample"];
_ -> []
end,
[{test_dir, CWD}, {test_nodes, TestNodes} | Config].
@@ -208,92 +210,42 @@ t_install_bt_backend_node(_Config) ->
t_install_bt_sample_node(_Config) ->
install_sample_node("bt_sample", "bt_sample.config").
+generic_start(Name) ->
+ F = filename:join([".", Name, "start_me.sh"]),
+ Cmd = [env(),
+ " ./", Name, "/rvi.sh",
+ " -s ", Name,
+ " -l ./", Name, "/rvi/log",
+ " -d ./", Name,
+ " -c ./", Name, "/priv/test_config/", Name, ".config",
+ " $1"],
+ ok = save_cmd(F, Cmd),
+ cmd([F, " start"]),
+ await_started(Name).
+
t_start_basic_backend(_Config) ->
- cmd([env(),
- " ./basic_backend/rvi.sh"
- " -s basic_backend"
- " -l ./basic_backend/rvi/log"
- " -d ./basic_backend"
- " -c ./basic_backend/priv/test_config/basic_backend.config"
- " start"]),
- await_started("basic_backend"),
- ok.
+ generic_start("basic_backend").
t_start_basic_sample(_Config) ->
- cmd([env(),
- " ./basic_sample/rvi.sh"
- " -s basic_sample"
- " -l ./basic_sample/rvi/log"
- " -d ./basic_sample"
- " -c ./basic_sample/priv/test_config/basic_sample.config"
- " start"]),
- await_started("basic_sample"),
- ok.
+ generic_start("basic_sample").
t_start_bt_backend(_Config) ->
- cmd([env(),
- " ./bt_backend/rvi.sh -s bt_backend"
- " -l ./bt_backend/rvi/log"
- " -d ./bt_backend"
- " -c ./bt_backend/priv/test_config/bt_backend.config"
- " start"]),
- await_started("bt_backend"),
- ok.
+ generic_start("bt_backend").
t_start_bt_sample(_Config) ->
- cmd([env(),
- " ./bt_sample/rvi.sh"
- " -s bt_sample"
- " -l ./bt_sample/rvi/log"
- " -d ./bt_sample"
- " -c ./bt_sample/priv/test_config/bt_sample.config"
- " start"]),
- await_started("bt_sample"),
- ok.
+ generic_start("bt_sample").
t_start_tls_backend(_Config) ->
- cmd([env(),
- " ./tls_backend/rvi.sh"
- " -s tls_backend"
- " -l ./tls_backend/rvi/log"
- " -d ./tls_backend"
- " -c ./tls_backend/priv/test_config/tls_backend.config"
- " start"]),
- await_started("tls_backend"),
- ok.
+ generic_start("tls_backend").
t_start_tls_sample(_Config) ->
- cmd([env(),
- " ./tls_sample/rvi.sh"
- " -s tls_sample"
- " -l ./tls_sample/rvi/log"
- " -d ./tls_sample"
- " -c ./tls_sample/priv/test_config/tls_sample.config"
- " start"]),
- await_started("tls_sample"),
- ok.
+ generic_start("tls_sample").
t_start_tlsj_backend(_Config) ->
- cmd([env(),
- " ./tlsj_backend/rvi.sh"
- " -s tlsj_backend"
- " -l ./tlsj_backend/rvi/log"
- " -d ./tlsj_backend"
- " -c ./tlsj_backend/priv/test_config/tlsj_backend.config"
- " start"]),
- await_started("tlsj_backend"),
- ok.
+ generic_start("tlsj_backend").
t_start_tlsj_sample(_Config) ->
- cmd([env(),
- " ./tlsj_sample/rvi.sh"
- " -s tlsj_sample"
- " -l ./tlsj_sample/rvi/log"
- " -d ./tlsj_sample"
- " -c ./tlsj_sample/priv/test_config/tlsj_sample.config"
- " start"]),
- await_started("tlsj_sample"),
- ok.
+ generic_start("tlsj_sample").
t_register_lock_service(_Config) ->
Pid =
@@ -326,7 +278,7 @@ t_remote_call_lock_service(_Config) ->
[{_, Svc}] = lookup({service, lock}),
ok = fetch(
Svc,
- {match, <<"Service invoked![\\s]*args: {u'arg1': u'val1'}">>}),
+ {match, <<"Service invoked![\\s]*args: {u'arg1': u'val2'}">>}),
ct:log("Verified service invoked~n", []),
CallRes = fetch(CallPid),
verify_call_res(join_stdout_msgs(CallRes)),
@@ -559,6 +511,12 @@ cmd(C) ->
cmd(C, Opts) ->
{ok, _Res} = cmd_(C, Opts).
+save_cmd(F, Cmd) ->
+ ct:log("save_cmd ~s:~n~s", [F, Cmd]),
+ ok = file:write_file(F, iolist_to_binary(Cmd)),
+ {ok, FI} = file:read_file_info(F),
+ ok = file:write_file_info(F, FI#file_info{mode = 8#755}, []).
+
cmd_(C0, Opts) ->
C = binary_to_list(iolist_to_binary(C0)),
CmdRes = exec:run(C, [sync, stdout, stderr] ++ Opts),