diff options
27 files changed, 413 insertions, 320 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index a362c7b..f651a60 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -197,14 +197,16 @@ handle_call_({validate_message, JWT, Conn}, _, S) -> {reply, validate_message_(JWT, Conn), S}; handle_call_({validate_service_call, Svc, Conn}, _, S) -> {reply, validate_service_call_(Svc, Conn), S}; -handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn, PeerCert, LogId}, _, S) -> +handle_call_({save_cred, Cred, JWT, {IP, Port} = Conn0, PeerCert, LogId}, _, S) -> + Conn = normalize_conn(Conn0), + ?debug("save_cred: ~p (Conn=~p, PeerCert=~p)", [Cred, Conn, abbrev(PeerCert)]), case process_cred_struct(Cred, JWT, PeerCert) of invalid -> log(LogId, warning, "cred INVALID Conn=~s:~w", [IP, Port]), {reply, {error, invalid}, S}; #cred{} = C -> ets:insert(?CREDS, {{Conn, C#cred.id}, C}), - log(LogId, result, "cred stored ~s Conn=~s:~w", [abbrev_bin(C#cred.id), IP, Port]), + log(LogId, result, "cred stored ~s Conn=~p", [abbrev_bin(C#cred.id), Conn]), {reply, ok, S} end; handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> @@ -244,14 +246,26 @@ creds_by_conn(Conn) -> ?debug("rough selection: ~p~n", [[{abbrev_bin(C),I} || {C,I} <- Creds]]), [C || {C,V} <- Creds, check_validity(V, UTC)]. -cred_recs_by_conn(Conn) -> - ?debug("cred_recs_by_conn(~p)~n", [Conn]), +cred_recs_by_conn(Conn0) -> + Conn = normalize_conn(Conn0), + ?debug("cred_recs_by_conn(~p)~nAll = ~p", [Conn, abbrev(ets:tab2list(?CREDS))]), UTC = rvi_common:utc_timestamp(), Creds = ets:select(?CREDS, [{ {{Conn,'_'}, '$1'}, [], ['$1'] }]), ?debug("rough selection: ~p~n", [[abbrev_bin(C#cred.id) || C <- Creds]]), [C || C <- Creds, check_validity(C#cred.validity, UTC)]. +normalize_conn(local) -> + local; +normalize_conn({IP, Port} = Conn) when is_binary(IP), is_binary(Port) -> + Conn; +normalize_conn({IP, Port}) -> + {to_bin(IP), to_bin(Port)}. + +to_bin(B) when is_binary(B) -> B; +to_bin(L) when is_list(L) -> iolist_to_binary(L); +to_bin(I) when is_integer(I) -> integer_to_binary(I). + filter_by_service_(Services, Conn) -> ?debug("Filter: creds = ~p", [[{K,abbrev_payload(V)} || {K,V} <- ets:tab2list(?CREDS)]]), Invoke = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{right_to_invoke = '$1', diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl index 6b3a64e..bcfa199 100644 --- a/components/dlink_bt/src/bt_connection.erl +++ b/components/dlink_bt/src/bt_connection.erl @@ -138,7 +138,7 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) -> {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS), PktSt = PktMod:init(CS), {ok, #st{ - remote_addr = BTAddr, + remote_addr = bt_addr(Mode, BTAddr), channel = Channel, rfcomm_ref = undefined, mode = Mode, @@ -394,9 +394,10 @@ handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod, inet_db:register_socket(Sock, inet_tcp), inet:setopts(Sock, [{active, once}]), {ok, {BTAddr, Channel}} = inet:peername(Sock), + ?debug("peername (tcp): ~p:~p", [BTAddr, Channel]), Mod:Fun(self(), BTAddr, Channel, accepted, Arg), {noreply, St#st{rfcomm_ref = Sock, - remote_addr = BTAddr}}; + remote_addr = bt_addr(tcp, BTAddr)}}; handle_info(_Info, State) -> ?warning("~p:handle_info(): Unknown info: ~p", [ ?MODULE, _Info]), @@ -442,3 +443,10 @@ handle_elements(Elements, #st{remote_addr = BTAddr, ?debug("data complete; processed: ~p", [authorize_keys:abbrev(Elements)]), Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg). + + +bt_addr(tcp, Addr) -> + {ok, IP} = inet:ip(Addr), + inet_parse:ntoa(IP); +bt_addr(bt, Addr) -> + Addr. diff --git a/components/dlink_bt/src/bt_listener.erl b/components/dlink_bt/src/bt_listener.erl index a1f1a49..06efd3d 100644 --- a/components/dlink_bt/src/bt_listener.erl +++ b/components/dlink_bt/src/bt_listener.erl @@ -42,7 +42,7 @@ accept_ack(Result, LRef, Addr, Chan) -> ok. sock_opts() -> - [binary, {active, once}, {packet, 0}]. + [{reuseaddr, true}, binary, {active, once}, {packet, 0}]. init(Mode) -> diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index 69310a7..c675387 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -651,11 +651,19 @@ code_change(_OldVsn, St, _Extra) -> send_authorize(Pid, SetupChannel, CompSpec) -> - {ok,[{address, Address }]} = bt_drv:local_info([address]), + {Address, Channel} = + case Mode = get_mode(CompSpec) of + bt -> + {ok,[{address, Addr}]} = bt_drv:local_info([address]), + {bt_address_to_string(Addr), SetupChannel}; + tcp -> + {IP, Port} = rvi_common:node_address_tuple(), + {IP, integer_to_binary(Port)} + end, bt_connection:send(Pid, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, - { ?DLINK_ARG_PORT, SetupChannel }, + { ?DLINK_ARG_ADDRESS, Address }, + { ?DLINK_ARG_PORT, Channel }, { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } | log_id_tail(CompSpec)]). diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index b24215c..7229b3c 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -231,7 +231,7 @@ handle_info({tcp, Sock, Data}, port = Port, packet_mod = PMod, packet_st = PSt} = State) -> - ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), + ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]), case PMod:decode(Data, fun(Elems) -> handle_elements(Elems, State) end, PSt) of diff --git a/components/dlink_tcp/src/connection_manager.erl b/components/dlink_tcp/src/connection_manager.erl index 6a9f1e0..e16f789 100644 --- a/components/dlink_tcp/src/connection_manager.erl +++ b/components/dlink_tcp/src/connection_manager.erl @@ -2,10 +2,10 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% -%% +%% %%%------------------------------------------------------------------- %%% @author magnus <magnus@t520.home> %%% @copyright (C) 2014, magnus @@ -33,7 +33,10 @@ -export([find_connection_by_address/2]). -export([connections/0]). --define(SERVER, ?MODULE). +-define(SERVER, ?MODULE). + +-define(PID_TAB, dlink_tcp_conn_by_pid). +-define(ADDR_TAB, dlink_tcp_conn_by_addr). -record(st, { conn_by_pid = undefined, @@ -70,6 +73,7 @@ connections() -> %% @end %%-------------------------------------------------------------------- start_link() -> + create_ets(), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== @@ -93,6 +97,18 @@ init([]) -> conn_by_addr = dict:new() %% All managed connection stored by address }}. +create_ets() -> + maybe_create(?PID_TAB), + maybe_create(?ADDR_TAB). + +maybe_create(Tab) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, [public, named_table, set]); + _ -> + Tab + end. + %%-------------------------------------------------------------------- %% @private %% @doc @@ -107,106 +123,84 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call({add_connection, IP, Port, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - - ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", +handle_call({add_connection, IP, Port, Pid}, _From, St) -> + ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", [ ?MODULE, Pid, { IP, Port }]), %% Store so that we can find connection both by pid and by address - NConPid = dict:store(Pid, { IP, Port }, ConPid), - NConAddr = dict:store({ IP, Port }, Pid, ConAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt}; + ets_insert(?PID_TAB, {Pid, {IP, Port}}), + ets_insert(?ADDR_TAB, {{IP, Port}, Pid}), + {reply, ok, St}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) when is_pid(Pid)-> - +handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(del_by_pid): not found: ~p", + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("~p:handle_call(del_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - - {ok, Addr } -> - ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", - [ ?MODULE, Pid, Addr]), - - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase(Addr, ConAddr), - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, + [{_, Addr}] -> + ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", + [ ?MODULE, Pid, Addr]), - {reply, ok, NSt} + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} end; %% Delete connection by address -handle_call({ delete_connection_by_address, IP, Port}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - +handle_call({ delete_connection_by_address, IP, Port}, _From, St) -> %% Find Pid associated with Address - case dict:find({IP, Port}, ConAddr) of - error -> - ?debug("~p:handle_call(del_by_addr): not found: ~p", - [ ?MODULE, {IP, Port}]), + Addr = {IP, Port}, + case ets_lookup(?ADDR_TAB, Addr) of + [] -> + ?debug("~p:handle_call(del_by_addr): not found: ~p", + [ ?MODULE, Addr]), { reply, not_found, St}; - - {ok, Pid } -> - ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", - [ ?MODULE, Pid, {IP, Port}]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase({ IP, Port }, ConAddr), - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt} - end; + [{_, Pid}] -> + ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", + [ ?MODULE, Pid, Addr]), + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} + end; %% Find connection by pid -handle_call({ find_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid} = St) when is_pid(Pid)-> - +handle_call({ find_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(find_by_pid): not found: ~p", + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("~p:handle_call(find_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - - {ok, {IP, Port} } -> - ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", + + [{_, {IP, Port}}] -> + ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", [ ?MODULE, Pid, {IP, Port}]), {reply, {ok, IP, Port}, St} end; %% Find connection by address -handle_call({find_connection_by_address, IP, Port}, _From, - #st { conn_by_addr = ConAddr} = St) -> - +handle_call({find_connection_by_address, IP, Port}, _From, St) -> %% Find address associated with Pid - case dict:find({IP, Port}, ConAddr) of - error -> - ?debug("~p:handle_call(find_by_addr): not found: ~p", + case ets_lookup(?ADDR_TAB, {IP, Port}) of + [] -> + ?debug("~p:handle_call(find_by_addr): not found: ~p", [ ?MODULE, {IP, Port}]), { reply, not_found, St}; - - {ok, Pid } -> - ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", + + [{_, Pid}] -> + ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", [ ?MODULE, {IP, Port}, Pid]), {reply, {ok, Pid}, St} end; -handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) -> - {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St}; +handle_call(connections, _From, St) -> + {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1']}]), St}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), @@ -269,3 +263,16 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +%% Ets wrapper functions to simplify tracing. +ets_lookup(Tab, Key) -> + ets:lookup(Tab, Key). + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pattern) -> + ets:select(Tab, Pattern). diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 83e0a24..68d32d7 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -94,11 +94,11 @@ start_json_server() -> start_connection_manager() -> %% Fire up listener CompSpec = rvi_common:get_component_specification(), - connection_manager:start_link(), + %% connection_manager:start_link(), ?info("dlink_tcp:init_rvi_component(~p): Starting listener.", [self()]), - {ok,Pid} = listener:start_link(), + %% {ok,Pid} = listener:start_link(), %% - setup_initial_listeners(Pid, CompSpec), + setup_initial_listeners(CompSpec), ?info("dlink_tcp:init_rvi_component(): Setting up persistent connections."), @@ -112,7 +112,7 @@ start_connection_manager() -> ok. -setup_initial_listeners(Pid, CompSpec) -> +setup_initial_listeners(CompSpec) -> case rvi_common:get_module_config(data_link, ?MODULE, ?SERVER_OPTS, @@ -123,7 +123,7 @@ setup_initial_listeners(Pid, CompSpec) -> ?info("dlink_tcp:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), %% %% Add listener port. - case listener:add_listener(Pid, IP, Port, CompSpec) of + case listener:add_listener(IP, Port, CompSpec) of ok -> ?notice("---- RVI Node External Address: ~s", [ application:get_env(rvi_core, node_address, undefined)]); diff --git a/components/dlink_tcp/src/dlink_tcp_sup.erl b/components/dlink_tcp/src/dlink_tcp_sup.erl index edb9c82..04f5255 100644 --- a/components/dlink_tcp/src/dlink_tcp_sup.erl +++ b/components/dlink_tcp/src/dlink_tcp_sup.erl @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -34,6 +34,7 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ + ?CHILD(connection_manager, worker), + ?CHILD(listener, worker), ?CHILD(dlink_tcp_rpc, worker) ]} }. - diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl index af72189..ae0a605 100644 --- a/components/dlink_tcp/src/gen_nb_server.erl +++ b/components/dlink_tcp/src/gen_nb_server.erl @@ -43,6 +43,7 @@ terminate/2, code_change/3]). +-include_lib("lager/include/log.hrl"). -define(SERVER, ?MODULE). -record(state, {cb, @@ -203,11 +204,13 @@ code_change(_OldVsn, State, _Extra) -> %% Result = {ok, port()} | {error, any()} listen_on(CallbackModule, IpAddr, Port) -> SockOpts = [{reuseaddr, true}, {ip, convert(IpAddr)}] ++ CallbackModule:sock_opts(), + ?debug("listen on ~p:~p, Opts = ~p", [IpAddr, Port, SockOpts]), case gen_tcp:listen(Port, SockOpts) of {ok, LSock} -> {ok, _Ref} = prim_inet:async_accept(LSock, -1), {ok, LSock}; Err -> + ?debug("listen error: ~p", [Err]), Err end. diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index 4512a59..6def59a 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -13,29 +13,56 @@ -include_lib("lager/include/log.hrl"). -export([start_link/0, - add_listener/4, - remove_listener/3]). + add_listener/3, + remove_listener/2]). -export([init/2, handle_call/3, handle_cast/2, handle_info/2]). -export([terminate/2, sock_opts/0, new_connection/4]). -behavior(gen_nb_server). +-define(TAB, dlink_tcp_listener_tab). + start_link() -> - gen_nb_server:start_link(?MODULE, []). + create_ets(), + gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []). + +create_ets() -> + case ets:info(?TAB, name) of + undefined -> ets:new(?TAB, [public, named_table, set]); + _ -> ?TAB + end. -add_listener(Pid, IpAddr, Port, CompSpec) -> - gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}). +add_listener(IpAddr, Port, CompSpec) -> + gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}). -remove_listener(Pid, IpAddr, Port) -> - gen_server:call(Pid, {remove_listener, IpAddr, Port}). +remove_listener(IpAddr, Port) -> + gen_server:call(?MODULE, {remove_listener, IpAddr, Port}). init([], State) -> - {ok, State}. + case ets_select(?TAB, [{ '_', [], ['$_'] }]) of + [] -> + {ok, State}; + Addrs -> + lists:foldl( + fun({{_, _} = Addr}, Acc) -> + case gen_nb_server:add_listen_socket(Addr, Acc) of + {ok, Acc1} -> + Acc1; + _Error -> + ets_delete(?TAB, Addr), + Acc + end; + ({cs, CS}, Acc) -> + gen_nb_server:store_cb_state(CS, Acc) + end, State, Addrs) + end. handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> + ets_insert(?TAB, {cs, CompSpec}), case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_insert(?TAB, {{IpAddr,Port}}), {reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )}; Error -> @@ -45,6 +72,7 @@ handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> handle_call({remove_listener, IpAddr, Port}, _From, State) -> case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_delete(?TAB, {IpAddr, Port}), {reply, ok, State1}; Error -> {reply, Error, State} @@ -77,3 +105,12 @@ new_connection(IP, Port, Sock, State) -> dlink_tcp_rpc, handle_socket, gen_nb_server:get_cb_state(State)), {ok, State}. + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pat) -> + ets:select(Tab, Pat). diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index 93266b1..be55dc6 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -28,7 +28,8 @@ terminate/2, code_change/3]). -export([setup/6]). --export([upgrade/3]). +-export([upgrade/3, + async_upgrade/3]). -export([send/2]). -export([send/3]). -export([is_connection_up/1]). @@ -77,6 +78,10 @@ setup(IP, Port, Sock, Mod, Fun, CompSpec) -> upgrade(Pid, Role, CompSpec) when Role==client; Role==server -> gen_server:call(Pid, {upgrade, Role, CompSpec}). +async_upgrade(Pid, Role, CompSpec) when Role==client; + Role==server -> + gen_server:cast(Pid, {upgrade, Role, CompSpec}). + send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). @@ -186,26 +191,9 @@ handle_call(terminate_connection, _From, St) -> {stop, Reason, ok, NSt}; handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) -> ?debug("~p:handle_call(~p)~n", [?MODULE, Req]), - - {ok, [{active, Last}]} = inet:getopts(S, [active]), - inet:setopts(S, [{active, false}]), - case do_upgrade(S, Role, CompSpec) of - {ok, NewS} -> - ?debug("upgrade to TLS succcessful~n", []), - ssl:setopts(NewS, [{active, Last}]), - {ok, {IP, Port}} = ssl:peername(NewS), - {ok, PeerCert} = ssl:peercert(NewS), - ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]), - NewCS = rvi_common:set_value( - dlink_tls_role, Role, - rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)), - {reply, ok, St#st{sock = NewS, mode = tls, role = Role, - ip = inet_parse:ntoa(IP), port = Port, - cs = NewCS}}; - Error -> - ?error("Cannot upgrade to TLS: ~p~n", [Error]), - {stop, Error, Error, St} - end; + %% deliberately crash (for now) if upgrade fails. + {Reply, St1} = handle_upgrade(Role, CompSpec, St), + {reply, Reply, St1}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), Reply = ok, @@ -221,6 +209,9 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- +handle_cast({upgrade, Role, CompSpec}, St) -> + {_, St1} = handle_upgrade(Role, CompSpec, St), + {noreply, St1}; handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_call(send): Sending: ~p", [ ?MODULE, abbrev(Data)]), @@ -350,6 +341,26 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +handle_upgrade(Role, CompSpec, #st{sock = S} = St) -> + {ok, [{active, Last}]} = inet:getopts(S, [active]), + inet:setopts(S, [{active, false}]), + case do_upgrade(S, Role, CompSpec) of + {ok, NewS} -> + ?debug("upgrade to TLS succcessful~n", []), + ssl:setopts(NewS, [{active, Last}]), + {ok, {IP, Port}} = ssl:peername(NewS), + {ok, PeerCert} = ssl:peercert(NewS), + ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]), + NewCS = rvi_common:set_value( + dlink_tls_role, Role, + rvi_common:set_value(dlink_tls_peer_cert, PeerCert, CompSpec)), + {ok, St#st{sock = NewS, mode = tls, role = Role, + ip = inet_parse:ntoa(IP), port = Port, + cs = NewCS}}; + Error -> + ?error("Cannot upgrade to TLS: ~p~n", [Error]), + error({cannot_upgrade, Error}) + end. do_upgrade(Sock, client, CompSpec) -> Opts = tls_opts(client, CompSpec), diff --git a/components/dlink_tls/src/dlink_tls_connmgr.erl b/components/dlink_tls/src/dlink_tls_connmgr.erl index 4947ee6..31e51bd 100644 --- a/components/dlink_tls/src/dlink_tls_connmgr.erl +++ b/components/dlink_tls/src/dlink_tls_connmgr.erl @@ -35,11 +35,10 @@ -export([connections/0]). -define(SERVER, ?MODULE). +-define(PID_TAB, dlink_tls_pid_tab). +-define(ADDR_TAB, dlink_tls_addr_tab). --record(st, { - conn_by_pid = undefined, - conn_by_addr = undefined - }). +-record(st, {}). %%%=================================================================== %%% API @@ -71,8 +70,21 @@ connections() -> %% @end %%-------------------------------------------------------------------- start_link() -> + create_ets(), gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +create_ets() -> + maybe_create(?PID_TAB), + maybe_create(?ADDR_TAB). + +maybe_create(Tab) -> + case ets:info(Tab, name) of + undefined -> + ets:new(Tab, [public, named_table, set]); + _ -> + Tab + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -89,10 +101,7 @@ start_link() -> %% @end %%-------------------------------------------------------------------- init([]) -> - {ok, #st{ - conn_by_pid = dict:new(), %% All managed connection stored by pid - conn_by_addr = dict:new() %% All managed connection stored by address - }}. + {ok, #st{}}. %%-------------------------------------------------------------------- %% @private @@ -108,106 +117,83 @@ init([]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call({add_connection, IP, Port, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - +handle_call({add_connection, IP, Port, Pid}, _From, St) -> + Addr = {IP, Port}, ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", - [ ?MODULE, Pid, { IP, Port }]), + [ ?MODULE, Pid, Addr]), %% Store so that we can find connection both by pid and by address - NConPid = dict:store(Pid, { IP, Port }, ConPid), - NConAddr = dict:store({ IP, Port }, Pid, ConAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt}; + ets_insert(?PID_TAB, {Pid, Addr}), + ets_insert(?ADDR_TAB, {Addr, Pid}), + {reply, ok, St}; %% Delete connection by pid -handle_call({delete_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) when is_pid(Pid)-> - +handle_call({delete_connection_by_pid, Pid}, _From, St) when is_pid(Pid) -> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> + case ets_lookup(?PID_TAB, Pid) of + [] -> ?debug("~p:handle_call(del_by_pid): not found: ~p", [ ?MODULE, Pid]), { reply, not_found, St}; - {ok, Addr } -> + [{_, Addr}] -> ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", [ ?MODULE, Pid, Addr]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase(Addr, ConAddr), - - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - - {reply, ok, NSt} + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} end; %% Delete connection by address -handle_call({ delete_connection_by_address, IP, Port}, _From, - #st { conn_by_pid = ConPid, - conn_by_addr = ConAddr} = St) -> - +handle_call({ delete_connection_by_address, IP, Port}, _From, St) -> %% Find Pid associated with Address - case dict:find({IP, Port}, ConAddr) of - error -> + Addr = {IP, Port}, + case ets_lookup(?ADDR_TAB, Addr) of + [] -> ?debug("~p:handle_call(del_by_addr): not found: ~p", [ ?MODULE, {IP, Port}]), { reply, not_found, St}; - - {ok, Pid } -> + [{_, Pid}] -> ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", [ ?MODULE, Pid, {IP, Port}]), - NConPid = dict:erase(Pid, ConPid), - NConAddr = dict:erase({ IP, Port }, ConAddr), - NSt = St#st { conn_by_pid = NConPid, - conn_by_addr = NConAddr }, - {reply, ok, NSt} + ets_delete(?PID_TAB, Pid), + ets_delete(?ADDR_TAB, Addr), + {reply, ok, St} end; %% Find connection by pid -handle_call({ find_connection_by_pid, Pid}, _From, - #st { conn_by_pid = ConPid} = St) when is_pid(Pid)-> - +handle_call({ find_connection_by_pid, Pid}, _From, St) when is_pid(Pid)-> %% Find address associated with Pid - case dict:find(Pid, ConPid) of - error -> - ?debug("~p:handle_call(find_by_pid): not found: ~p", - [ ?MODULE, Pid]), + case ets_lookup(?PID_TAB, Pid) of + [] -> + ?debug("~p:handle_call(find_by_pid): not found: ~p~n~p", + [ ?MODULE, Pid, ets:tab2list(?PID_TAB)]), { reply, not_found, St}; - - {ok, {IP, Port} } -> + [{_, {IP, Port}}] -> ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", [ ?MODULE, Pid, {IP, Port}]), {reply, {ok, IP, Port}, St} end; %% Find connection by address -handle_call({find_connection_by_address, IP, Port}, _From, - #st { conn_by_addr = ConAddr} = St) -> - +handle_call({find_connection_by_address, IP, Port}, _From, St) -> %% Find address associated with Pid - case dict:find({IP, Port}, ConAddr) of - error -> + Addr = {IP, Port}, + case ets_lookup(?ADDR_TAB, Addr) of + [] -> ?debug("~p:handle_call(find_by_addr): not found: ~p", - [ ?MODULE, {IP, Port}]), - + [ ?MODULE, Addr]), { reply, not_found, St}; - - {ok, Pid } -> + [{_, Pid}] -> ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", - [ ?MODULE, {IP, Port}, Pid]), + [ ?MODULE, Addr, Pid]), {reply, {ok, Pid}, St} end; -handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) -> - {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St}; +handle_call(connections, _From, St) -> + {reply, ets_select(?ADDR_TAB, [{ {'$1','_'}, [], ['$1'] }]), St}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), @@ -270,3 +256,15 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +ets_lookup(Tab, Key) -> + ets:lookup(Tab, Key). + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pat) -> + ets:select(Tab, Pat). diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl index 0effc66..7fdc4d2 100644 --- a/components/dlink_tls/src/dlink_tls_listener.erl +++ b/components/dlink_tls/src/dlink_tls_listener.erl @@ -14,29 +14,57 @@ -include_lib("lager/include/log.hrl"). -export([start_link/0, - add_listener/4, - remove_listener/3]). + add_listener/3, + remove_listener/2]). -export([init/2, handle_call/3, handle_cast/2, handle_info/2]). -export([terminate/2, sock_opts/0, new_connection/4]). -behavior(gen_nb_server). +-define(TAB, dlink_tls_listener_tab). + start_link() -> - gen_nb_server:start_link(?MODULE, []). + create_tabs(), + gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []). -add_listener(Pid, IpAddr, Port, CompSpec) -> - gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}). +add_listener(IpAddr, Port, CompSpec) -> + gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}). -remove_listener(Pid, IpAddr, Port) -> - gen_server:call(Pid, {remove_listener, IpAddr, Port}). +remove_listener(IpAddr, Port) -> + gen_server:call(?MODULE, {remove_listener, IpAddr, Port}). init([], State) -> - {ok, State}. + State1 = + lists:foldl( + fun({{_,_}} = Addr, Acc) -> + case gen_nb_server:add_listen_socket(Addr, Acc) of + {ok, Acc1} -> + ets_insert(?TAB, {Addr}), + Acc1; + _Error -> + ets_delete(?TAB, Addr), + Acc + end; + ({cs, CS}, Acc) -> + ets_insert(?TAB, {cs, CS}), + gen_nb_server:store_cb_state(CS, Acc) + end, State, ets_select(?TAB, [{ '_', [], ['$_'] }])), + {ok, State1}. + +create_tabs() -> + case ets:info(?TAB, name) of + undefined -> + ets:new(?TAB, [public, named_table, set]); + _ -> + ?TAB + end. handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> + ets_insert(?TAB, {cs, CompSpec}), case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_insert(?TAB, {{IpAddr, Port}}), {reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )}; Error -> @@ -46,6 +74,7 @@ handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> handle_call({remove_listener, IpAddr, Port}, _From, State) -> case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of {ok, State1} -> + ets_delete(?TAB, {IpAddr, Port}), {reply, ok, State1}; Error -> {reply, Error, State} @@ -79,5 +108,15 @@ new_connection(IP, Port, Sock, State) -> undefined, 0, Sock, dlink_tls_rpc, handle_socket, CompSpec), - dlink_tls_conn:upgrade(P, server, CompSpec), + dlink_tls_conn:async_upgrade(P, server, CompSpec), {ok, State}. + + +ets_insert(Tab, Obj) -> + ets:insert(Tab, Obj). + +ets_delete(Tab, Key) -> + ets:delete(Tab, Key). + +ets_select(Tab, Pat) -> + ets:select(Tab, Pat). diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 2f47792..c0fb3a5 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -103,10 +103,10 @@ start_connection_manager() -> ?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]), %% Fire up listener - dlink_tls_connmgr:start_link(), - {ok,Pid} = dlink_tls_listener:start_link(), + %% dlink_tls_connmgr:start_link(), + %% {ok,Pid} = dlink_tls_listener:start_link(), - setup_initial_listeners(Pid, TlsOpts, CompSpec), + setup_initial_listeners(TlsOpts, CompSpec), ?info("dlink_tls:init_rvi_component(): Setting up persistent connections."), @@ -118,14 +118,14 @@ start_connection_manager() -> setup_persistent_connections_(PersistentConnections, CompSpec), ok. -setup_initial_listeners(Pid, [], CompSpec) -> +setup_initial_listeners([], CompSpec) -> ?debug("no initial listeners", []); -setup_initial_listeners(Pid, [_|_] = TlsOpts, CompSpec) -> +setup_initial_listeners([_|_] = TlsOpts, CompSpec) -> IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), %% Add listener port. ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), - case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of + case dlink_tls_listener:add_listener(IP, Port, CompSpec) of ok -> ?notice("---- RVI Node External Address: ~s", [ application:get_env(rvi_core, node_address, undefined)]); @@ -212,12 +212,18 @@ connect_remote(IP, Port, CompSpec) -> %% Setup a genserver around the new connection. {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock, ?MODULE, handle_socket, CompSpec), - UgRes = dlink_tls_conn:upgrade(Pid, client, CompSpec), - ?debug("Upgrade result = ~p", [UgRes]), - %% Send authorize - send_authorize(Pid, CompSpec), - ok; - + try dlink_tls_conn:upgrade(Pid, client, CompSpec) of + ok -> + ?debug("Upgrade result = ~p", [ok]), + %% Send authorize + send_authorize(Pid, CompSpec), + ok + catch + error:Error -> + ?error("TLS upgrade (~p,~p) failed ~p", + [IP, Port, Error]), + not_available + end; {error, Err } -> ?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p", [IP, Port, Err]), diff --git a/components/dlink_tls/src/dlink_tls_sup.erl b/components/dlink_tls/src/dlink_tls_sup.erl index cd59434..2ede068 100644 --- a/components/dlink_tls/src/dlink_tls_sup.erl +++ b/components/dlink_tls/src/dlink_tls_sup.erl @@ -35,5 +35,7 @@ start_link() -> init([]) -> {ok, { {one_for_one, 5, 10}, [ + ?CHILD(dlink_tls_connmgr, worker), + ?CHILD(dlink_tls_listener, worker), ?CHILD(dlink_tls_rpc, worker) ]} }. diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl index 2b1f59c..e718e4d 100644 --- a/components/proto_msgpack/src/proto_msgpack_rpc.erl +++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl @@ -75,6 +75,7 @@ receive_message(CompSpec, {IP, Port}, Data) -> %% CAlled by local exo http server handle_rpc("send_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args), {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args), {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args), @@ -89,7 +90,8 @@ handle_rpc("send_message", Args) -> ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters]}), + Parameters, + LogId]}), {ok, [ {status, rvi_common:json_rpc_status(ok)} ]}; @@ -99,10 +101,14 @@ handle_rpc(Other, _Args) -> handle_notification("receive_message", Args) -> + LogId = rvi_common:get_json_log_id(Args), {ok, Data} = rvi_common:get_json_element(["data"], Args), {ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args), {ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args), - gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}), + gen_server:cast(?SERVER, { rvi, receive_message, [Data, + RemoteIP, + RemotePort, + LogId]}), ok; handle_notification(Other, _Args) -> @@ -117,7 +123,8 @@ handle_call({rvi, send_message, ProtoOpts, DataLinkMod, DataLinkOpts, - Parameters]}, _From, St) -> + Parameters + | LogId]}, _From, St) -> ?debug(" protocol:send(): transaction id: ~p~n", [TID]), ?debug(" protocol:send(): service name: ~p~n", [ServiceName]), ?debug(" protocol:send(): timeout: ~p~n", [Timeout]), @@ -139,7 +146,7 @@ handle_call(Other, _From, St) -> %% Convert list-based data to binary. -handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) -> +handle_cast({rvi, receive_message, [Payload, IP, Port | LogId]} = Msg, St) -> ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), {ok, Elems} = msgpack:unpack(Payload, St#st.pack_opts), diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl index 962252c..067485c 100644 --- a/components/rvi_common/src/rvi_log.erl +++ b/components/rvi_common/src/rvi_log.erl @@ -357,7 +357,7 @@ purge_id() -> '$end_of_table' -> %% Should not be possible ... ok; - {Tid} -> + Tid -> ets:delete(?IDS, Tid), ets:match_delete(?EVENTS, #evt{id = {Tid,'_'}, _ = '_'}) end. diff --git a/components/service_edge/src/wse_server.erl b/components/service_edge/src/wse_server.erl index e9897a7..3ce1598 100644 --- a/components/service_edge/src/wse_server.erl +++ b/components/service_edge/src/wse_server.erl @@ -47,7 +47,7 @@ connection, %% 'Connection' key, %% "Sec-WebSocket-Key" protocol, %% "Sec-WebSocket-Protocol" - origin, %% + origin, %% version, %% "Sec-WebSocket-Version" cookie, %% 'Cookie' hs = [] @@ -77,10 +77,10 @@ %% but is included here to make the example self-contained -start(Port, M, F, A) when is_integer(Port) -> +start(Port, M, F, A) when is_integer(Port) -> start_([{cb, {M,F,A}}, {port,Port}]). -start(Port,M,F,A, Opts) when is_integer(Port) -> +start(Port,M,F,A, Opts) when is_integer(Port) -> start_([{port,Port}, {cb, {M,F,A}}] ++ Opts). start_(Opts) -> spawn(fun() -> init(Opts) end). @@ -88,7 +88,7 @@ start_(Opts) -> spawn(fun() -> init(Opts) end). stop(RegName) when is_atom(RegName) -> RegName ! stop. - + init(Opts) -> Port = proplists:get_value(port, Opts, ?WSE_DEFAULT_PORT), @@ -130,7 +130,7 @@ accept_loop(Listen,Opts,Pid) -> gen_tcp:close(Listen), exit(stopped) end. - + accept(Parent, Listen, Opts) -> ?debug("Accept ~p\n", [Listen]), case gen_tcp:accept(Listen) of @@ -146,7 +146,7 @@ accept(Parent, Listen, Opts) -> send(Pid, Data) -> - try + try Pid ! { send, Data }, ok catch @@ -157,7 +157,7 @@ send(Pid, Data) -> close(Pid) -> - try + try Pid ! close, ok catch @@ -284,7 +284,6 @@ ws_loop(Buf, Socket, S) -> receive %% WebSocket stuff {tcp, Socket, Data} -> - ?debug("tcp ~w: ~p", [Socket, Data]), ws_data(Buf, Data, Socket, S); {tcp_closed, Socket} -> @@ -293,14 +292,14 @@ ws_loop(Buf, Socket, S) -> {'EXIT',Pid,Reason} -> case get(parent) of - Pid -> + Pid -> ?debug("exit from parent ~w reason=~p\n", [Pid, Reason]), exit(Reason); _ -> ?debug("exit from ~w reason=~p\n", [Pid, Reason]), ws_loop(Buf, Socket, S) end; - + Message -> ?debug("handle_local: ~p - ~p", [Message, S]), case handle_local(Message, Socket, S) of @@ -313,23 +312,20 @@ ws_loop(Buf, Socket, S) -> end end. - + ws_data(Buf, Data, Socket, S) -> case <<Buf/binary, Data/binary>> of %% masked data <<Fin:1,_Rsv:3,Op:4,1:1,126:7,L:16,M:4/binary,Frag:L/binary,Buf1/binary>> -> - ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]), Frag1 = ws_mask(M, Frag), S1 = ws_fragment(Socket, Fin, Op, Frag1, S), ws_data(Buf1, <<>>, Socket, S1); <<Fin:1,_Rsv:3,Op:4,1:1,127:7,L:64,M:4/binary,Frag:L/binary,Buf1/binary>> -> - ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]), Frag1 = ws_mask(M, Frag), S1 = ws_fragment(Socket,Fin, Op, Frag1, S), ws_data(Buf1, <<>>, Socket, S1); <<Fin:1,_Rsv:3,Op:4,1:1,L:7,M:4/binary,Frag:L/binary,Buf1/binary>> -> - ?debug("unmask fragment: mask=~p, frag=~p", [M, Frag]), Frag1 = ws_mask(M, Frag), S1 = ws_fragment(Socket,Fin, Op, Frag1, S), ws_data(Buf1, <<>>, Socket, S1); @@ -364,9 +360,7 @@ ws_mask(<<M:32>>, Frag) -> ws_fragment(Socket,1, Op, Frag, S) -> Payload = iolist_to_binary(lists:reverse([Frag|S#s.fs])), - ?debug("op=~w, unmasked payload = ~p", [ws_opcode(Op),Payload]), Message = ws_decode(Payload,Op), - ?debug("handle_remote: ~p", [Message]), handle_remote(Message, Socket, S#s { fs=[] }); ws_fragment(_Socket, 0, _Op, Frag, S) -> @@ -405,7 +399,7 @@ ws_make_frame(Fin, Op, Mask, Data) -> <<Fin:1,0:3,Op:4,M:1,127:7,L:64,Mask/binary,Data/binary>> end. - + handle_local({ send,Data},Socket,S0) -> ?debug("wse_server:send(): ~p", [ Data]), gen_tcp:send(Socket, ws_make_server_frame(Data, S0#s.type)), @@ -492,7 +486,7 @@ stop_pong_timer(S0) -> if is_reference(Tmr) -> erlang:cancel_timer(Tmr), receive - {timeout,Tmr,pong} -> + {timeout,Tmr,pong} -> ok after 0 -> ok diff --git a/deps/exo/src/exo_socket_server.erl b/deps/exo/src/exo_socket_server.erl index db67c5a..d134a45 100644 --- a/deps/exo/src/exo_socket_server.erl +++ b/deps/exo/src/exo_socket_server.erl @@ -280,9 +280,9 @@ handle_info({inet_async, LSocket, Ref, {ok,Socket}} = _Msg, State) when NewAccept = exo_socket:async_accept(Listen), case exo_socket:async_socket(Listen, Socket, [{delay_auth, true}]) of {ok, XSocket} -> - F = fun() -> + F = fun(X) -> exo_socket:accept( - XSocket, tl(XSocket#exo_socket.protocol), AcceptTimeout) + X, tl(X#exo_socket.protocol), AcceptTimeout) end, XSocketFun = {XSocket, F}, case exo_socket_session:start(XSocketFun, diff --git a/deps/exo/src/exo_socket_session.erl b/deps/exo/src/exo_socket_session.erl index f4518db..189be61 100644 --- a/deps/exo/src/exo_socket_session.erl +++ b/deps/exo/src/exo_socket_session.erl @@ -176,8 +176,8 @@ send_(Bin, From, #state{socket = S, pending = P} = State) -> handle_cast({activate,Active}, #state{socket = XSocket0} = State0) -> ?dbg("activate~n", []), case XSocket0 of - {#exo_socket{}, Fun} when is_function(Fun, 0) -> - try Fun() of + {#exo_socket{} = X, Fun} when is_function(Fun, 1) -> + try Fun(X) of {ok, XSocket} -> activate_(Active, State0#state{socket = XSocket}); {error, _} = Error -> diff --git a/priv/test_config/bt_backend.config b/priv/test_config/bt_backend.config index e5d207f..45cc5db 100644 --- a/priv/test_config/bt_backend.config +++ b/priv/test_config/bt_backend.config @@ -10,7 +10,7 @@ { [routing_rules, ""], [{proto_json_rpc, dlink_bt_rpc}] }, { [components, data_link], [{dlink_bt_rpc, gen_server, [{server_opts, [{test_mode, tcp}, - {port, 8007}]}]}]} + {port, 8807}]}]}]} ]} ]} ]. diff --git a/priv/test_config/bt_sample.config b/priv/test_config/bt_sample.config index 962f8a3..59ae564 100644 --- a/priv/test_config/bt_sample.config +++ b/priv/test_config/bt_sample.config @@ -11,7 +11,7 @@ [{server_opts, [{test_mode, tcp}, {port, 9007}]}, {persistent_connections, - ["localhost:8007"]}]}]} + ["localhost:8807"]}]}]} ]} ]} ]. diff --git a/priv/test_config/tls_backend.config b/priv/test_config/tls_backend.config index c5709ed..ad0d498 100644 --- a/priv/test_config/tls_backend.config +++ b/priv/test_config/tls_backend.config @@ -7,7 +7,7 @@ [ { [routing_rules, ""], [{proto_msgpack_rpc, dlink_tls_rpc}] }, { [components, data_link], [{dlink_tls_rpc, gen_server, - [{server_opts, [{port, 8007}]}]}]}, + [{server_opts, [{port, 8807}]}]}]}, { [components, protocol], [{proto_msgpack_rpc, gen_server, []}] } ]} ]} diff --git a/priv/test_config/tls_sample.config b/priv/test_config/tls_sample.config index 1917309..be7f0f1 100644 --- a/priv/test_config/tls_sample.config +++ b/priv/test_config/tls_sample.config @@ -9,7 +9,7 @@ { [components, data_link], [{dlink_tls_rpc, gen_server, [{server_opts, [{port, 9007}]}, {persistent_connections, - ["localhost:8007"]}]}]}, + ["localhost:8807"]}]}]}, { [components, protocol], [{ proto_msgpack_rpc, gen_server, [] }] } ]} ]} diff --git a/priv/test_config/tlsj_backend.config b/priv/test_config/tlsj_backend.config index cd0694f..69b5126 100644 --- a/priv/test_config/tlsj_backend.config +++ b/priv/test_config/tlsj_backend.config @@ -7,7 +7,7 @@ [ { [routing_rules, ""], [{proto_json_rpc, dlink_tls_rpc}] }, { [components, data_link], [{dlink_tls_rpc, gen_server, - [{server_opts, [{port, 8007}]}, + [{server_opts, [{port, 8807}]}, {packet_mod, dlink_data_json} ]}]}, { [components, protocol], [{proto_json_rpc, gen_server, []}] } diff --git a/priv/test_config/tlsj_sample.config b/priv/test_config/tlsj_sample.config index db2abc5..829c13c 100644 --- a/priv/test_config/tlsj_sample.config +++ b/priv/test_config/tlsj_sample.config @@ -10,7 +10,7 @@ [{server_opts, [{port, 9007}]}, {packet_mod, dlink_data_json}, {persistent_connections, - ["localhost:8007"]}]}]}, + ["localhost:8807"]}]}]}, { [components, protocol], [{ proto_json_rpc, gen_server, [] }] } ]} ]} diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl index 9846ae3..79281e2 100644 --- a/test/rvi_core_SUITE.erl +++ b/test/rvi_core_SUITE.erl @@ -39,6 +39,7 @@ ]). -include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). -define(DATA, rvi_core_data). @@ -46,9 +47,9 @@ all() -> [ {group, test_install}, {group, test_run}, - {group, test_run_bt}, {group, test_run_tls}, - {group, test_run_tlsj} + {group, test_run_tlsj}, + {group, test_run_bt} ]. groups() -> @@ -79,15 +80,6 @@ groups() -> t_remote_call_lock_service, t_no_errors ]}, - {test_run_bt, [], - [ - t_start_bt_backend, - t_start_bt_sample, - t_register_lock_service, - t_call_lock_service, - t_remote_call_lock_service, - t_no_errors - ]}, {test_run_tls, [], [ t_start_tls_backend, @@ -105,6 +97,15 @@ groups() -> t_call_lock_service, t_remote_call_lock_service, t_no_errors + ]}, + {test_run_bt, [], + [ + t_start_bt_backend, + t_start_bt_sample, + t_register_lock_service, + t_call_lock_service, + t_remote_call_lock_service, + t_no_errors ]} ]. @@ -133,6 +134,7 @@ init_per_group(Grp, Config) -> test_run -> ["basic_backend", "basic_sample"]; test_run_bt -> ["bt_backend", "bt_sample"]; test_run_tls -> ["tls_backend", "tls_sample"]; + test_run_tlsj -> ["tlsj_backend", "tlsj_sample"]; _ -> [] end, [{test_dir, CWD}, {test_nodes, TestNodes} | Config]. @@ -208,92 +210,42 @@ t_install_bt_backend_node(_Config) -> t_install_bt_sample_node(_Config) -> install_sample_node("bt_sample", "bt_sample.config"). +generic_start(Name) -> + F = filename:join([".", Name, "start_me.sh"]), + Cmd = [env(), + " ./", Name, "/rvi.sh", + " -s ", Name, + " -l ./", Name, "/rvi/log", + " -d ./", Name, + " -c ./", Name, "/priv/test_config/", Name, ".config", + " $1"], + ok = save_cmd(F, Cmd), + cmd([F, " start"]), + await_started(Name). + t_start_basic_backend(_Config) -> - cmd([env(), - " ./basic_backend/rvi.sh" - " -s basic_backend" - " -l ./basic_backend/rvi/log" - " -d ./basic_backend" - " -c ./basic_backend/priv/test_config/basic_backend.config" - " start"]), - await_started("basic_backend"), - ok. + generic_start("basic_backend"). t_start_basic_sample(_Config) -> - cmd([env(), - " ./basic_sample/rvi.sh" - " -s basic_sample" - " -l ./basic_sample/rvi/log" - " -d ./basic_sample" - " -c ./basic_sample/priv/test_config/basic_sample.config" - " start"]), - await_started("basic_sample"), - ok. + generic_start("basic_sample"). t_start_bt_backend(_Config) -> - cmd([env(), - " ./bt_backend/rvi.sh -s bt_backend" - " -l ./bt_backend/rvi/log" - " -d ./bt_backend" - " -c ./bt_backend/priv/test_config/bt_backend.config" - " start"]), - await_started("bt_backend"), - ok. + generic_start("bt_backend"). t_start_bt_sample(_Config) -> - cmd([env(), - " ./bt_sample/rvi.sh" - " -s bt_sample" - " -l ./bt_sample/rvi/log" - " -d ./bt_sample" - " -c ./bt_sample/priv/test_config/bt_sample.config" - " start"]), - await_started("bt_sample"), - ok. + generic_start("bt_sample"). t_start_tls_backend(_Config) -> - cmd([env(), - " ./tls_backend/rvi.sh" - " -s tls_backend" - " -l ./tls_backend/rvi/log" - " -d ./tls_backend" - " -c ./tls_backend/priv/test_config/tls_backend.config" - " start"]), - await_started("tls_backend"), - ok. + generic_start("tls_backend"). t_start_tls_sample(_Config) -> - cmd([env(), - " ./tls_sample/rvi.sh" - " -s tls_sample" - " -l ./tls_sample/rvi/log" - " -d ./tls_sample" - " -c ./tls_sample/priv/test_config/tls_sample.config" - " start"]), - await_started("tls_sample"), - ok. + generic_start("tls_sample"). t_start_tlsj_backend(_Config) -> - cmd([env(), - " ./tlsj_backend/rvi.sh" - " -s tlsj_backend" - " -l ./tlsj_backend/rvi/log" - " -d ./tlsj_backend" - " -c ./tlsj_backend/priv/test_config/tlsj_backend.config" - " start"]), - await_started("tlsj_backend"), - ok. + generic_start("tlsj_backend"). t_start_tlsj_sample(_Config) -> - cmd([env(), - " ./tlsj_sample/rvi.sh" - " -s tlsj_sample" - " -l ./tlsj_sample/rvi/log" - " -d ./tlsj_sample" - " -c ./tlsj_sample/priv/test_config/tlsj_sample.config" - " start"]), - await_started("tlsj_sample"), - ok. + generic_start("tlsj_sample"). t_register_lock_service(_Config) -> Pid = @@ -326,7 +278,7 @@ t_remote_call_lock_service(_Config) -> [{_, Svc}] = lookup({service, lock}), ok = fetch( Svc, - {match, <<"Service invoked![\\s]*args: {u'arg1': u'val1'}">>}), + {match, <<"Service invoked![\\s]*args: {u'arg1': u'val2'}">>}), ct:log("Verified service invoked~n", []), CallRes = fetch(CallPid), verify_call_res(join_stdout_msgs(CallRes)), @@ -559,6 +511,12 @@ cmd(C) -> cmd(C, Opts) -> {ok, _Res} = cmd_(C, Opts). +save_cmd(F, Cmd) -> + ct:log("save_cmd ~s:~n~s", [F, Cmd]), + ok = file:write_file(F, iolist_to_binary(Cmd)), + {ok, FI} = file:read_file_info(F), + ok = file:write_file_info(F, FI#file_info{mode = 8#755}, []). + cmd_(C0, Opts) -> C = binary_to_list(iolist_to_binary(C0)), CmdRes = exec:run(C, [sync, stdout, stderr] ++ Opts), |