diff options
-rw-r--r-- | components/dlink_tls/LICENSE | 354 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls.app.src | 27 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_app.erl | 44 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_conn.erl | 362 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_connmgr.erl | 272 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_listener.erl | 81 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_rpc.erl | 842 | ||||
-rw-r--r-- | components/dlink_tls/src/dlink_tls_sup.erl | 39 | ||||
-rw-r--r-- | deps/setup/Makefile | 2 | ||||
-rw-r--r-- | deps/setup/src/setup.erl | 238 | ||||
-rw-r--r-- | deps/setup/src/setup_gen.erl | 14 | ||||
-rw-r--r-- | rebar.config | 1 | ||||
-rw-r--r-- | test/config/tls_backend.config | 14 | ||||
-rw-r--r-- | test/config/tls_sample.config | 16 | ||||
-rw-r--r-- | test/rvi_core_SUITE.erl | 14 |
15 files changed, 2273 insertions, 47 deletions
diff --git a/components/dlink_tls/LICENSE b/components/dlink_tls/LICENSE new file mode 100644 index 0000000..c33dcc7 --- /dev/null +++ b/components/dlink_tls/LICENSE @@ -0,0 +1,354 @@ +Mozilla Public License, version 2.0 + +1. Definitions + +1.1. “Contributor” + + means each individual or legal entity that creates, contributes to the + creation of, or owns Covered Software. + +1.2. “Contributor Version” + + means the combination of the Contributions of others (if any) used by a + Contributor and that particular Contributor’s Contribution. + +1.3. “Contribution” + + means Covered Software of a particular Contributor. + +1.4. “Covered Software” + + means Source Code Form to which the initial Contributor has attached the + notice in Exhibit A, the Executable Form of such Source Code Form, and + Modifications of such Source Code Form, in each case including portions + thereof. + +1.5. “Incompatible With Secondary Licenses” + means + + a. that the initial Contributor has attached the notice described in + Exhibit B to the Covered Software; or + + b. that the Covered Software was made available under the terms of version + 1.1 or earlier of the License, but not also under the terms of a + Secondary License. + +1.6. “Executable Form” + + means any form of the work other than Source Code Form. + +1.7. “Larger Work” + + means a work that combines Covered Software with other material, in a separate + file or files, that is not Covered Software. + +1.8. “License” + + means this document. + +1.9. “Licensable” + + means having the right to grant, to the maximum extent possible, whether at the + time of the initial grant or subsequently, any and all of the rights conveyed by + this License. + +1.10. “Modifications” + + means any of the following: + + a. any file in Source Code Form that results from an addition to, deletion + from, or modification of the contents of Covered Software; or + + b. any new file in Source Code Form that contains any Covered Software. + +1.11. “Patent Claims” of a Contributor + + means any patent claim(s), including without limitation, method, process, + and apparatus claims, in any patent Licensable by such Contributor that + would be infringed, but for the grant of the License, by the making, + using, selling, offering for sale, having made, import, or transfer of + either its Contributions or its Contributor Version. + +1.12. “Secondary License” + + means either the GNU General Public License, Version 2.0, the GNU Lesser + General Public License, Version 2.1, the GNU Affero General Public + License, Version 3.0, or any later versions of those licenses. + +1.13. “Source Code Form” + + means the form of the work preferred for making modifications. + +1.14. “You” (or “Your”) + + means an individual or a legal entity exercising rights under this + License. For legal entities, “You” includes any entity that controls, is + controlled by, or is under common control with You. For purposes of this + definition, “control” means (a) the power, direct or indirect, to cause + the direction or management of such entity, whether by contract or + otherwise, or (b) ownership of more than fifty percent (50%) of the + outstanding shares or beneficial ownership of such entity. + + +2. License Grants and Conditions + +2.1. Grants + + Each Contributor hereby grants You a world-wide, royalty-free, + non-exclusive license: + + a. under intellectual property rights (other than patent or trademark) + Licensable by such Contributor to use, reproduce, make available, + modify, display, perform, distribute, and otherwise exploit its + Contributions, either on an unmodified basis, with Modifications, or as + part of a Larger Work; and + + b. under Patent Claims of such Contributor to make, use, sell, offer for + sale, have made, import, and otherwise transfer either its Contributions + or its Contributor Version. + +2.2. Effective Date + + The licenses granted in Section 2.1 with respect to any Contribution become + effective for each Contribution on the date the Contributor first distributes + such Contribution. + +2.3. Limitations on Grant Scope + + The licenses granted in this Section 2 are the only rights granted under this + License. No additional rights or licenses will be implied from the distribution + or licensing of Covered Software under this License. Notwithstanding Section + 2.1(b) above, no patent license is granted by a Contributor: + + a. for any code that a Contributor has removed from Covered Software; or + + b. for infringements caused by: (i) Your and any other third party’s + modifications of Covered Software, or (ii) the combination of its + Contributions with other software (except as part of its Contributor + Version); or + + c. under Patent Claims infringed by Covered Software in the absence of its + Contributions. + + This License does not grant any rights in the trademarks, service marks, or + logos of any Contributor (except as may be necessary to comply with the + notice requirements in Section 3.4). + +2.4. Subsequent Licenses + + No Contributor makes additional grants as a result of Your choice to + distribute the Covered Software under a subsequent version of this License + (see Section 10.2) or under the terms of a Secondary License (if permitted + under the terms of Section 3.3). + +2.5. Representation + + Each Contributor represents that the Contributor believes its Contributions + are its original creation(s) or it has sufficient rights to grant the + rights to its Contributions conveyed by this License. + +2.6. Fair Use + + This License is not intended to limit any rights You have under applicable + copyright doctrines of fair use, fair dealing, or other equivalents. + +2.7. Conditions + + Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in + Section 2.1. + + +3. Responsibilities + +3.1. Distribution of Source Form + + All distribution of Covered Software in Source Code Form, including any + Modifications that You create or to which You contribute, must be under the + terms of this License. You must inform recipients that the Source Code Form + of the Covered Software is governed by the terms of this License, and how + they can obtain a copy of this License. You may not attempt to alter or + restrict the recipients’ rights in the Source Code Form. + +3.2. Distribution of Executable Form + + If You distribute Covered Software in Executable Form then: + + a. such Covered Software must also be made available in Source Code Form, + as described in Section 3.1, and You must inform recipients of the + Executable Form how they can obtain a copy of such Source Code Form by + reasonable means in a timely manner, at a charge no more than the cost + of distribution to the recipient; and + + b. You may distribute such Executable Form under the terms of this License, + or sublicense it under different terms, provided that the license for + the Executable Form does not attempt to limit or alter the recipients’ + rights in the Source Code Form under this License. + +3.3. Distribution of a Larger Work + + You may create and distribute a Larger Work under terms of Your choice, + provided that You also comply with the requirements of this License for the + Covered Software. If the Larger Work is a combination of Covered Software + with a work governed by one or more Secondary Licenses, and the Covered + Software is not Incompatible With Secondary Licenses, this License permits + You to additionally distribute such Covered Software under the terms of + such Secondary License(s), so that the recipient of the Larger Work may, at + their option, further distribute the Covered Software under the terms of + either this License or such Secondary License(s). + +3.4. Notices + + You may not remove or alter the substance of any license notices (including + copyright notices, patent notices, disclaimers of warranty, or limitations + of liability) contained within the Source Code Form of the Covered + Software, except that You may alter any license notices to the extent + required to remedy known factual inaccuracies. + +3.5. Application of Additional Terms + + You may choose to offer, and to charge a fee for, warranty, support, + indemnity or liability obligations to one or more recipients of Covered + Software. However, You may do so only on Your own behalf, and not on behalf + of any Contributor. You must make it absolutely clear that any such + warranty, support, indemnity, or liability obligation is offered by You + alone, and You hereby agree to indemnify every Contributor for any + liability incurred by such Contributor as a result of warranty, support, + indemnity or liability terms You offer. You may include additional + disclaimers of warranty and limitations of liability specific to any + jurisdiction. + +4. Inability to Comply Due to Statute or Regulation + + If it is impossible for You to comply with any of the terms of this License + with respect to some or all of the Covered Software due to statute, judicial + order, or regulation then You must: (a) comply with the terms of this License + to the maximum extent possible; and (b) describe the limitations and the code + they affect. Such description must be placed in a text file included with all + distributions of the Covered Software under this License. Except to the + extent prohibited by statute or regulation, such description must be + sufficiently detailed for a recipient of ordinary skill to be able to + understand it. + +5. Termination + +5.1. The rights granted under this License will terminate automatically if You + fail to comply with any of its terms. However, if You become compliant, + then the rights granted under this License from a particular Contributor + are reinstated (a) provisionally, unless and until such Contributor + explicitly and finally terminates Your grants, and (b) on an ongoing basis, + if such Contributor fails to notify You of the non-compliance by some + reasonable means prior to 60 days after You have come back into compliance. + Moreover, Your grants from a particular Contributor are reinstated on an + ongoing basis if such Contributor notifies You of the non-compliance by + some reasonable means, this is the first time You have received notice of + non-compliance with this License from such Contributor, and You become + compliant prior to 30 days after Your receipt of the notice. + +5.2. If You initiate litigation against any entity by asserting a patent + infringement claim (excluding declaratory judgment actions, counter-claims, + and cross-claims) alleging that a Contributor Version directly or + indirectly infringes any patent, then the rights granted to You by any and + all Contributors for the Covered Software under Section 2.1 of this License + shall terminate. + +5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user + license agreements (excluding distributors and resellers) which have been + validly granted by You or Your distributors under this License prior to + termination shall survive termination. + +6. Disclaimer of Warranty + + Covered Software is provided under this License on an “as is” basis, without + warranty of any kind, either expressed, implied, or statutory, including, + without limitation, warranties that the Covered Software is free of defects, + merchantable, fit for a particular purpose or non-infringing. The entire + risk as to the quality and performance of the Covered Software is with You. + Should any Covered Software prove defective in any respect, You (not any + Contributor) assume the cost of any necessary servicing, repair, or + correction. This disclaimer of warranty constitutes an essential part of this + License. No use of any Covered Software is authorized under this License + except under this disclaimer. + +7. Limitation of Liability + + Under no circumstances and under no legal theory, whether tort (including + negligence), contract, or otherwise, shall any Contributor, or anyone who + distributes Covered Software as permitted above, be liable to You for any + direct, indirect, special, incidental, or consequential damages of any + character including, without limitation, damages for lost profits, loss of + goodwill, work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses, even if such party shall have been + informed of the possibility of such damages. This limitation of liability + shall not apply to liability for death or personal injury resulting from such + party’s negligence to the extent applicable law prohibits such limitation. + Some jurisdictions do not allow the exclusion or limitation of incidental or + consequential damages, so this exclusion and limitation may not apply to You. + +8. Litigation + + Any litigation relating to this License may be brought only in the courts of + a jurisdiction where the defendant maintains its principal place of business + and such litigation shall be governed by laws of that jurisdiction, without + reference to its conflict-of-law provisions. Nothing in this Section shall + prevent a party’s ability to bring cross-claims or counter-claims. + +9. Miscellaneous + + This License represents the complete agreement concerning the subject matter + hereof. If any provision of this License is held to be unenforceable, such + provision shall be reformed only to the extent necessary to make it + enforceable. Any law or regulation which provides that the language of a + contract shall be construed against the drafter shall not be used to construe + this License against a Contributor. + + +10. Versions of the License + +10.1. New Versions + + Mozilla Foundation is the license steward. Except as provided in Section + 10.3, no one other than the license steward has the right to modify or + publish new versions of this License. Each version will be given a + distinguishing version number. + +10.2. Effect of New Versions + + You may distribute the Covered Software under the terms of the version of + the License under which You originally received the Covered Software, or + under the terms of any subsequent version published by the license + steward. + +10.3. Modified Versions + + If you create software not governed by this License, and you want to + create a new license for such software, you may create and use a modified + version of this License if you rename the license and remove any + references to the name of the license steward (except to note that such + modified license differs from this License). + +10.4. Distributing Source Code Form that is Incompatible With Secondary Licenses + If You choose to distribute Source Code Form that is Incompatible With + Secondary Licenses under the terms of this version of the License, the + notice described in Exhibit B of this License must be attached. + +Exhibit A - Source Code Form License Notice + + This Source Code Form is subject to the + terms of the Mozilla Public License, v. + 2.0. If a copy of the MPL was not + distributed with this file, You can + obtain one at + http://mozilla.org/MPL/2.0/. + +If it is not possible or desirable to put the notice in a particular file, then +You may include the notice in a location (such as a LICENSE file in a relevant +directory) where a recipient would be likely to look for such a notice. + +You may add additional accurate notices of copyright ownership. + +Exhibit B - “Incompatible With Secondary Licenses” Notice + + This Source Code Form is “Incompatible + With Secondary Licenses”, as defined by + the Mozilla Public License, v. 2.0. + diff --git a/components/dlink_tls/src/dlink_tls.app.src b/components/dlink_tls/src/dlink_tls.app.src new file mode 100644 index 0000000..08d92d4 --- /dev/null +++ b/components/dlink_tls/src/dlink_tls.app.src @@ -0,0 +1,27 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +%% -*- erlang -*- +{application, dlink_tls, + [ + {description, ""}, + {vsn, "0.1"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + rvi_common + ]}, + {mod, { dlink_tls_app, []}}, + {start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]}, + {env, [ + {rvi_core_await, [{n,l,dlink_tls}]} + ]} + ]}. diff --git a/components/dlink_tls/src/dlink_tls_app.erl b/components/dlink_tls/src/dlink_tls_app.erl new file mode 100644 index 0000000..f51aa54 --- /dev/null +++ b/components/dlink_tls/src/dlink_tls_app.erl @@ -0,0 +1,44 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +-module(dlink_tls_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, + start_phase/3, + stop/1]). + +-include_lib("lager/include/log.hrl"). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + dlink_tls_sup:start_link(). + +start_phase(init, _, _) -> + dlink_tls_rpc:init_rvi_component(); + +start_phase(json_rpc, _, _) -> + dlink_tls_rpc:start_json_server(), + ok; + +start_phase(connection_manager, _, _) -> + dlink_tls_rpc:start_connection_manager(), + ok; + +start_phase(announce, _, _) -> + rvi_common:announce({n, l, dlink_tls}). + +stop(_State) -> + ok. diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl new file mode 100644 index 0000000..48d79a1 --- /dev/null +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -0,0 +1,362 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +%%%------------------------------------------------------------------- +%%% @author magnus <magnus@t520.home> +%%% @copyright (C) 2014, magnus +%%% @doc +%%% +%%% @end +%%% Created : 12 Sep 2014 by magnus <magnus@t520.home> +%%%------------------------------------------------------------------- +-module(dlink_tls_conn). + +-behaviour(gen_server). +-include_lib("lager/include/log.hrl"). + + +%% API + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([setup/6]). +-export([upgrade/2]). +-export([send/2]). +-export([send/3]). +-export([is_connection_up/1]). +-export([is_connection_up/2]). +-export([terminate_connection/1]). +-export([terminate_connection/2]). + + +-define(SERVER, ?MODULE). + +-record(st, { + ip = {0,0,0,0}, + port = 0, + sock = undefined, + mode = tcp :: tcp | tls, + mod = undefined, + func = undefined, + args = undefined, + pst = undefined %% Payload state + }). + +%%%=================================================================== +%%% API +%%%=================================================================== +%% MFA is to deliver data received on the socket. + +setup(IP, Port, Sock, Mod, Fun, Arg) -> + case gen_server:start_link(?MODULE, {IP, Port, Sock, Mod, Fun, Arg},[]) of + { ok, GenSrvPid } = Res -> + gen_tcp:controlling_process(Sock, GenSrvPid), + gen_server:cast(GenSrvPid, {activate_socket, Sock}), + Res; + + Err -> + Err + end. + +upgrade(Pid, Role) -> + gen_server:call(Pid, {upgrade, Role}). + +send(Pid, Data) when is_pid(Pid) -> + gen_server:cast(Pid, {send, Data}). + +send(IP, Port, Data) -> + case dlink_tls_connmgr:find_connection_by_address(IP, Port) of + {ok, Pid} -> + gen_server:cast(Pid, {send, Data}); + + _Err -> + ?info("connection:send(): Connection ~p:~p not found for data: ~p", + [ IP, Port, Data]), + not_found + + end. + +terminate_connection(Pid) when is_pid(Pid) -> + gen_server:call(Pid, terminate_connection). + +terminate_connection(IP, Port) -> + case dlink_tls_connmgr:find_connection_by_address(IP, Port) of + {ok, Pid} -> + gen_server:call(Pid, terminate_connection); + + _Err -> not_found + end. + + +is_connection_up(Pid) when is_pid(Pid) -> + is_process_alive(Pid). + +is_connection_up(IP, Port) -> + case dlink_tls_connmgr:find_connection_by_address(IP, Port) of + {ok, Pid} -> + is_connection_up(Pid); + + _Err -> + false + end. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +%% MFA used to handle socket closed, socket error and received data +%% When data is received, a separate process is spawned to handle +%% the MFA invocation. +init({IP, Port, Sock, Mod, Fun, Arg}) -> + case IP of + undefined -> ok; + _ -> dlink_tls_connmgr:add_connection(IP, Port, self()) + end, + ?debug("connection:init(): self(): ~p", [self()]), + ?debug("connection:init(): IP: ~p", [IP]), + ?debug("connection:init(): Port: ~p", [Port]), + ?debug("connection:init(): Sock: ~p", [Sock]), + ?debug("connection:init(): Module: ~p", [Mod]), + ?debug("connection:init(): Function: ~p", [Fun]), + ?debug("connection:init(): Arg: ~p", [Arg]), + %% Grab socket control + {ok, #st{ + ip = IP, + port = Port, + sock = Sock, + mod = Mod, + func = Fun, + args = Arg, + pst = undefined + }}. + + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- + + +handle_call(terminate_connection, _From, St) -> + ?debug("~p:handle_call(terminate_connection): Terminating: ~p", + [ ?MODULE, {St#st.ip, St#st.port}]), + + {stop, Reason, NSt} = handle_info({tcp_closed, St#st.sock}, St), + {stop, Reason, ok, NSt}; +handle_call({upgrade, Role} = Req, _From, #st{sock = S} = St) -> + ?debug("~p:handle_call(~p)~n", [?MODULE, Req]), + + {ok, [{active, Last}]} = inet:getopts(S, [active]), + inet:setopts(S, [{active, false}]), + case do_upgrade(S, Role) of + {ok, NewS} -> + ?debug("upgrade to TLS succcessful~n", []), + ssl:setopts(NewS, [{active, Last}]), + {reply, ok, St#st{sock = NewS, mode = tls}}; + Error -> + ?error("Cannot upgrade to TLS: ~p~n", [Error]), + {stop, Error, Error, St} + end; +handle_call(_Request, _From, State) -> + ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast({send, Data}, St) -> + ?debug("~p:handle_call(send): Sending: ~p", + [ ?MODULE, Data]), + case St#st.mode of + tcp -> gen_tcp:send(St#st.sock, Data); + tls -> ssl:send(St#st.sock, Data) + end, + {noreply, St}; + +handle_cast({activate_socket, Sock}, State) -> + Res = inet:setopts(Sock, [{active, once}]), + ?debug("connection:activate_socket(): ~p", [Res]), + {noreply, State}; + + +handle_cast(_Msg, State) -> + ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- + +%% Fill in peername if empty. +handle_info({tcp, Sock, Data}, + #st { ip = undefined } = St) -> + {ok, {IP, Port}} = inet:peername(Sock), + NSt = St#st { ip = inet_parse:ntoa(IP), port = Port }, + ?warning("YESSSS"), + handle_info({tcp, Sock, Data}, NSt); + +handle_info({ssl, Sock, Data}, + #st{ip = IP, port = Port, + mod = Mod, func = Fun, args = Arg, + pst = PST} = State) -> + ?debug("handle_info(data): ~p", [Data]), + case rvi_common:extract_json(Data, PST) of + {[], NPST} -> + ?debug("data incomplete", []), + ssl:setopts(Sock, [{active, once}]), + {noreply, State#st{pst = NPST}}; + {JSONElements, NPST} -> + ?debug("data complete; Processed: ~p", [JSONElements]), + FromPid = self(), + spawn(fun() -> [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg) + || SingleElem <- JSONElements] + end), + ssl:setopts(Sock, [{active, once}]), + {noreply, State#st{pst = NPST}} + end; +handle_info({tcp, Sock, Data}, + #st { ip = IP, + port = Port, + mod = Mod, + func = Fun, + args = Arg, + pst = PST} = State) -> + ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), + ?debug("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, IP, Port]), + + case rvi_common:extract_json(Data, PST) of + { [], NPST } -> + ?debug("~p:handle_info(data incomplete)", [ ?MODULE]), + inet:setopts(Sock, [{active, once}]), + {noreply, State#st { pst = NPST} }; + + { JSONElements, NPST } -> + ?debug("~p:handle_info(data complete): Processed: ~p", [ ?MODULE, JSONElements]), + FromPid = self(), + spawn(fun() -> [ Mod:Fun(FromPid, IP, Port, + data, SingleElem, Arg) || SingleElem <- JSONElements ] + end), + inet:setopts(Sock, [ { active, once } ]), + {noreply, State#st { pst = NPST} } + end; + + + +handle_info({tcp_closed, Sock}, + #st { ip = IP, + port = Port, + mod = Mod, + func = Fun, + args = Arg } = State) -> + ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]), + Mod:Fun(self(), IP, Port,closed, Arg), + gen_tcp:close(Sock), + dlink_tls_connmgr:delete_connection_by_pid(self()), + {stop, normal, State}; + +handle_info({tcp_error, _Sock}, + #st { ip = IP, + port = Port, + mod = Mod, + func = Fun, + args = Arg} = State) -> + + ?debug("~p:handle_info(tcp_error): Address: ~p:~p ", [ ?MODULE, IP, Port]), + Mod:Fun(self(), IP, Port, error, Arg), + dlink_tls_connmgr:delete_connection_by_pid(self()), + {stop, normal, State}; + + +handle_info(_Info, State) -> + ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ?debug("~p:terminate(): Reason: ~p ", [ ?MODULE, _Reason]), + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% FIXME: For now, use the example certs delivered with the OTP SSL appl. +tls_opts(Role) -> + Dir = tls_dir(Role), + [{certfile, filename:join(Dir, "cert.pem")}, + {keyfile, filename:join(Dir, "key.pem")}]. + +tls_dir(Role) when Role==client; + Role==server -> + filename:join([code:lib_dir(ssl), "examples", "certs", "etc", + atom_to_list(Role)]). + +do_upgrade(Sock, client) -> + ssl:connect(Sock, tls_opts(client)); +do_upgrade(Sock, server) -> + ssl:ssl_accept(Sock, tls_opts(server)). diff --git a/components/dlink_tls/src/dlink_tls_connmgr.erl b/components/dlink_tls/src/dlink_tls_connmgr.erl new file mode 100644 index 0000000..4947ee6 --- /dev/null +++ b/components/dlink_tls/src/dlink_tls_connmgr.erl @@ -0,0 +1,272 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% +%% +%%%------------------------------------------------------------------- +%%% @author magnus <magnus@t520.home> +%%% @copyright (C) 2014, magnus +%%% @doc +%%% +%%% @end +%%% Created : 12 Sep 2014 by magnus <magnus@t520.home> +%%%------------------------------------------------------------------- +-module(dlink_tls_connmgr). + +-behaviour(gen_server). +-include_lib("lager/include/log.hrl"). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([add_connection/3]). +-export([delete_connection_by_pid/1]). +-export([delete_connection_by_address/2]). +-export([find_connection_by_pid/1]). +-export([find_connection_by_address/2]). +-export([connections/0]). + +-define(SERVER, ?MODULE). + +-record(st, { + conn_by_pid = undefined, + conn_by_addr = undefined + }). + +%%%=================================================================== +%%% API +%%%=================================================================== + +add_connection(IP, Port, Pid) -> + gen_server:call(?SERVER, { add_connection, IP, Port, Pid}). + +delete_connection_by_pid(Pid) -> + gen_server:call(?SERVER, { delete_connection_by_pid, Pid } ). + +delete_connection_by_address(IP, Port) -> + gen_server:call(?SERVER, { delete_connection_by_address, IP, Port } ). + +find_connection_by_pid(Pid) -> + gen_server:call(?SERVER, { find_connection_by_pid, Pid } ). + +find_connection_by_address(IP, Port) -> + gen_server:call(?SERVER, { find_connection_by_address, IP, Port } ). + +connections() -> + gen_server:call(?SERVER, connections). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #st{ + conn_by_pid = dict:new(), %% All managed connection stored by pid + conn_by_addr = dict:new() %% All managed connection stored by address + }}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call({add_connection, IP, Port, Pid}, _From, + #st { conn_by_pid = ConPid, + conn_by_addr = ConAddr} = St) -> + + ?debug("~p:handle_call(add): Adding Pid: ~p, Address: ~p", + [ ?MODULE, Pid, { IP, Port }]), + %% Store so that we can find connection both by pid and by address + NConPid = dict:store(Pid, { IP, Port }, ConPid), + NConAddr = dict:store({ IP, Port }, Pid, ConAddr), + + NSt = St#st { conn_by_pid = NConPid, + conn_by_addr = NConAddr }, + {reply, ok, NSt}; + +%% Delete connection by pid +handle_call({delete_connection_by_pid, Pid}, _From, + #st { conn_by_pid = ConPid, + conn_by_addr = ConAddr} = St) when is_pid(Pid)-> + + %% Find address associated with Pid + case dict:find(Pid, ConPid) of + error -> + ?debug("~p:handle_call(del_by_pid): not found: ~p", + [ ?MODULE, Pid]), + { reply, not_found, St}; + + {ok, Addr } -> + ?debug("~p:handle_call(del_by_pid): deleted Pid: ~p, Address: ~p", + [ ?MODULE, Pid, Addr]), + + NConPid = dict:erase(Pid, ConPid), + NConAddr = dict:erase(Addr, ConAddr), + + NSt = St#st { conn_by_pid = NConPid, + conn_by_addr = NConAddr }, + + {reply, ok, NSt} + end; + + +%% Delete connection by address +handle_call({ delete_connection_by_address, IP, Port}, _From, + #st { conn_by_pid = ConPid, + conn_by_addr = ConAddr} = St) -> + + %% Find Pid associated with Address + case dict:find({IP, Port}, ConAddr) of + error -> + ?debug("~p:handle_call(del_by_addr): not found: ~p", + [ ?MODULE, {IP, Port}]), + { reply, not_found, St}; + + {ok, Pid } -> + ?debug("~p:handle_call(del_by_addr): deleted Pid: ~p, Address: ~p", + [ ?MODULE, Pid, {IP, Port}]), + NConPid = dict:erase(Pid, ConPid), + NConAddr = dict:erase({ IP, Port }, ConAddr), + NSt = St#st { conn_by_pid = NConPid, + conn_by_addr = NConAddr }, + {reply, ok, NSt} + end; + + +%% Find connection by pid +handle_call({ find_connection_by_pid, Pid}, _From, + #st { conn_by_pid = ConPid} = St) when is_pid(Pid)-> + + %% Find address associated with Pid + case dict:find(Pid, ConPid) of + error -> + ?debug("~p:handle_call(find_by_pid): not found: ~p", + [ ?MODULE, Pid]), + { reply, not_found, St}; + + {ok, {IP, Port} } -> + ?debug("~p:handle_call(find_by_addr): Pid: ~p ->: ~p", + [ ?MODULE, Pid, {IP, Port}]), + {reply, {ok, IP, Port}, St} + end; + +%% Find connection by address +handle_call({find_connection_by_address, IP, Port}, _From, + #st { conn_by_addr = ConAddr} = St) -> + + %% Find address associated with Pid + case dict:find({IP, Port}, ConAddr) of + error -> + ?debug("~p:handle_call(find_by_addr): not found: ~p", + [ ?MODULE, {IP, Port}]), + + { reply, not_found, St}; + + {ok, Pid } -> + ?debug("~p:handle_call(find_by_addr): Addr: ~p ->: ~p", + [ ?MODULE, {IP, Port}, Pid]), + {reply, {ok, Pid}, St} + end; + +handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) -> + {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St}; + +handle_call(_Request, _From, State) -> + ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl new file mode 100644 index 0000000..64b004f --- /dev/null +++ b/components/dlink_tls/src/dlink_tls_listener.erl @@ -0,0 +1,81 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + +%% Setup a listen socket and manage connections to remote parties. +%% Can also retrieve connections by peer address. +-module(dlink_tls_listener). + +-include_lib("lager/include/log.hrl"). + +-export([start_link/0, + add_listener/4, + remove_listener/3]). + +-export([init/2, handle_call/3, handle_cast/2, handle_info/2]). +-export([terminate/2, sock_opts/0, new_connection/4]). + +-behavior(gen_nb_server). + +start_link() -> + gen_nb_server:start_link(?MODULE, []). + +add_listener(Pid, IpAddr, Port, CompSpec) -> + gen_server:call(Pid, {add_listener, IpAddr, Port, CompSpec}). + +remove_listener(Pid, IpAddr, Port) -> + gen_server:call(Pid, {remove_listener, IpAddr, Port}). + +init([], State) -> + {ok, State}. + +handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> + case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of + {ok, State1} -> + {reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )}; + + Error -> + {reply, Error, gen_nb_server:store_cb_state( CompSpec, State )} + end; + +handle_call({remove_listener, IpAddr, Port}, _From, State) -> + case gen_nb_server:remove_listen_socket({IpAddr, Port}, State) of + {ok, State1} -> + {reply, ok, State1}; + Error -> + {reply, Error, State} + end; + +handle_call(_Msg, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +sock_opts() -> + [list, {active, once}, {packet, 0}]. + +new_connection(IP, Port, Sock, State) -> + ?debug("~p:new_connection(): Peer IP: ~p (ignored)", [?MODULE,IP]), + ?debug("~p:new_connection(): Peer Port: ~p (ignored)", [?MODULE,Port]), + ?debug("~p:new_connection(): Sock: ~p", [?MODULE,Sock]), + + %% IP and Port are garbage. We'll grab peername when we get our + %% first data. + %% Provide component spec as extra arg. + {ok, _P} = dlink_tls_conn:setup( + undefined, 0, Sock, + dlink_tls_rpc, + handle_socket, [gen_nb_server:get_cb_state(State)]), + {ok, State}. diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl new file mode 100644 index 0000000..8752b01 --- /dev/null +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -0,0 +1,842 @@ +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +-module(dlink_tls_rpc). +-behavior(gen_server). + +-export([handle_rpc/2]). +-export([handle_notification/2]). +-export([handle_socket/6]). +-export([handle_socket/5]). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([start_json_server/0]). +-export([start_connection_manager/0]). + +%% Invoked by service discovery +%% FIXME: Should be rvi_service_discovery behavior +-export([service_available/3, + service_unavailable/3]). +-export([connections/1]). + +-export([setup_data_link/3, + disconnect_data_link/2, + send_data/5]). + + +-include_lib("lager/include/log.hrl"). +-include_lib("rvi_common/include/rvi_common.hrl"). +-include_lib("rvi_common/include/rvi_dlink.hrl"). + +-define(PERSISTENT_CONNECTIONS, persistent_connections). +-define(SERVER_OPTS, server_opts). +-define(DEFAULT_TCP_PORT, 9999). +-define(DEFAULT_RECONNECT_INTERVAL, 5000). +-define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). +-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes +-define(SERVER, ?MODULE). +-define(DLINK_TLS_VERSION, "1.0"). + +-define(CONNECTION_TABLE, rvi_dlink_tls_connections). +-define(SERVICE_TABLE, rvi_dlink_tls_services). + +%% Multiple registrations of the same service, each with a different connection, +%% is possible. +-record(service_entry, { + service = [], %% Name of service + connections = undefined %% PID of connection that can reach this service + }). + +-record(connection_entry, { + connection = undefined, %% PID of connection that has a set of services. + services = [] %% List of service names available through this connection + }). + +-record(st, { + cs = #component_spec{} + }). + + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +init([]) -> + ?info("dlink_tls:init(): Called"), + %% Dig out the bert rpc server setup + + ets:new(?SERVICE_TABLE, [ set, public, named_table, + { keypos, #service_entry.service }]), + + ets:new(?CONNECTION_TABLE, [ set, public, named_table, + { keypos, #connection_entry.connection }]), + + CS = rvi_common:get_component_specification(), + service_discovery_rpc:subscribe(CS, ?MODULE), + + {ok, #st { + cs = CS + } + }. + +start_json_server() -> + rvi_common:start_json_rpc_server(data_link, ?MODULE, dlink_tls_sup). + + +start_connection_manager() -> + CompSpec = rvi_common:get_component_specification(), + {ok, BertOpts } = rvi_common:get_module_config(data_link, + ?MODULE, + ?SERVER_OPTS, + [], + CompSpec), + IP = proplists:get_value(ip, BertOpts, ?DEFAULT_TCP_ADDRESS), + Port = proplists:get_value(port, BertOpts, ?DEFAULT_TCP_PORT), + + ?info("dlink_tls:init_rvi_component(~p): Starting listener.", [self()]), + + %% Fire up listener + dlink_tls_connmgr:start_link(), + {ok,Pid} = dlink_tls_listener:start_link(), + ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), + + %% Add listener port. + case dlink_tls_listener:add_listener(Pid, IP, Port, CompSpec) of + ok -> + ?notice("---- RVI Node External Address: ~s", + [ application:get_env(rvi_core, node_address, undefined)]); + + Err -> + ?error("dlink_tls:init_rvi_component(): Failed to launch listener: ~p", [ Err ]), + ok + end, + ?info("dlink_tls:init_rvi_component(): Setting up persistent connections."), + + {ok, PersistentConnections } = rvi_common:get_module_config(data_link, + ?MODULE, + ?PERSISTENT_CONNECTIONS, + [], + CompSpec), + + + setup_persistent_connections_(PersistentConnections, CompSpec), + ok. + +setup_persistent_connections_([ ], _CompSpec) -> + ok; + + +setup_persistent_connections_([ NetworkAddress | T], CompSpec) -> + ?debug("~p: Will persistently connect connect : ~p", [self(), NetworkAddress]), + [ IP, Port] = string:tokens(NetworkAddress, ":"), + connect_and_retry_remote(IP, Port, CompSpec), + setup_persistent_connections_(T, CompSpec), + ok. + +service_available(CompSpec, SvcName, DataLinkModule) -> + rvi_common:notification(data_link, ?MODULE, + service_available, + [{ service, SvcName }, + { data_link_module, DataLinkModule }], + CompSpec). + +service_unavailable(CompSpec, SvcName, DataLinkModule) -> + rvi_common:notification(data_link, ?MODULE, + service_unavailable, + [{ service, SvcName }, + { data_link_module, DataLinkModule }], + CompSpec). + +connections(_CompSpec) -> + rvi_common:request(data_link, ?MODULE, connections, []). + +setup_data_link(CompSpec, Service, Opts) -> + rvi_common:request(data_link, ?MODULE, setup_data_link, + [ { service, Service }, + { opts, Opts }], + [status, timeout], CompSpec). + +disconnect_data_link(CompSpec, NetworkAddress) -> + rvi_common:request(data_link, ?MODULE, disconnect_data_link, + [ {network_address, NetworkAddress} ], + [status], CompSpec). + + +send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) -> + rvi_common:request(data_link, ?MODULE, send_data, + [ { proto_mod, ProtoMod }, + { service, Service }, + { data, Data }, + { opts, DataLinkOpts } + ], + [status], CompSpec). + +%% End of behavior + +%% +%% Connect to a remote RVI node. +%% +connect_remote(IP, Port, CompSpec) -> + ?info("connect_remote(~p, ~p)~n", [IP, Port]), + case dlink_tls_connmgr:find_connection_by_address(IP, Port) of + { ok, _Pid } -> + already_connected; + + not_found -> + %% Setup a new outbound connection + ?info("dlink_tls:connect_remote(): Connecting ~p:~p", + [IP, Port]), + + case gen_tcp:connect(IP, Port, [list, {packet, 0}]) of + { ok, Sock } -> + ?info("dlink_tls:connect_remote(): Connected ~p:~p", + [IP, Port]), + + %% Setup a genserver around the new connection. + {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock, + ?MODULE, handle_socket, [CompSpec] ), + + %% Send authorize + { LocalIP, LocalPort} = rvi_common:node_address_tuple(), + dlink_tls_conn:send( + Pid, + term_to_json( + {struct, [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_PORT, LocalPort }, + { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, + { ?DLINK_ARG_CERTIFICATES, + {array, get_certificates(CompSpec)} }, + { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } + ]})), + ok; + + {error, Err } -> + ?info("dlink_tls:connect_remote(): Failed ~p:~p: ~p", + [IP, Port, Err]), + not_available + end + end. + +connect_and_retry_remote( IP, Port, CompSpec) -> + ?info("dlink_tls:connect_and_retry_remote(): ~p:~p", + [ IP, Port]), + + case connect_remote(IP, list_to_integer(Port), CompSpec) of + ok -> ok; + + Err -> %% Failed to connect. Sleep and try again + ?notice("dlink_tls:connect_and_retry_remote(~p:~p): Failed: ~p", + [IP, Port, Err]), + + ?notice("dlink_tls:connect_and_retry_remote(~p:~p): Will try again in ~p sec", + [IP, Port, ?DEFAULT_RECONNECT_INTERVAL]), + + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, IP, Port, CompSpec), + + not_available + end. + + +announce_local_service_(_CompSpec, [], _Service, _Availability) -> + ok; + +announce_local_service_(CompSpec, + [ConnPid | T], + Service, Availability) -> + + [ ok, JWT ] = authorize_rpc:sign_message( + CompSpec, availability_msg(Availability, [Service])), + Res = dlink_tls_conn:send( + ConnPid, + term_to_json( + {struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_SIGNATURE, JWT } + ]})), + + ?debug("dlink_tls:announce_local_service(~p: ~p) -> ~p Res: ~p", + [ Availability, Service, ConnPid, Res]), + + %% Move on to next connection. + announce_local_service_(CompSpec, + T, + Service, Availability). + +announce_local_service_(CompSpec, Service, Availability) -> + announce_local_service_(CompSpec, + get_connections(), + Service, Availability). + +%% We lost the socket connection. +%% Unregister all services that were routed to the remote end that just died. +handle_socket(FromPid, undefined, SetupPort, closed, Arg) -> + handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg); + +handle_socket(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> + ?info("dlink_tls:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + + NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), + + %% Get all service records associated with the given connection + LostSvcNameList = get_services_by_connection(FromPid), + + delete_connection(FromPid), + + %% Check if this was our last connection supporting each given service. + lists:map( + fun(SvcName) -> + case get_connections_by_service(SvcName) of + [] -> + service_discovery_rpc: + unregister_services(CompSpec, + [SvcName], + ?MODULE); + _ -> ok + end + end, LostSvcNameList), + + {ok, PersistentConnections } = rvi_common:get_module_config(data_link, + ?MODULE, + persistent_connections, + [], + CompSpec), + %% Check if this is a static node. If so, setup a timer for a reconnect + case lists:member(NetworkAddress, PersistentConnections) of + true -> + ?info("dlink_tls:closed(): Reconnect address: ~p", [ NetworkAddress ]), + ?info("dlink_tls:closed(): Reconnect interval: ~p", [ ?DEFAULT_RECONNECT_INTERVAL ]), + [ IP, Port] = string:tokens(NetworkAddress, ":"), + + setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, + IP, Port, CompSpec); + false -> ok + end, + ok; + +handle_socket(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> + ?info("dlink_tls:socket_error(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ok. + +handle_socket(FromPid, PeerIP, PeerPort, data, Payload, [CompSpec]) -> + + ?debug("dlink_tls:data(): Payload ~p", [Payload ]), + {ok, {struct, Elems}} = exo_json:decode_string(Payload), + + ?debug("dlink_tls:data(): Got ~p", [ Elems ]), + + case opt(?DLINK_ARG_CMD, Elems, undefined) of + ?DLINK_CMD_AUTHORIZE -> + [ TransactionID, + RemoteAddress, + RemotePort, + ProtoVersion, + CertificatesTmp, + Signature ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_ADDRESS, + ?DLINK_ARG_PORT, + ?DLINK_ARG_VERSION, + ?DLINK_ARG_CERTIFICATES, + ?DLINK_ARG_SIGNATURE], + Elems, undefined), + + + Certificates = + case CertificatesTmp of + { array, C} -> C; + undefined -> [] + end, + process_authorize(FromPid, PeerIP, PeerPort, + TransactionID, RemoteAddress, RemotePort, + ProtoVersion, Signature, Certificates, CompSpec); + + ?DLINK_CMD_SERVICE_ANNOUNCE -> + [ TransactionID, + ProtoVersion, + Signature ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_VERSION, + ?DLINK_ARG_SIGNATURE], + Elems, undefined), + + Conn = {PeerIP, PeerPort}, + case authorize_rpc:validate_message(CompSpec, Signature, Conn) of + [ok, Msg] -> + process_announce(Msg, FromPid, PeerIP, PeerPort, + TransactionID, ProtoVersion, CompSpec); + _ -> + ?debug("Couldn't validate availability msg from ~p", [Conn]) + end; + + ?DLINK_CMD_RECEIVE -> + [ _TransactionID, + ProtoMod, + Data ] = + opts([?DLINK_ARG_TRANSACTION_ID, + ?DLINK_ARG_MODULE, + ?DLINK_ARG_DATA], + Elems, undefined), + process_data(FromPid, PeerIP, PeerPort, + ProtoMod, Data, CompSpec); + + ?DLINK_CMD_PING -> + ?info("dlink_tls:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort ]), + ok; + + undefined -> + ?warning("dlink_tls:data() cmd undefined, ~p", [ Elems ]), + ok + end. + +%% JSON-RPC entry point +%% CAlled by local exo http server +handle_notification("service_available", Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + + gen_server:cast(?SERVER, { rvi, service_available, + [ SvcName, + DataLinkModule ]}), + + ok; +handle_notification("service_unavailable", Args) -> + {ok, SvcName} = rvi_common:get_json_element(["service"], Args), + {ok, DataLinkModule} = rvi_common:get_json_element(["data_link_module"], Args), + + gen_server:cast(?SERVER, { rvi, service_unavailable, + [ SvcName, + DataLinkModule ]}), + + ok; + +handle_notification(Other, _Args) -> + ?info("dlink_tls:handle_notification(~p): unknown", [ Other ]), + ok. + +handle_rpc("setup_data_link", Args) -> + { ok, Service } = rvi_common:get_json_element(["service"], Args), + + { ok, Opts } = rvi_common:get_json_element(["opts"], Args), + + [ Res, Timeout ] = gen_server:call(?SERVER, { rvi, setup_data_link, + [ Service, Opts ] }), + + {ok, [ {status, rvi_common:json_rpc_status(Res)} , { timeout, Timeout }]}; + +handle_rpc("disconnect_data_link", Args) -> + { ok, NetworkAddress} = rvi_common:get_json_element(["network_address"], Args), + [Res] = gen_server:call(?SERVER, { rvi, disconnect_data_link, [NetworkAddress]}), + {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; + +handle_rpc("send_data", Args) -> + { ok, ProtoMod } = rvi_common:get_json_element(["proto_mod"], Args), + { ok, Service } = rvi_common:get_json_element(["service"], Args), + { ok, Data } = rvi_common:get_json_element(["data"], Args), + { ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args), + [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}), + {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; + +handle_rpc("connections", []) -> + Res = gen_server:call(?SERVER, connections), + {ok, [ {status, ok} | {connections, {array, Res}} ]}; + +handle_rpc(Other, _Args) -> + ?info("dlink_tls:handle_rpc(~p): unknown", [ Other ]), + { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }. + + +handle_cast( {rvi, service_available, [SvcName, local]}, St) -> + ?debug("dlink_tls:service_available(): ~p (local)", [ SvcName ]), + announce_local_service_(St#st.cs, SvcName, available), + {noreply, St}; + + +handle_cast( {rvi, service_available, [SvcName, Mod]}, St) -> + ?debug("dlink_tls:service_available(): ~p (~p) ignored", [ SvcName, Mod ]), + %% We don't care about remote services available through + %% other data link modules + {noreply, St}; + + +handle_cast( {rvi, service_unavailable, [SvcName, local]}, St) -> + announce_local_service_(St#st.cs, SvcName, unavailable), + {noreply, St}; + +handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) -> + %% We don't care about remote services available through + %% other data link modules + {noreply, St}; + + +handle_cast(Other, St) -> + ?warning("dlink_tls:handle_cast(~p): unknown", [ Other ]), + {noreply, St}. + + +handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> + %% Do we already have a connection that support service? + ?info("dlink_tls: setup_data_link (~p, ~p)~n", [Service, Opts]), + case get_connections_by_service(Service) of + [] -> %% Nop[e + case proplists:get_value(target, Opts, undefined) of + undefined -> + ?info("dlink_tls:setup_data_link(~p) Failed: no target given in options.", + [Service]), + { reply, [ok, -1 ], St }; + + Addr -> + [ Address, Port] = string:tokens(Addr, ":"), + + case connect_remote(Address, list_to_integer(Port), St#st.cs) of + ok -> + { reply, [ok, 2000], St }; %% 2 second timeout + + already_connected -> %% We are already connected + { reply, [already_connected, -1], St }; + + Err -> + { reply, [Err, 0], St } + end + end; + + _ -> %% Yes - We do have a connection that knows of service + { reply, [already_connected, -1], St } + end; + + +handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> + [ Address, Port] = string:tokens(NetworkAddress, ":"), + Res = dlink_tls_conn:terminate_connection(Address,Port), + { reply, [ Res ], St }; + + +handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) -> + + %% Resolve connection pid from service + case get_connections_by_service(Service) of + [] -> + { reply, [ no_route ], St}; + + %% FIXME: What to do if we have multiple connections to the same service? + [ConnPid | _T] -> + Res = dlink_tls_conn:send(ConnPid, + term_to_json( + { struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, + { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, + { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } + ]})), + + { reply, [ Res ], St} + end; + + + + +handle_call({setup_initial_ping, Address, Port, Pid}, _From, St) -> + %% Create a timer to handle periodic pings. + {ok, ServerOpts } = rvi_common:get_module_config(data_link, + ?MODULE, + ?SERVER_OPTS, [], + St#st.cs), + Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), + + ?info("dlink_tls:setup_ping(): ~p:~p will be pinged every ~p msec", + [ Address, Port, Timeout] ), + + erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }), + + {reply, ok, St}; + +handle_call(Other, _From, St) -> + ?warning("dlink_tls:handle_rpc(~p): unknown", [ Other ]), + { reply, { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ]}, St}. + + + +%% Ping time +handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> + + %% Check that connection is up + case dlink_tls_conn:is_connection_up(Pid) of + true -> + ?info("dlink_tls:ping(): Pinging: ~p:~p", [Address, Port]), + dlink_tls_conn:send(Pid, term_to_json({ struct, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]})), + erlang:send_after(Timeout, self(), + { rvi_ping, Pid, Address, Port, Timeout }); + + false -> + ok + end, + {noreply, St}; + +%% Setup static nodes +handle_info({ rvi_setup_persistent_connection, IP, Port, CompSpec }, St) -> + ?info("rvi_setup_persistent_connection, ~p, ~p~n", [IP, Port]), + connect_and_retry_remote(IP, Port, CompSpec), + { noreply, St }; + + +handle_info(Info, St) -> + ?notice("dlink_tls(): Unkown message: ~p", [ Info]), + {noreply, St}. + +terminate(_Reason, _St) -> + ok. +code_change(_OldVsn, St, _Extra) -> + {ok, St}. + +setup_reconnect_timer(MSec, IP, Port, CompSpec) -> + erlang:send_after(MSec, ?MODULE, + { rvi_setup_persistent_connection, + IP, Port, CompSpec }), + ok. + +get_services_by_connection(ConnPid) -> + case ets:lookup(?CONNECTION_TABLE, ConnPid) of + [ #connection_entry { services = SvcNames } ] -> + SvcNames; + [] -> [] + end. + + +get_connections_by_service(Service) -> + case ets:lookup(?SERVICE_TABLE, Service) of + [ #service_entry { connections = Connections } ] -> + Connections; + [] -> [] + end. + + +add_services(SvcNameList, ConnPid) -> + %% Create or replace existing connection table entry + %% with the sum of new and old services. + ets:insert(?CONNECTION_TABLE, + #connection_entry { + connection = ConnPid, + services = SvcNameList ++ get_services_by_connection(ConnPid) + }), + + %% Add the connection to the service entry for each servic. + [ ets:insert(?SERVICE_TABLE, + #service_entry { + service = SvcName, + connections = [ConnPid | get_connections_by_service(SvcName)] + }) || SvcName <- SvcNameList ], + ok. + + +delete_services(ConnPid, SvcNameList) -> + ets:insert(?CONNECTION_TABLE, + #connection_entry { + connection = ConnPid, + services = get_services_by_connection(ConnPid) -- SvcNameList + }), + + %% Loop through all services and update the conn table + %% Update them with a new version where ConnPid has been removed + [ ets:insert(?SERVICE_TABLE, + #service_entry { + service = SvcName, + connections = get_connections_by_service(SvcName) -- [ConnPid] + }) || SvcName <- SvcNameList ], + ok. + +availability_msg(Availability, Services) -> + {struct, [{ ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, {array, Services} }]}. + +status_string(available ) -> ?DLINK_ARG_AVAILABLE; +status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. + +process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, + RemotePort, ProtoVersion, Signature, Certificates, CompSpec) -> + ?info("dlink_tls:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), + ?info("dlink_tls:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), + ?info("dlink_tls:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), + ?debug("dlink_tls:authorize(): TransactionID: ~p", [ TransactionID ]), + ?debug("dlink_tls:authorize(): Certificates: ~p", [ Certificates ]), + ?debug("dlink_tls:authorize(): Signature: ~p", [ Signature ]), + + { _NRemoteAddress, _NRemotePort} = Conn = + case { RemoteAddress, RemotePort } of + { "0.0.0.0", 0 } -> + + ?info("dlink_tls:authorize(): Remote is behind firewall. Will use ~p:~p", + [ PeerIP, PeerPort]), + { PeerIP, PeerPort }; + _ -> { RemoteAddress, RemotePort} + end, + + case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of + true -> + connection_authorized(FromPid, Conn, CompSpec); + false -> + %% close connection (how?) + false + end. + +send_authorize(Pid, CompSpec) -> + {LocalIP, LocalPort} = rvi_common:node_address_tuple(), + dlink_tls_conn:send(Pid, + term_to_json( + {struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, + { ?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION }, + { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, + { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). + +connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> + %% If FromPid (the genserver managing the socket) is not yet registered + %% with the connection manager, this is an incoming connection + %% from the client. We should respond with our own authorize followed by + %% a service announce + case dlink_tls_connmgr:find_connection_by_pid(FromPid) of + not_found -> + ?info("dlink_tls:authorize(): New connection!"), + dlink_tls_connmgr:add_connection(RemoteIP, RemotePort, FromPid), + ?debug("dlink_tls:authorize(): Sending authorize."), + _Res = send_authorize(FromPid, CompSpec), + ?debug("dlink_tls:upgrade connection", []), + UgRes = dlink_tls_conn:upgrade(FromPid, server), + ?debug("upgrade result: ~p", [UgRes]), + ok; + _ -> + UgRes = dlink_tls_conn:upgrade(FromPid, client), + ?debug("upgrade result: ~p", [UgRes]), + ok + end, + + %% Send our own servide announcement to the remote server + %% that just authorized to us. + [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), + + [ ok, FilteredServices ] = authorize_rpc:filter_by_service( + CompSpec, LocalServices, Conn), + + %% Send an authorize back to the remote node + ?info("dlink_tls:authorize(): Announcing local services: ~p to remote ~p:~p", + [FilteredServices, RemoteIP, RemotePort]), + + [ ok, JWT ] = authorize_rpc:sign_message( + CompSpec, availability_msg(available, FilteredServices)), + dlink_tls_conn:send(FromPid, + term_to_json( + {struct, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_SIGNATURE, JWT } ]})), + + %% Setup ping interval + gen_server:call(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), + ok. + +process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> + ?debug("dlink_tls:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]), + ?debug("dlink_tls:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), + Proto = list_to_existing_atom(ProtocolMod), + Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, + base64:decode_to_string(Data)). + +process_announce({struct, Elems}, FromPid, IP, Port, TID, _Vsn, CompSpec) -> + [ Avail, + {array, Svcs} ] = + opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined), + ?debug("dlink_tls:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), + ?debug("dlink_tls:service_announce(~p): TransactionID: ~p", [Avail,TID]), + ?debug("dlink_tls:service_announce(~p): Services: ~p", [Avail,Svcs]), + case Avail of + ?DLINK_ARG_AVAILABLE -> + add_services(Svcs, FromPid), + service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE); + ?DLINK_ARG_UNAVAILABLE -> + delete_services(FromPid, Svcs), + service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) + end, + ok. + +delete_connection(Conn) -> + %% Create or replace existing connection table entry + %% with the sum of new and old services. + SvcNameList = get_services_by_connection(Conn), + + %% Replace each existing connection entry that has + %% SvcName with a new one where the SvcName is removed. + lists:map(fun(SvcName) -> + Existing = get_connections_by_service(SvcName), + ets:insert(?SERVICE_TABLE, # + service_entry { + service = SvcName, + connections = Existing -- [ Conn ] + }) + end, SvcNameList), + + %% Delete the connection + ets:delete(?CONNECTION_TABLE, Conn), + ok. + + + +get_connections('$end_of_table', Acc) -> + Acc; + +get_connections(Key, Acc) -> + get_connections(ets:next(?CONNECTION_TABLE, Key), [ Key | Acc ]). + + +get_connections() -> + get_connections(ets:first(?CONNECTION_TABLE), []). + + +get_authorize_jwt(CompSpec) -> + case authorize_rpc:get_authorize_jwt(CompSpec) of + [ok, JWT] -> + JWT; + [not_found] -> + ?error("No authorize JWT~n", []), + error(cannot_authorize) + end. + +get_certificates(CompSpec) -> + case authorize_rpc:get_certificates(CompSpec) of + [ok, Certs] -> + Certs; + [not_found] -> + ?error("No certificate found~n", []), + error(no_certificate_found) + end. + +validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> + case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of + [ok] -> + true; + [not_found] -> + false + end. + +term_to_json(Term) -> + binary_to_list(iolist_to_binary(exo_json:encode(Term))). + +opt(K, L, Def) -> + case lists:keyfind(K, 1, L) of + {_, V} -> V; + false -> Def + end. + +opts(Keys, Elems, Def) -> + [ opt(K, Elems, Def) || K <- Keys]. diff --git a/components/dlink_tls/src/dlink_tls_sup.erl b/components/dlink_tls/src/dlink_tls_sup.erl new file mode 100644 index 0000000..cd59434 --- /dev/null +++ b/components/dlink_tls/src/dlink_tls_sup.erl @@ -0,0 +1,39 @@ +%% -*- erlang-indent-level: 4; indent-tabs-mode: nil -*- +%% +%% Copyright (C) 2014, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% + + +-module(dlink_tls_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + {ok, { {one_for_one, 5, 10}, + [ + ?CHILD(dlink_tls_rpc, worker) + ]} }. diff --git a/deps/setup/Makefile b/deps/setup/Makefile index e4b0fc1..a62cec5 100644 --- a/deps/setup/Makefile +++ b/deps/setup/Makefile @@ -33,7 +33,7 @@ clean_test: eunit: compile ${REBAR} eunit -test: compile compile_test +test: eunit compile_test ./setup_gen test xtest/test.conf xtest/releases/1 -pa ${PWD}/ebin run_test: diff --git a/deps/setup/src/setup.erl b/deps/setup/src/setup.erl index 54d1ce1..881e5a7 100644 --- a/deps/setup/src/setup.erl +++ b/deps/setup/src/setup.erl @@ -64,6 +64,46 @@ %% * ``'{'$string', Var}''' - Use the string representation of the value %% * ``'{'$binary', Var}''' - Use the binary representation of the value. %% +%% Example: +%% <pre lang="erlang"> +%% 2> application:set_env(setup, vars, [{"PLUS", {apply,erlang,'+',[1,2]}}, +%% 2> {"FOO", {value, {foo,1}}}]). +%% ok +%% 3> application:set_env(stdlib, '$setup_vars', +%% 3> [{"MINUS", {apply,erlang,'-',[4,3]}}, +%% 3> {"BAR", {value, "bar"}}]). +%% ok +%% 4> application:set_env(setup, v1, "/$BAR/$PLUS/$MINUS/$FOO"). +%% ok +%% 5> setup:get_env(setup,v1). +%% {ok,"/$BAR/3/$MINUS/{foo,1}"} +%% 6> application:set_env(stdlib, v1, "/$BAR/$PLUS/$MINUS/$FOO"). +%% ok +%% 7> setup:get_env(stdlib,v1). +%% {ok,"/bar/3/1/{foo,1}"} +%% </pre> +%% +%% In the above example, the first expansion (command no. 5), leaves `$BAR' +%% and `$MINUS' unexpanded, since they are defined in the `stdlib' application, +%% and thus not known to `setup'. In command no. 6, however, they <em>are</em> +%% in context, and are expanded. The variables `$PLUS' and `$FOO' have global +%% context and are expanded in both cases. +%% +%% It is also possible to refer to environment variables in the same +%% application. These are referenced as `"$env(VarName)"'. The corresponding +%% values are expanded in turn - take care not to create expansion loops! +%% The same rules for expansion as above apply. +%% +%% Example: +%% <pre lang="erlang"> +%% 2> application:set_env(setup,foo,"foo"). +%% ok +%% 3> application:set_env(setup,foo_dir,"$HOME/$env(foo)"). +%% ok +%% 4> setup:get_env(setup,foo_dir). +%% {ok,"/Users/uwiger/git/setup/foo"} +%% </pre> +%% %% == Customizing setup == %% The following environment variables can be used to customize `setup': %% * `{home, Dir}' - The topmost directory of the running system. This should @@ -217,7 +257,7 @@ find_env_vars(Env) -> case app_get_env(A, Env) of {ok, Val} when Val =/= undefined -> NewEnv = private_env(A, GEnv), - [{A, expand_env(NewEnv, Val)}]; + [{A, expand_env(NewEnv, Val, A)}]; _ -> [] end @@ -247,7 +287,7 @@ get_env(A, Key, Default) -> %% @end get_all_env(A) -> Vars = private_env(A), - [{K, expand_env(Vars, V)} || + [{K, expand_env(Vars, V, A)} || {K, V} <- application:get_all_env(A)]. -spec expand_value(atom(), any()) -> any(). @@ -257,7 +297,7 @@ get_all_env(A) -> %% {@section Variable expansion}. %% @end expand_value(App, Value) -> - expand_env(private_env(App), Value). + expand_env(private_env(App), Value, App). global_env() -> Acc = [{K, fun() -> env_value(K) end} || @@ -265,7 +305,9 @@ global_env() -> custom_global_env(Acc). custom_global_env(Acc) -> - lists:foldl(fun custom_env1/2, Acc, + lists:foldl(fun(E, Acc1) -> + custom_env1(E, Acc1, setup) + end, Acc, [{K,V} || {K,V} <- app_get_env(setup, vars, []), is_list(K)]). @@ -278,7 +320,9 @@ private_env(A, GEnv) -> custom_private_env(A, Acc ++ GEnv). custom_private_env(A, Acc) -> - lists:foldl(fun custom_env1/2, Acc, + lists:foldl(fun(E, Acc1) -> + custom_env1(E, Acc1, A) + end, Acc, [{K, V} || {K,V} <- app_get_env(A, '$setup_vars', []), is_list(K)]). @@ -300,37 +344,121 @@ app_get_env(A, K, Default) -> app_get_key(A, K) -> application:get_key(A, K). -custom_env1({K, V}, Acc) -> - [{K, fun() -> custom_env_value(K, V, Acc) end} | Acc]. - -expand_env(Vs, {T,"$" ++ S}) when T=='$value'; T=='$string'; T=='$binary' -> +custom_env1({K, V}, Acc, A) -> + [{K, fun() -> custom_env_value(K, V, Acc, A) end} | Acc]. + +expand_env(_, {T,"$env(" ++ S} = X, A) + when T=='$value'; T=='$string'; T=='$binary' -> + try Res = case get_env_name_l(S) of + false -> undefined; + {Name,[]} -> app_get_env(A, Name) + end, + case {Res, T} of + {undefined, '$value'} -> undefined; + {undefined, '$string'} -> ""; + {undefined, '$binary'} -> <<>>; + {{ok,V} , '$value'} -> V; + {{ok,V} , '$string'} -> binary_to_list(stringify(V)); + {{ok,V} , '$binary'} -> stringify(V) + end + catch + error:_ -> X + end; +expand_env(Vs, {T,"$" ++ S}, _) when T=='$value'; T=='$string'; T=='$binary' -> case {lists:keyfind(S, 1, Vs), T} of {false, '$value'} -> undefined; {false, '$string'} -> ""; - {false, '$binary'} -> <<"">>; + {false, '$binary'} -> <<>>; {{_,V}, '$value'} -> V(); {{_,V}, '$string'} -> binary_to_list(stringify(V())); {{_,V}, '$binary'} -> stringify(V()) end; -expand_env(Vs, T) when is_tuple(T) -> - list_to_tuple([expand_env(Vs, X) || X <- tuple_to_list(T)]); -expand_env(Vs, L) when is_list(L) -> +expand_env(Vs, T, A) when is_tuple(T) -> + list_to_tuple([expand_env(Vs, X, A) || X <- tuple_to_list(T)]); +expand_env(Vs, L, A) when is_list(L) -> case setup_lib:is_string(L) of true -> - do_expand_env(L, Vs, list); + do_expand_env(L, Vs, A, list); false -> - [expand_env(Vs, X) || X <- L] + [expand_env(Vs, X, A) || X <- L] end; -expand_env(Vs, B) when is_binary(B) -> - do_expand_env(B, Vs, binary); -expand_env(_, X) -> +expand_env(Vs, B, A) when is_binary(B) -> + do_expand_env(B, Vs, A, binary); +expand_env(_, X, _) -> X. -do_expand_env(X, Vs, Type) -> - lists:foldl(fun({K, Val}, Xx) -> - re:replace(Xx, [$\\, $$ | K], - stringify(Val()), [{return,Type}]) - end, X, Vs). +%% do_expand_env(X, Vs, Type) -> +%% lists:foldl(fun({K, Val}, Xx) -> +%% re:replace(Xx, [$\\, $$ | K], +%% stringify(Val()), [{return,Type}]) +%% end, X, Vs). + +do_expand_env(X, Vs, A, binary) -> + do_expand_env_b(iolist_to_binary(X), Vs, A); +do_expand_env(X, Vs, A, list) -> + binary_to_list(do_expand_env_b(iolist_to_binary(X), Vs, A)). + +do_expand_env_b(<<"$env(", T/binary>>, Vs, A) -> + case get_env_name_b(T) of + {K, T1} -> + case app_get_env(A, K) of + {ok, V} -> + Res = expand_env(Vs, V, A), + <<(stringify(Res))/binary, + (do_expand_env_b(T1, Vs, A))/binary>>; + undefined -> + <<"$env(", (do_expand_env_b(T, Vs, A))/binary>> + end; + false -> + do_expand_env_b(T, Vs, A) + end; +do_expand_env_b(<<"$", T/binary>>, Vs, A) -> + case match_var_b(Vs, T) of + {Res, T1} -> + <<Res/binary, (do_expand_env_b(T1, Vs, A))/binary>>; + false -> + <<"$", (do_expand_env_b(T, Vs, A))/binary>> + end; +do_expand_env_b(<<H, T/binary>>, Vs, A) -> + <<H, (do_expand_env_b(T, Vs, A))/binary>>; +do_expand_env_b(<<>>, _, _) -> + <<>>. + +get_env_name_b(B) -> + get_env_name_b(B, <<>>). + +get_env_name_b(<<")", T/binary>>, Acc) -> + try {binary_to_existing_atom(Acc, latin1), T} + catch + error:_ -> false + end; +get_env_name_b(<<H, T/binary>>, Acc) -> + get_env_name_b(T, <<Acc/binary, H>>); +get_env_name_b(<<>>, _) -> + false. + +get_env_name_l(L) -> + get_env_name_l(L, []). + +get_env_name_l(")" ++ T, Acc) -> + try {list_to_existing_atom(lists:reverse(Acc)), T} + catch + error:_ -> false + end; +get_env_name_l([H|T], Acc) -> + get_env_name_l(T, [H|Acc]); +get_env_name_l([], _) -> + false. + +match_var_b([{K,V}|T], B) -> + case re:split(B, "^" ++ K, [{return, binary}]) of + [_] -> + match_var_b(T, B); + [<<>>, Rest] -> + {stringify(V()), Rest} + end; +match_var_b([], _) -> + false. env_value("LOG_DIR") -> log_dir(); env_value("DATA_DIR") -> data_dir(); @@ -340,13 +468,13 @@ env_value("APP", A) -> A; env_value("PRIV_DIR", A) -> priv_dir(A); env_value("LIB_DIR" , A) -> lib_dir(A). -custom_env_value(_K, {value, V}, _Vs) -> +custom_env_value(_K, {value, V}, _Vs, _A) -> V; -custom_env_value(_K, {expand, V}, Vs) -> - expand_env(Vs, V); -custom_env_value(K, {apply, M, F, A}, _Vs) -> +custom_env_value(_K, {expand, V}, Vs, A) -> + expand_env(Vs, V, A); +custom_env_value(K, {apply, M, F, As}, _Vs, _A) -> %% Not ideal, but don't want to introduce exceptions in get_env() - try apply(M, F, A) + try apply(M, F, As) catch error:_ -> {error, {custom_setup_env, K}} @@ -1263,15 +1391,15 @@ read_config_script(F, Name, Opts) -> {'CWD', filename:absname(Dir)}, {'OPTIONS', Opts}])) of {ok, Conf} when is_list(Conf) -> - expand_config_script(Conf, Name, lists:reverse(Opts)); + expand_config_script(Conf, Name, [], Opts); Error -> setup_lib:abort("Error reading conf (~s): ~p~n", [F, Error]) end. -expand_config_script([{include, F}|Opts], Name, Acc) -> - Acc1 = read_config_script(F, Name, lists:reverse(Acc)), - expand_config_script(Opts, Name, Acc1); -expand_config_script([{include_lib, LibF}|Opts], Name, Acc) -> +expand_config_script([{include, F}|T], Name, Acc, Opts) -> + Incl = read_config_script(F, Name, Opts), + expand_config_script(T, Name, [Incl|Acc], Opts); +expand_config_script([{include_lib, LibF}|T], Name, Acc, Opts) -> case filename:split(LibF) of [App|Tail] -> try code:lib_dir(to_atom(App)) of @@ -1281,9 +1409,9 @@ expand_config_script([{include_lib, LibF}|Opts], Name, Acc) -> [LibF, App]); LibDir when is_list(LibDir) -> FullName = filename:join([LibDir | Tail]), - Acc1 = read_config_script( - FullName, Name, lists:reverse(Acc)), - expand_config_script(Opts, Name, Acc1) + Incl = read_config_script( + FullName, Name, Opts), + expand_config_script(T, Name, [Incl|Acc], Opts) catch error:_ -> setup_lib:abort( @@ -1293,10 +1421,10 @@ expand_config_script([{include_lib, LibF}|Opts], Name, Acc) -> [] -> setup_lib:abort("Invalid include conf: no file specified~n", []) end; -expand_config_script([H|T], Name, Acc) -> - expand_config_script(T, Name, [H|Acc]); -expand_config_script([], _, Acc) -> - lists:reverse(Acc). +expand_config_script([H|T], Name, Acc, Opts) -> + expand_config_script(T, Name, [H|Acc], Opts); +expand_config_script([], _, Acc, _) -> + lists:flatten(lists:reverse(Acc)). to_atom(B) when is_binary(B) -> binary_to_existing_atom(B, latin1); @@ -1325,7 +1453,8 @@ setup_test_() -> end, [ ?_test(t_find_hooks()), - ?_test(t_expand_vars()) + ?_test(t_expand_vars()), + ?_test(t_nested_includes()) ]}. t_find_hooks() -> @@ -1356,17 +1485,40 @@ t_expand_vars() -> application:set_env(stdlib, '$setup_vars', [{"MINUS", {apply,erlang,'-',[4,3]}}, {"BAR", {value, "bar"}}]), - application:set_env(setup, v1, "/$BAR/$PLUS/$MINUS/$FOO"), + application:set_env(setup, envy, 17), + application:set_env(setup, v1, "/$BAR/$PLUS/$MINUS/$FOO/$env(envy)"), application:set_env(setup, v2, {'$value', "$FOO"}), + application:set_env(setup, v3, {'$string', "$env(envy)"}), application:set_env(stdlib, v1, {'$string', "$FOO"}), application:set_env(stdlib, v2, {'$binary', "$FOO"}), application:set_env(stdlib, v3, {"$PLUS", "$MINUS", "$BAR"}), %% $BAR and $MINUS are not in setup's context - {ok, "/$BAR/3/$MINUS/{foo,1}"} = setup:get_env(setup, v1), + {ok, "/$BAR/3/$MINUS/{foo,1}/17"} = setup:get_env(setup, v1), {ok, {foo,1}} = setup:get_env(setup, v2), + {ok, "17"} = setup:get_env(setup, v3), {ok, "{foo,1}"} = setup:get_env(stdlib, v1), {ok, <<"{foo,1}">>} = setup:get_env(stdlib,v2), {ok, {"3", "1", "bar"}} = setup:get_env(stdlib,v3), ok. +t_nested_includes() -> + to_file_("a.config", [{apps,[kernel,stdlib,setup]}, + {env,[{setup,[{a,1}]}]}]), + to_file_("b.config", [{include,"a.config"}, + {set_env, [{setup, [{a,2}]}]}]), + to_file_("c.config", [{include, "b.config"}, + {set_env, [{setup, [{a,3}]}]}]), + [{apps,[kernel,stdlib,setup]}, + {env, [{setup, [{a,1}]}]}, + {set_env, [{setup, [{a,2}]}]}, + {set_env, [{setup, [{a,3}]}]}] = + setup:read_config_script("c.config", nested, []). + +to_file_(F, Term) -> + {ok, Fd} = file:open(F, [write]), + try io:fwrite(Fd, "~p.~n", [Term]) + after + file:close(Fd) + end. + -endif. diff --git a/deps/setup/src/setup_gen.erl b/deps/setup/src/setup_gen.erl index e0137fe..7a74107 100644 --- a/deps/setup/src/setup_gen.erl +++ b/deps/setup/src/setup_gen.erl @@ -171,12 +171,13 @@ run(Options) -> ?if_verbose(io:fwrite("Options = ~p~n", [Options])), Config = read_config(Options), ?if_verbose(io:fwrite("Config = ~p~n", [Config])), - FullOpts = Options ++ Config, + FullOpts = insert_config(Config, Options), + ?if_verbose(io:fwrite("FullOpts = ~p~n", [FullOpts])), {Name, OutDir, RelDir, RelVsn, GenTarget} = name_and_target(FullOpts), ensure_dir(RelDir), Roots = roots(FullOpts), ?if_verbose(io:fwrite("Roots = ~p~n", [Roots])), - check_config(Config), + check_config(FullOpts), Env = env_vars(FullOpts), InstEnv = install_env(Env, FullOpts), add_paths(Roots, FullOpts), @@ -202,6 +203,14 @@ run(Options) -> setup_lib:write_eterm("setup_gen.eterm", FullOpts) end). +insert_config(Conf, Options) -> + lists:flatmap( + fun({conf, _} = C) -> + [C|Conf]; + (Other) -> + [Other] + end, Options). + name_and_target(FullOpts) -> Name = option(name, FullOpts), case proplists:get_value(target, FullOpts, false) of @@ -797,6 +806,7 @@ app_vsn(A, V) -> Path = code:get_path(), Found = [D || D <- Path, is_app(AppStr, D)], Sorted = setup_lib:sort_vsns(lists:usort(Found), AppStr), + ?if_verbose(io:fwrite("Sorted = ~p~n", [Sorted])), match_app_vsn(Sorted, V, AppStr). match_app_vsn(Vsns, latest, App) -> diff --git a/rebar.config b/rebar.config index 49a5149..6711ec8 100644 --- a/rebar.config +++ b/rebar.config @@ -11,6 +11,7 @@ "components/dlink_bt", "components/dlink_sms", "components/dlink_tcp", + "components/dlink_tls", "components/proto_bert", "components/proto_json", "components/schedule", diff --git a/test/config/tls_backend.config b/test/config/tls_backend.config new file mode 100644 index 0000000..16f892b --- /dev/null +++ b/test/config/tls_backend.config @@ -0,0 +1,14 @@ +%% -*- erlang -*- +[ + {include_lib, "rvi_core/test/config/backend.config"}, + {add_apps, [dlink_tls]}, + {set_env, + [ + {rvi_core, + [ + { [routing_rules, ""], {proto_json, dlink_tls_rpc} }, + { [components, data_link], [{dlink_tls_rpc, gen_server, + [{server_opts, [{port, 8007}]}]}]} + ]} + ]} +]. diff --git a/test/config/tls_sample.config b/test/config/tls_sample.config new file mode 100644 index 0000000..2ba5562 --- /dev/null +++ b/test/config/tls_sample.config @@ -0,0 +1,16 @@ +%% -*- erlang -*- +[ + {include_lib, "rvi_core/test/config/sample.config"}, + {add_apps, [dlink_tls]}, + {set_env, + [ + {rvi_core, + [ + { [routing_rules, ""], {proto_json, dlink_tls_rpc} }, + { [components, data_link], [{dlink_tls_rpc, gen_server, + [{server_opts, [{port, 9007}]}, + {persistent_connections, + ["localhost:8007"]}]}]} + ]} + ]} +]. diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl index 07870ae..1d4c67a 100644 --- a/test/rvi_core_SUITE.erl +++ b/test/rvi_core_SUITE.erl @@ -17,6 +17,8 @@ t_install_sample_node/1, t_install_sms_backend_node/1, t_install_sms_sample_node/1, + t_install_tls_backend_node/1, + t_install_tls_sample_node/1, t_start_basic_backend/1, t_start_basic_sample/1, t_register_lock_service/1, @@ -44,7 +46,9 @@ groups() -> t_install_backend_node, t_install_sample_node, t_install_sms_backend_node, - t_install_sms_sample_node + t_install_sms_sample_node, + t_install_tls_backend_node, + t_install_tls_sample_node ]}, {test_run, [], [ @@ -120,6 +124,14 @@ t_install_sms_sample_node(Config) -> install_rvi_node("sms_sample", env(), [root(), "/test/config/sms_sample.config"]). +t_install_tls_backend_node(Config) -> + install_rvi_node("tls_backend", env(), + [root(), "/test/config/tls_backend.config"]). + +t_install_tls_sample_node(Config) -> + install_rvi_node("tls_sample", env(), + [root(), "/test/config/tls_sample.config"]). + t_start_basic_backend(Config) -> cmd([scripts(), "/rvi_node.sh -d -n basic_backend"]), await_started("basic_backend"), |