summaryrefslogtreecommitdiff
path: root/components
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:17:30 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:22:36 -0800
commit37dd6cef3e8abdee0829aabf121a2ca5dd35b14e (patch)
treeb7174d0d66f2db6f9a0d724213ad19e427e22847 /components
parent7922125aba23033945e3b55a4bf78ef8e84521d0 (diff)
downloadrvi_core-37dd6cef3e8abdee0829aabf121a2ca5dd35b14e.tar.gz
fragmentation tests
Diffstat (limited to 'components')
-rw-r--r--components/authorize/src/authorize_keys.erl3
-rw-r--r--components/dlink/src/dlink_data.erl93
-rw-r--r--components/dlink/src/dlink_data_json.erl2
-rw-r--r--components/dlink/src/dlink_data_msgpack.erl7
-rw-r--r--components/dlink_bt/src/bt_connection.erl88
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl8
-rw-r--r--components/dlink_tcp/src/connection.erl99
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl61
-rw-r--r--components/dlink_tcp/src/gen_nb_server.erl11
-rw-r--r--components/dlink_tcp/src/listener.erl2
-rw-r--r--components/dlink_tls/src/dlink_tls_conn.erl217
-rw-r--r--components/dlink_tls/src/dlink_tls_listener.erl27
-rw-r--r--components/dlink_tls/src/dlink_tls_rpc.erl62
-rw-r--r--components/proto_json/src/proto_json_rpc.erl89
-rw-r--r--components/proto_msgpack/src/proto_msgpack_rpc.erl3
-rw-r--r--components/rvi_common/include/rvi_msgpack_rpc.hrl17
-rw-r--r--components/rvi_common/src/rvi_common.erl161
-rw-r--r--components/rvi_common/src/rvi_frag.erl353
-rw-r--r--components/rvi_common/src/rvi_log.erl50
-rw-r--r--components/rvi_common/src/rvi_msgpack_rpc.erl165
-rw-r--r--components/rvi_common/src/rvi_msgpack_rpc_server.erl185
21 files changed, 1367 insertions, 336 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index 593f69c..a7bf84a 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -360,6 +360,8 @@ match_svc_([H|T], [H|T1]) ->
match_svc_(T, T1);
match_svc_(["+"|T], [_|T1]) ->
match_svc_(T, T1);
+match_svc_([[]], _) ->
+ true;
match_svc_([], _) ->
true;
match_svc_(_, _) ->
@@ -681,7 +683,6 @@ abbrev_jwt({Hdr, Body} = X) ->
abbrev_jwt(X) ->
X.
-
abbrev_pl(#cred{} = Payload) ->
list_to_tuple(lists:map(fun(B) when is_binary(B) -> abbrev_bin(B);
([{_,_}|_]=L) -> abbrev_payload(L);
diff --git a/components/dlink/src/dlink_data.erl b/components/dlink/src/dlink_data.erl
new file mode 100644
index 0000000..d16d923
--- /dev/null
+++ b/components/dlink/src/dlink_data.erl
@@ -0,0 +1,93 @@
+-module(dlink_data).
+
+-export([decode/5,
+ encode/3]).
+
+-include_lib("lager/include/log.hrl").
+
+decode(Data, F, St, Mod, FragOpts) when is_function(F,1) ->
+ DecodeRes = case St of
+ <<>> when Data == <<>> ->
+ {ok, <<>>};
+ <<>> -> do_decode(Data);
+ Rest when is_binary(Rest) ->
+ do_decode(<<Rest/binary, Data/binary>>);
+ Cont when is_function(Cont, 1) ->
+ Cont(Data)
+ end,
+ case DecodeRes of
+ Cont1 when is_function(Cont1, 1) ->
+ {ok, Cont1};
+ {ok, Rest1} ->
+ {ok, Rest1};
+ {ok, Decoded, Rest1} ->
+ decoded(Decoded, Rest1, F, Mod, FragOpts);
+ {error, _} = Err ->
+ Err
+ end.
+
+encode(Msg, PMod, PSt) ->
+ PMod:encode(Msg, PSt).
+
+do_decode(Data) ->
+ case Data of
+ <<8:4,_:4,_/binary>> ->
+ %% msgpack map
+ ?debug("detected msgpack map", []),
+ msgpack_decode(Data);
+ <<H, _/binary>> when H==16#de; H==16#df ->
+ %% msgpack map 16 or map 32
+ ?debug("detected msgpack map 16 or map 32", []),
+ msgpack_decode(Data);
+ _ ->
+ ?debug("assuming json", []),
+ jsx_decode(Data)
+ end.
+
+decoded(Decoded, Rest, F, Mod, FragOpts) ->
+ case rvi_frag:maybe_fragment(Decoded, Mod, FragOpts) of
+ true ->
+ {ok, Rest};
+ {true, Msg} ->
+ case do_decode(Msg) of
+ {ok, DecMsg, <<>>} ->
+ F(DecMsg),
+ decode(Rest, F, <<>>, Mod, FragOpts);
+ {error, _} = Err1 ->
+ Err1
+ end;
+ false ->
+ F(Decoded),
+ decode(Rest, F, <<>>, Mod, FragOpts)
+ end.
+
+msgpack_decode(Data) ->
+ case msgpack:unpack_stream(Data, [jsx]) of
+ {error, incomplete} ->
+ fun(NewData) ->
+ msgpack_decode(
+ <<Data/binary, NewData/binary>>)
+ end;
+ {error, E} ->
+ {error, E};
+ {Decoded, Rest} when is_binary(Rest) ->
+ {ok, Decoded, Rest}
+ end.
+
+jsx_decode(Data) ->
+ try jsx_decode_res(jsx:decode(Data, [stream, return_tail]))
+ catch
+ error:E ->
+ ?error("jsx decode failed: ~p", [E]),
+ {error, E}
+ end.
+
+jsx_decode_res(Res) ->
+ case Res of
+ {incomplete, Cont} ->
+ fun(NewData) ->
+ jsx_decode_res(Cont(NewData))
+ end;
+ {with_tail, Decoded, Rest} ->
+ {ok, Decoded, Rest}
+ end.
diff --git a/components/dlink/src/dlink_data_json.erl b/components/dlink/src/dlink_data_json.erl
index 6a68e48..2442b1c 100644
--- a/components/dlink/src/dlink_data_json.erl
+++ b/components/dlink/src/dlink_data_json.erl
@@ -6,7 +6,7 @@
port_options/0]).
-init(_Opts) ->
+init(_) ->
[].
port_options() ->
diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl
index 253da55..14139bc 100644
--- a/components/dlink/src/dlink_data_msgpack.erl
+++ b/components/dlink/src/dlink_data_msgpack.erl
@@ -1,20 +1,21 @@
-module(dlink_data_msgpack).
--export([init/1,
+-export([init/0, init/1,
decode/3,
encode/2]).
-export([port_options/0]).
-record(st, {opts = [{allow_atom, pack},
- {enable_str, true},
jsx],
buf = <<>>}).
port_options() ->
[binary, {packet, 0}].
-init(_CS) ->
+init(_) -> init().
+
+init() ->
#st{}.
decode(Msg0, F, #st{buf = Prev, opts = Opts} = St) when is_function(F, 1) ->
diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl
index bcfa199..399e70d 100644
--- a/components/dlink_bt/src/bt_connection.erl
+++ b/components/dlink_bt/src/bt_connection.erl
@@ -29,6 +29,7 @@
-export([accept/6]).
-export([send/2]).
-export([send/3]).
+-export([send_data/2]).
-export([is_connection_up/1]).
-export([is_connection_up/2]).
-export([terminate_connection/1]).
@@ -47,6 +48,8 @@
mode = bt,
packet_mod = ?PACKET_MOD,
packet_st = [],
+ decode_st = <<>>,
+ frag_opts = [],
mod,
func,
args
@@ -74,6 +77,8 @@ accept(Channel, ListenRef, Mode, Mod, Fun, Arg) ->
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
+send(Pid, Data, Opts) when is_pid(Pid) ->
+ gen_server:cast(Pid, {send, Data, Opts});
send(Addr, Channel, Data) ->
case bt_connection_manager:find_connection_by_address(Addr, Channel) of
{ok, Pid} ->
@@ -86,6 +91,9 @@ send(Addr, Channel, Data) ->
end.
+send_data(Pid, Data) ->
+ gen_server:cast(Pid, {send_data, Data}).
+
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
@@ -137,6 +145,10 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) ->
gen_server:cast(self(), connect),
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS),
PktSt = PktMod:init(CS),
+ DefFragMod = dlink_data_msgpack,
+ DefFragSt = dlink_data_msgpack:init([]),
+ {ok, FragOpts} = get_module_config(
+ frag_opts, [{packet_mod, {DefFragMod, DefFragSt}}], CS),
{ok, #st{
remote_addr = bt_addr(Mode, BTAddr),
channel = Channel,
@@ -144,9 +156,10 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) ->
mode = Mode,
mod = Mod,
func = Fun,
- args = CS,
+ args = rvi_common:set_value(role, client, CS),
packet_mod = PktMod,
- packet_st = PktSt
+ packet_st = PktSt,
+ frag_opts = FragOpts
}};
@@ -182,7 +195,7 @@ init({accept, Channel, ListenRef, Mode, Mod, Fun, CS}) ->
mode = Mode,
mod = Mod,
func = Fun,
- args = CS,
+ args = rvi_common:set_value(role, server, CS),
packet_mod = PktMod,
packet_st = PktSt
}}.
@@ -271,13 +284,28 @@ handle_cast({send, Data}, #st{mode = Mode,
?debug("handle_cast(send): Sending: ~p", [Data]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
?debug("Encoded = ~p", [Encoded]),
- Res = case Mode of
- bt -> rfcomm:send(Sock, Encoded);
- tcp -> gen_tcp:send(Sock, Encoded)
- end,
+ Res = do_send(Mode, Sock, Encoded),
?debug("send Res = ~p", [Res]),
{noreply, St#st{packet_st = PSt1}};
+handle_cast({send, Data, Opts}, #st{mode = Mode, rfcomm_ref = Socket,
+ packet_mod = PMod,
+ packet_st = PSt,
+ frag_opts = FragOpts} = St) ->
+ ?debug("handle_cast({send, Data, ~p}, ...), FragOpts = ~p",
+ [Opts, FragOpts]),
+ {ok, Bin, PSt1} = PMod:encode(Data, PSt),
+ St1 = St#st{packet_st = PSt1},
+ rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE,
+ fun() ->
+ do_send(Mode, Socket, Bin)
+ end),
+ {noreply, St1};
+
+handle_cast({send_data, Data}, #st{mode = Mode, rfcomm_ref = Socket} = St) ->
+ do_send(Mode, Socket, Data),
+ {noreply, St};
+
handle_cast(_Msg, State) ->
?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]),
{noreply, State}.
@@ -315,25 +343,24 @@ handle_info({rfcomm, ARef, { accept, BTAddr, _ } },
handle_info({rfcomm, _ConnRef, {data, Data}},
#st { remote_addr = BTAddr,
channel = Channel,
- packet_mod = PMod,
- packet_st = PSt,
+ decode_st = DSt,
+ frag_opts = FragOpts,
mod = Mod,
func = Fun } = State) ->
?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]),
?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]),
?info("~p:handle_info(data): ~p:~p -> ~p:~p",
[ ?MODULE, BTAddr, Channel, Mod, Fun]),
- case PMod:decode(Data, fun(Elems) ->
- handle_elements(Elems, State)
- end, PSt) of
- {ok, PSt1} ->
- {noreply, State#st{packet_st = PSt1}};
+ case dlink_data:decode(Data, fun(Elems) ->
+ got_msg(Elems, State)
+ end, DSt, ?MODULE, FragOpts) of
+ {ok, DSt1} ->
+ {noreply, State#st{decode_st = DSt1}};
{error, Reason} ->
- ?error("decode failed: ~p", [Reason]),
+ ?error("decode failed: Reason = ~p", [Reason]),
{stop, Reason, State}
end;
-
handle_info({rfcomm, ConnRef, closed},
#st { remote_addr = BTAddr,
channel = Channel,
@@ -375,18 +402,21 @@ handle_info({tcp, Sock, Data}, #st{remote_addr = IP,
channel = Port,
rfcomm_ref = Sock,
packet_mod = PMod,
- packet_st = PSt} = St) ->
- ?debug("handle_info(data): From: ~p:~p", [IP, Port]),
- case PMod:decode(Data, fun(Elems) ->
- handle_elements(Elems, St)
- end, PSt) of
- {ok, PSt1} ->
+ frag_opts = FragOpts,
+ decode_st = DSt} = St) ->
+ ?debug("handle_info(Data = ~p): From: ~p:~p", [Data, IP, Port]),
+ ?debug("PMod = ~p; DSt = ~p", [PMod, DSt]),
+ case dlink_data:decode(Data, fun(Elems) ->
+ got_msg(Elems, St)
+ end, DSt, ?MODULE, FragOpts) of
+ {ok, DSt1} ->
inet:setopts(Sock, [{active, once}]),
- {noreply, St#st{packet_st = PSt1}};
+ {noreply, St#st{decode_st = DSt1}};
{error, Reason} ->
?error("decode failed, Reason = ~p", [Reason]),
{stop, Reason, St}
end;
+
handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod,
func = Fun,
args = Arg} = St) ->
@@ -432,14 +462,16 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+do_send(tcp, Sock, Data) ->
+ gen_tcp:send(Sock, Data);
+do_send(bt, Sock, Data) ->
+ rfcomm:send(Sock, Data).
+
get_module_config(Key, Default, CS) ->
rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
-handle_elements(Elements, #st{remote_addr = BTAddr,
- channel = Channel,
- mod = Mod,
- func = Fun,
- args = Arg}) ->
+got_msg(Elements, #st{remote_addr = BTAddr, channel = Channel,
+ mod = Mod, func = Fun, args = Arg}) ->
?debug("data complete; processed: ~p",
[authorize_keys:abbrev(Elements)]),
Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg).
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index 75e7926..2fc6587 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -45,7 +45,7 @@
-define(CONNECTION_TABLE, rvi_dlink_bt_connections).
-define(SERVICE_TABLE, rvi_dlink_bt_services).
--define(DLINK_BT_VER, "1.0").
+-define(DLINK_BT_VER, <<"1.0">>).
%% Multiple registrations of the same service, each with a different connection,
%% is possible.
@@ -579,7 +579,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
{ reply, [ Res ], St };
-handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) ->
+handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) ->
%% Resolve connection pid from service
case get_connections_by_service(Service) of
@@ -595,7 +595,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
{ ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
{ ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) },
{ ?DLINK_ARG_DATA, Data }
- ]),
+ ], DataLinkOpts),
{ reply, [ Res ], St}
end;
@@ -652,7 +652,7 @@ code_change(_OldVsn, St, _Extra) ->
send_authorize(Pid, SetupChannel, CompSpec) ->
{Address, Channel} =
- case Mode = get_mode(CompSpec) of
+ case get_mode(CompSpec) of
bt ->
{ok,[{address, Addr}]} = bt_drv:local_info([address]),
{bt_address_to_string(Addr), SetupChannel};
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 7229b3c..1d4753e 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -26,13 +26,15 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([setup/6]).
+-export([setup/7]).
-export([send/2]).
-export([send/3]).
+-export([send_data/2]).
-export([is_connection_up/1]).
-export([is_connection_up/2]).
-export([terminate_connection/1]).
-export([terminate_connection/2]).
+-export([get_source_address/1]).
-define(SERVER, ?MODULE).
@@ -47,6 +49,8 @@
args = undefined,
packet_mod = ?PACKET_MOD,
packet_st = [],
+ decode_st = <<>>,
+ frag_opts = [],
cs
}).
@@ -55,9 +59,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, CS) ->
+setup(Role, IP, Port, Sock, Mod, Fun, CS) when Role==client; Role==server ->
?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
+ case gen_server:start_link(connection, {Role, IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -70,6 +74,8 @@ setup(IP, Port, Sock, Mod, Fun, CS) ->
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
+send(Pid, Data, Opts) when is_pid(Pid) ->
+ gen_server:cast(Pid, {send, Data, Opts});
send(IP, Port, Data) ->
case connection_manager:find_connection_by_address(IP, Port) of
{ok, Pid} ->
@@ -82,6 +88,9 @@ send(IP, Port, Data) ->
end.
+send_data(Pid, Data) ->
+ gen_server:cast(Pid, {send_data, Data}).
+
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
@@ -106,6 +115,9 @@ is_connection_up(IP, Port) ->
false
end.
+get_source_address(Pid) ->
+ gen_server:call(Pid, get_source_address).
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -124,11 +136,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
- case IP of
- undefined -> ok;
- _ -> connection_manager:add_connection(IP, Port, self())
- end,
+init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): self(): ~p", [self()]),
?debug("connection:init(): IP: ~p", [IP]),
?debug("connection:init(): Port: ~p", [Port]),
@@ -137,6 +145,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
?debug("connection:init(): Function: ~p", [Fun]),
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
PktSt = PktMod:init(CompSpec),
+ {ok, FragOpts} = get_module_config(
+ frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec),
{ok, #st{
ip = IP,
port = Port,
@@ -145,7 +155,9 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
func = Fun,
packet_mod = PktMod,
packet_st = PktSt,
- cs = CompSpec
+ frag_opts = FragOpts,
+ cs = rvi_common:save_source_address(
+ Role, Sock, rvi_common:set_value(role, Role, CompSpec))
}}.
get_module_config(Key, Default, CS) ->
@@ -166,8 +178,9 @@ get_module_config(Key, Default, CS) ->
%% @end
%%--------------------------------------------------------------------
-
-handle_call(terminate_connection, _From, St) ->
+handle_call(get_source_address, _, #st{cs = CS} = St) ->
+ {reply, rvi_common:get_source_address(CS), St};
+handle_call(terminate_connection, _From, St) ->
?debug("~p:handle_call(terminate_connection): Terminating: ~p",
[ ?MODULE, {St#st.ip, St#st.port}]),
@@ -193,10 +206,25 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
- ?debug("Encoded = ~p", [Encoded]),
gen_tcp:send(St#st.sock, Encoded),
{noreply, St#st{packet_st = PSt1}};
+handle_cast({send, Data, Opts}, #st{sock = Socket,
+ packet_mod = PMod,
+ packet_st = PSt,
+ frag_opts = FragOpts} = St) ->
+ ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p",
+ [Opts, FragOpts]),
+ {ok, Bin, PSt1} = PMod:encode(Data, PSt),
+ St1 = St#st{packet_st = PSt1},
+ rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE,
+ fun() ->
+ gen_tcp:send(Socket, Bin)
+ end),
+ {noreply, St1};
+handle_cast({send_data, Data}, #st{sock = Sock} = St) ->
+ gen_tcp:send(Sock, Data),
+ {noreply, St};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -230,18 +258,31 @@ handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
packet_mod = PMod,
- packet_st = PSt} = State) ->
- ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]),
- case PMod:decode(Data, fun(Elems) ->
- handle_elements(Elems, State)
- end, PSt) of
- {ok, PSt1} ->
+ packet_st = PSt,
+ decode_st = DSt,
+ frag_opts = FragOpts} = State) ->
+ ?debug("handle_info(~p, PMod=~p, St=~p): From: ~p:~p ",
+ [Data, PMod, PSt, IP, Port]),
+ case dlink_data:decode(Data, fun(Elems) ->
+ got_msg(Elems, State)
+ end, DSt, ?MODULE, FragOpts) of
+ {ok, DSt1} = Ok ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st{packet_st = PSt1}};
+ {noreply, State#st{decode_st = DSt1}};
{error, Reason} ->
?error("decode failed, Reason = ~p", [Reason]),
{stop, Reason, State}
end;
+ %% case PMod:decode(Data, fun(Elems) ->
+ %% handle_elements(Elems, State)
+ %% end, PSt) of
+ %% {ok, PSt1} ->
+ %% inet:setopts(Sock, [{active, once}]),
+ %% {noreply, State#st{packet_st = PSt1}};
+ %% {error, Reason} ->
+ %% ?error("decode failed, Reason = ~p", [Reason]),
+ %% {stop, Reason, State}
+ %% end;
handle_info({tcp_closed, Sock},
#st { ip = IP,
@@ -331,8 +372,24 @@ code_change(_OldVsn, State, _Extra) ->
%% {ok, PSt1};
%% { ->
-handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
- ip = IP, port = Port}) ->
+%% handle_elements(Elements, #st{frag_opts = FragOpts} = St) ->
+%% MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts),
+%% ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]),
+%% case MaybeF of
+%% true ->
+%% %% It was a fragment, but not a complete message yet
+%% St;
+%% {true, Msg} ->
+%% #st{packet_mod = PMod, packet_st = PSt} = St,
+%% PMod:decode(Msg, fun(Elems) ->
+%% got_msg(Elems, St)
+%% end, PSt);
+%% false ->
+%% got_msg(Elements, St)
+%% end.
+
+got_msg(Elements, #st{ip = IP, port = Port,
+ mod = Mod, func = Fun, cs = CS}) ->
?debug("data complete: Processed: ~p",
[authorize_keys:abbrev(Elements)]),
Mod:Fun(self(), IP, Port, data, Elements, CS).
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index 330985a..78de04a 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.1").
+-define(DLINK_TCP_VERSION, <<"1.1">>).
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -208,11 +208,12 @@ connect_remote(IP, Port, CompSpec) ->
[IP, Port]),
%% Setup a genserver around the new connection.
- {ok, Pid } = connection:setup(IP, Port, Sock,
+ {ok, Pid } = connection:setup(client, IP, Port, Sock,
?MODULE, handle_socket, CompSpec ),
%% Send authorize
send_authorize(Pid, CompSpec),
+ connection_manager:add_connection(IP, Port, Pid),
ok;
{error, Err } ->
@@ -283,6 +284,7 @@ handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
+ ?debug("CompSpec = ~p", [CompSpec]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -538,7 +540,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
{ reply, [ Res ], St };
-handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) ->
+handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) ->
%% Resolve connection pid from service
case get_connections_by_service(Service) of
@@ -552,7 +554,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
{ ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
{ ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) },
{ ?DLINK_ARG_DATA, Data }
- ]),
+ ], DataLinkOpts),
{ reply, [ Res ], St}
end;
@@ -569,7 +571,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
case connection:is_connection_up(Pid) of
true ->
?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]),
- connection:send(Pid, jsx:encode([{?DLINK_ARG_CMD, ?DLINK_CMD_PING}])),
+ connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Port, Timeout });
@@ -659,22 +661,53 @@ availability_msg(Availability, Services, CompSpec) ->
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
+bin(S) ->
+ iolist_to_binary(S).
+
process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
- {NRemoteAddress, NRemotePort} = Conn =
- case { RemoteAddress, RemotePort } of
- { <<"0.0.0.0">>, 0 } ->
- ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
- [ PeerIP, PeerPort]),
- { PeerIP, PeerPort };
- _ -> { RemoteAddress, RemotePort}
- end,
+ F = fun() ->
+ process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec)
+ end,
+ case connection_manager:find_connection_by_address(PeerIP, PeerPort) of
+ not_found -> F();
+ BPid ->
+ deconflict_conns(FromPid, BPid, CompSpec, F)
+ end.
+deconflict_conns(APid, BPid, CsA, F) ->
+ {_, _} = ASrc = rvi_common:get_value(source_address, undefined, CsA),
+ case connection:get_source_address(BPid) of
+ undefined ->
+ ?debug("Deconflict - BSrc = undefined, kill BPid (~p)", [BPid]),
+ exit(BPid, deconflict),
+ F();
+ {_,_} = BSrc when BSrc > ASrc ->
+ ?debug("Deconflict - kill BPid (~p): ASrc = ~p, BSrc = ~p", [BPid, ASrc, BSrc]),
+ exit(BPid, deconflict),
+ F();
+ BSrc ->
+ ?debug("Deconflict - kill APid (~p - self): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]),
+ exit(deconflict)
+ end.
+
+
+process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort,
+ _ProtoVersion, Credentials, CompSpec) ->
+ {NRemoteAddress, NRemotePort} = Conn =
+ case { RemoteAddress, RemotePort } of
+ { "0.0.0.0", 0 } ->
+ ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p",
+ [ PeerIP, PeerPort]),
+ { PeerIP, PeerPort };
+ _ -> { RemoteAddress, RemotePort}
+ end,
log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
authorize_rpc:store_creds(CompSpec, Credentials, Conn),
connection_authorized(FromPid, Conn, CompSpec).
@@ -683,7 +716,7 @@ send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
[{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_ADDRESS, bin(LocalIP) },
{ ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
{ ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
{ ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl
index ae0a605..16693ae 100644
--- a/components/dlink_tcp/src/gen_nb_server.erl
+++ b/components/dlink_tcp/src/gen_nb_server.erl
@@ -90,7 +90,8 @@ store_cb_state(CBState, State) when is_record(State, state) ->
%% @doc Adds a new listener socket to be managed by gen_nb_server
%% NOTE: Should only be called by the submodule
-spec add_listen_socket({string(), integer()}, #state{}) -> {ok, #state{}} | {error, any()}.
-add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) ->
+add_listen_socket({IpAddr0, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) ->
+ IpAddr = str(IpAddr0),
Key = {IpAddr, Port},
case dict:find(Key, Socks) of
{ok, _} ->
@@ -108,7 +109,8 @@ add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=
%% @doc Removes a new listener socket to be managed by gen_nb_server
%% NOTE: Should only be called by the submodule
-spec remove_listen_socket({string(), integer()}, #state{}) -> {error, not_listening} | {ok, #state{}}.
-remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
+remove_listen_socket({IpAddr0, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
+ IpAddr = str(IpAddr0),
Key = {IpAddr, Port},
case dict:find(Key, Socks) of
error ->
@@ -119,6 +121,11 @@ remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) ->
addrs=dict:erase(Sock, Addrs)}}
end.
+str(Addr) when is_list(Addr) ->
+ Addr;
+str(Addr) when is_binary(Addr) ->
+ binary_to_list(Addr).
+
%% @doc Returns the callback module's state
-spec init([atom()|any()]) -> {ok, #state{}} | {error, bad_init_state} | {error, any()}.
init([CallbackModule, InitParams]) ->
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index 6def59a..88c3d23 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -101,7 +101,7 @@ new_connection(IP, Port, Sock, State) ->
%% IP and Port are garbage. We'll grab peername when we get our
%% first data.
%% Provide component spec as extra arg.
- {ok, _P} = connection:setup(undefined, 0, Sock,
+ {ok, _P} = connection:setup(server, undefined, 0, Sock,
dlink_tcp_rpc,
handle_socket, gen_nb_server:get_cb_state(State)),
{ok, State}.
diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl
index c051afc..447581d 100644
--- a/components/dlink_tls/src/dlink_tls_conn.erl
+++ b/components/dlink_tls/src/dlink_tls_conn.erl
@@ -27,11 +27,12 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([setup/6]).
--export([upgrade/3,
- async_upgrade/3]).
+-export([setup/7]).
+-export([upgrade/2,
+ async_upgrade/2]).
-export([send/2]).
-export([send/3]).
+-export([send_data/2]).
-export([is_connection_up/1]).
-export([is_connection_up/2]).
-export([terminate_connection/1]).
@@ -40,7 +41,6 @@
-define(SERVER, ?MODULE).
-define(PACKET_MOD, dlink_data_msgpack).
--define(MAX_MSG_SIZE, infinity).
-record(st, {
ip = {0,0,0,0},
@@ -49,11 +49,11 @@
mode = tcp :: tcp | tls,
packet_mod = ?PACKET_MOD,
packet_st = [],
+ frag_opts = [],
mod = undefined,
func = undefined,
cs,
- role = server :: client | server,
- msg_size = ?MAX_MSG_SIZE :: infinity | pos_integer()
+ role = server :: client | server
}).
%%%===================================================================
@@ -61,8 +61,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, CompSpec) ->
- Params = {IP, Port, Sock, Mod, Fun, CompSpec},
+setup(Role, IP, Port, Sock, Mod, Fun, CompSpec) when Role==client;
+ Role==server ->
+ Params = {Role, IP, Port, Sock, Mod, Fun, CompSpec},
?debug("setup() IP = ~p; Port = ~p; Mod = ~p; Fun = ~p", [IP, Port, Mod, Fun]),
?debug("CompSpec = ~p", [CompSpec]),
case gen_server:start_link(?MODULE, Params ,[]) of
@@ -75,16 +76,18 @@ setup(IP, Port, Sock, Mod, Fun, CompSpec) ->
Err
end.
-upgrade(Pid, Role, CompSpec) when Role==client; Role==server ->
- gen_server:call(Pid, {upgrade, Role, CompSpec}).
+upgrade(Pid, Role) when Role==client; Role==server ->
+ gen_server:call(Pid, {upgrade, Role}).
-async_upgrade(Pid, Role, CompSpec) when Role==client;
- Role==server ->
- gen_server:cast(Pid, {upgrade, Role, CompSpec}).
+async_upgrade(Pid, Role) when Role==client;
+ Role==server ->
+ gen_server:cast(Pid, {upgrade, Role}).
send(Pid, Data) when is_pid(Pid) ->
gen_server:cast(Pid, {send, Data}).
+send(Pid, Data, Opts) when is_pid(Pid) ->
+ gen_server:cast(Pid, {send, Data, Opts});
send(IP, Port, Data) ->
case dlink_tls_connmgr:find_connection_by_address(IP, Port) of
{ok, Pid} ->
@@ -97,6 +100,9 @@ send(IP, Port, Data) ->
end.
+send_data(Pid, Data) ->
+ gen_server:cast(Pid, {send_data, Data}).
+
terminate_connection(Pid) when is_pid(Pid) ->
gen_server:call(Pid, terminate_connection).
@@ -139,7 +145,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
+init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> dlink_tls_connmgr:add_connection(IP, Port, self())
@@ -153,6 +159,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
{ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
?debug("packet_mod = ~p", [PktMod]),
PktSt = PktMod:init(CompSpec),
+ {ok, FragOpts} = get_module_config(
+ frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec),
{ok, #st{
ip = IP,
port = Port,
@@ -160,12 +168,27 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
mod = Mod,
packet_mod = PktMod,
packet_st = PktSt,
+ frag_opts = FragOpts,
func = Fun,
- cs = CompSpec
+ cs = rvi_common:set_value(role, Role, CompSpec)
}}.
get_module_config(Key, Default, CS) ->
- rvi_common:get_module_config(data_link, dlink_tls_rpc, Key, Default, CS).
+ ModConf = fun() ->
+ rvi_common:get_module_config(
+ data_link, dlink_tls_rpc, Key, Default, CS)
+ end,
+ case rvi_common:get_value(tls_opts, undefined, CS) of
+ undefined -> ModConf();
+ Opts ->
+ case lists:keyfind(Key, 1, Opts) of
+ false ->
+ ModConf();
+ {_, Val} ->
+ Val
+ end
+ end.
+
%%--------------------------------------------------------------------
%% @private
@@ -183,18 +206,18 @@ get_module_config(Key, Default, CS) ->
%%--------------------------------------------------------------------
-handle_call(terminate_connection, _From, St) ->
+handle_call(terminate_connection, _From, #st{} = St) ->
?debug("~p:handle_call(terminate_connection): Terminating: ~p",
[ ?MODULE, {St#st.ip, St#st.port}]),
{stop, Reason, NSt} = handle_info({tcp_closed, St#st.sock}, St),
{stop, Reason, ok, NSt};
-handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) ->
+handle_call({upgrade, Role} = Req, _From, #st{cs = CS} = St) ->
?debug("~p:handle_call(~p)~n", [?MODULE, Req]),
%% deliberately crash (for now) if upgrade fails.
- {Reply, St1} = handle_upgrade(Role, CompSpec, St),
+ {Reply, #st{} = St1} = handle_upgrade(Role, CS, St),
{reply, Reply, St1};
-handle_call(_Request, _From, State) ->
+handle_call(_Request, _From, #st{} = State) ->
?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]),
Reply = ok,
{reply, Reply, State}.
@@ -209,11 +232,11 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({upgrade, Role, CompSpec}, St) ->
- {_, St1} = handle_upgrade(Role, CompSpec, St),
+handle_cast({upgrade, Role}, #st{cs = CS} = St) ->
+ {_, #st{} = St1} = handle_upgrade(Role, CS, St),
{noreply, St1};
handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
- ?debug("~p:handle_call(send): Sending: ~p",
+ ?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, abbrev(Data)]),
{ok, Encoded, PSt1} = PMod:encode(Data, PSt),
?debug("Encoded~n~s", [Encoded]),
@@ -222,15 +245,29 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
tls -> ssl:send(St#st.sock, Encoded)
end,
{noreply, St#st{packet_st = PSt1}};
-
-handle_cast({activate_socket, Sock}, State) ->
+handle_cast({send, Data, Opts} = Req, #st{packet_mod = PMod,
+ packet_st = PSt,
+ frag_opts = FragOpts} = St) ->
+ ?debug("handle_cast(~p, ...), FragOpts = ~p", [Req, FragOpts]),
+ {ok, Bin, PSt1} = PMod:encode(Data, PSt),
+ St1 = St#st{packet_st = PSt1},
+ rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, fun() ->
+ do_send(Bin, St1)
+ end),
+ {noreply, St1};
+handle_cast({send_data, Data}, #st{} = St) ->
+ %% don't encode; just send
+ ?debug("send_data, ~w", [authorize_keys:abbrev_bin(Data)]),
+ do_send(Data, St),
+ {noreply, St};
+handle_cast({activate_socket, Sock}, #st{} = State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
{noreply, State};
-handle_cast(_Msg, State) ->
- ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]),
+handle_cast(_Msg, #st{} = State) ->
+ ?warning("~p:handle_cast(): Unknown cast: ~p~nSt=~p", [ ?MODULE, _Msg, State]),
{noreply, State}.
%%--------------------------------------------------------------------
@@ -255,6 +292,7 @@ handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port,
packet_mod = PMod, packet_st = PSt} = State) ->
?debug("handle_info(data): Data: ~p", [abbrev(Data)]),
?debug("handle_info(data): From: ~p:~p ", [ IP, Port]),
+ ?debug("handle_info(data): PMod: ~p", [PMod]),
case PMod:decode(Data, fun(Elems) ->
handle_elems(Elems, State)
end, PSt) of
@@ -283,15 +321,18 @@ handle_info({tcp, Sock, Data},
{stop, Reason, State}
end;
-handle_info({tcp_closed, Sock},
+handle_info({Evt, Sock},
#st { ip = IP,
port = Port,
mod = Mod,
func = Fun,
- cs = CS} = State) ->
- ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]),
+ cs = CS} = State) when Evt==tcp_closed; Evt==ssl_closed ->
+ ?debug("~p:handle_info(~w): Address: ~p:~p ", [ ?MODULE, Evt, IP, Port]),
Mod:Fun(self(), IP, Port,closed, CS),
- gen_tcp:close(Sock),
+ case Evt of
+ tcp_closed -> gen_tcp:close(Sock);
+ ssl_closed -> ssl:close(Sock)
+ end,
dlink_tls_connmgr:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -308,7 +349,7 @@ handle_info({tcp_error, _Sock},
{stop, normal, State};
-handle_info(_Info, State) ->
+handle_info(_Info, #st{} = State) ->
?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]),
{noreply, State}.
@@ -341,15 +382,20 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+
+do_send(Bin, #st{sock = Sock, mode = tcp}) ->
+ gen_tcp:send(Sock, Bin);
+do_send(Bin, #st{sock = Sock, mode = tls}) ->
+ ssl:send(Sock, Bin).
+
handle_upgrade(Role, CompSpec, #st{sock = S} = St) ->
- %% {ok, [{active, Last}]} = inet:getopts(S, [active]),
inet:setopts(S, [{active, false}]),
case do_upgrade(S, Role, CompSpec) of
- {ok, NewS} ->
+ {DoVerify, {ok, NewS}} ->
?debug("upgrade to TLS succcessful~n", []),
ssl:setopts(NewS, [{active, once}]),
{ok, {IP, Port}} = ssl:peername(NewS),
- {ok, PeerCert} = ssl:peercert(NewS),
+ PeerCert = get_peercert(DoVerify, NewS),
?debug("SSL PeerCert=~w", [abbrev(PeerCert)]),
NewCS = rvi_common:set_value(
dlink_tls_role, Role,
@@ -357,35 +403,73 @@ handle_upgrade(Role, CompSpec, #st{sock = S} = St) ->
{ok, St#st{sock = NewS, mode = tls, role = Role,
ip = inet_parse:ntoa(IP), port = Port,
cs = NewCS}};
- Error ->
+ {_, Error} ->
?error("Cannot upgrade to TLS: ~p~n", [Error]),
error({cannot_upgrade, Error})
end.
+get_peercert(DoVerify, S) ->
+ case ssl:peercert(S) of
+ {ok, PeerCert} ->
+ PeerCert;
+ {error, _} when DoVerify == false ->
+ undefined
+ end.
+
do_upgrade(Sock, client, CompSpec) ->
- Opts = tls_opts(client, CompSpec),
+ {DoVerify, Opts} = tls_opts(client, CompSpec),
?debug("TLS Opts = ~p", [Opts]),
- ssl:connect(Sock, Opts);
+ {DoVerify, ssl:connect(Sock, Opts)};
do_upgrade(Sock, server, CompSpec) ->
- Opts = tls_opts(client, CompSpec),
+ {DoVerify, Opts} = tls_opts(client, CompSpec),
?debug("TLS Opts = ~p", [Opts]),
- ssl:ssl_accept(Sock, Opts).
-
-%% FIXME: For now, use the example certs delivered with the OTP SSL appl.
-tls_opts(Role, _CompSpec) ->
- {ok, DevCert} = setup:get_env(rvi_core, device_cert),
- {ok, DevKey} = setup:get_env(rvi_core, device_key),
- {ok, CACert} = setup:get_env(rvi_core, root_cert),
- [
- {verify, verify_peer},
- {certfile, DevCert},
- {keyfile, DevKey},
- {cacertfile, CACert},
- {verify_fun, {fun verify_fun/3, public_root_key()}},
- {partial_chain, fun(X) ->
- partial_chain(Role, X)
- end}
- ].
+ {DoVerify, ssl:ssl_accept(Sock, Opts)}.
+
+tls_opts(Role, CompSpec) ->
+ TlsOpts = rvi_common:get_value(tls_opts, [], CompSpec),
+ Opt = fun(K) -> opt(K, TlsOpts,
+ fun() ->
+ ok(setup:get_env(rvi_core, K))
+ end)
+ end,
+ case VOpt = lists:keyfind(verify, 1, TlsOpts) of
+ {verify, false} when Role == server ->
+ {false, [
+ {verify, verify_none},
+ {certfile, Opt(device_cert)},
+ {keyfile, Opt(device_key)},
+ {cacertfile, Opt(root_cert)}
+ ]};
+ {verify, false} ->
+ {false, [
+ {verify, verify_none}
+ ]};
+ _ when VOpt==false; VOpt == {verify, true} -> % {verify,true} default
+ {true, [
+ {verify, verify_peer},
+ {certfile, Opt(device_cert)},
+ {keyfile, Opt(device_key)},
+ {cacertfile, Opt(root_cert)},
+ {verify_fun, opt(verify_fun, TlsOpts,
+ {fun verify_fun/3, public_root_key()})},
+ {partial_chain, opt(partial_chain, TlsOpts,
+ fun(X) ->
+ partial_chain(Role, X)
+ end)}
+ ]}
+ end.
+
+opt(Key, Opts, Def) ->
+ case lists:keyfind(Key, 1, Opts) of
+ false when is_function(Def, 0) -> Def();
+ false -> Def;
+ {_, V} -> V
+ end.
+
+ok({ok, V}) ->
+ V;
+ok(Other) ->
+ error({badmatch, Other}).
public_root_key() ->
authorize_keys:provisioning_key().
@@ -425,11 +509,26 @@ partial_chain(_, Certs) ->
?debug("partial_chain: ~p", [[lager:pr(Dec) || Dec <- Decoded]]),
{trusted_ca, hd(Certs)}.
-handle_elems(Elements, #st{mod = Mod, func = Fun, cs = CS,
- ip = IP, port = Port}) ->
+handle_elems(Elements, #st{frag_opts = FragOpts} = St) ->
+ MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts),
+ ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]),
+ case MaybeF of
+ true ->
+ %% It was a fragment, but not a complete message yet
+ St;
+ {true, Msg} ->
+ #st{packet_mod = PMod, packet_st = PSt} = St,
+ PMod:decode(Msg, fun(Elems) ->
+ got_msg(Elems, St)
+ end, PSt);
+ false ->
+ got_msg(Elements, St)
+ end.
+
+got_msg(Elements, #st{ip = IP, port = Port, mod = Mod, func = Fun, cs = CS} = St) ->
?debug("handle_info(data complete): Processed: ~p", [abbrev(Elements)]),
Mod:Fun(self(), IP, Port, data, Elements, CS),
- ok.
+ St.
verify_cert_sig(#'OTPCertificate'{tbsCertificate = TBS,
signature = Sig}, PubKey) ->
diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl
index 7d3f45e..82e6b5a 100644
--- a/components/dlink_tls/src/dlink_tls_listener.erl
+++ b/components/dlink_tls/src/dlink_tls_listener.erl
@@ -14,7 +14,7 @@
-include_lib("lager/include/log.hrl").
-export([start_link/0,
- add_listener/3,
+ add_listener/4,
remove_listener/2]).
-export([init/2, handle_call/3, handle_cast/2, handle_info/2]).
@@ -28,8 +28,8 @@ start_link() ->
create_tabs(),
gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []).
-add_listener(IpAddr, Port, CompSpec) ->
- gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}).
+add_listener(IpAddr, Port, Opts, CompSpec) ->
+ gen_server:call(?MODULE, {add_listener, IpAddr, Port, Opts, CompSpec}).
remove_listener(IpAddr, Port) ->
gen_server:call(?MODULE, {remove_listener, IpAddr, Port}).
@@ -37,10 +37,11 @@ remove_listener(IpAddr, Port) ->
init([], State) ->
State1 =
lists:foldl(
- fun({{_,_}} = Addr, Acc) ->
+ fun({{{_,_} = Addr, Opts}}, Acc) ->
+ ?debug("Addr = ~p", [Addr]),
case gen_nb_server:add_listen_socket(Addr, Acc) of
{ok, Acc1} ->
- ets_insert(?TAB, {Addr}),
+ ets_insert(?TAB, {Addr, Opts}),
Acc1;
_Error ->
ets_delete(?TAB, Addr),
@@ -60,11 +61,12 @@ create_tabs() ->
?TAB
end.
-handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) ->
+handle_call({add_listener, IpAddr, Port, Opts, CompSpec}, _From, State) ->
+ ?debug("add_listener: IpAddr=~p, Port=~p", [IpAddr, Port]),
ets_insert(?TAB, {cs, CompSpec}),
case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of
{ok, State1} ->
- ets_insert(?TAB, {{IpAddr, Port}}),
+ ets_insert(?TAB, {{IpAddr, Port}, Opts}),
{reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )};
Error ->
@@ -104,17 +106,22 @@ new_connection(IP, Port, Sock, State) ->
%% first data.
%% Provide component spec as extra arg.
CompSpec = gen_nb_server:get_cb_state(State),
+ [{_, Opts}] = ets_lookup(?TAB, {IP, Port}),
+ CS = rvi_common:set_value(tls_opts, Opts, CompSpec),
{ok, P} = dlink_tls_conn:setup(
- undefined, 0, Sock,
+ server, undefined, 0, Sock,
dlink_tls_rpc,
- handle_socket, CompSpec),
- dlink_tls_conn:async_upgrade(P, server, CompSpec),
+ handle_socket, CS),
+ dlink_tls_conn:async_upgrade(P, server),
{ok, State}.
ets_insert(Tab, Obj) ->
ets:insert(Tab, Obj).
+ets_lookup(Tab, Key) ->
+ ets:lookup(Tab, Key).
+
ets_delete(Tab, Key) ->
ets:delete(Tab, Key).
diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl
index 14d580a..632008a 100644
--- a/components/dlink_tls/src/dlink_tls_rpc.erl
+++ b/components/dlink_tls/src/dlink_tls_rpc.erl
@@ -121,11 +121,33 @@ start_connection_manager() ->
setup_initial_listeners([], _CompSpec) ->
?debug("no initial listeners", []);
setup_initial_listeners([_|_] = TlsOpts, CompSpec) ->
+ case lists:keytake(ports, 1, TlsOpts) of
+ {value, {_, Ports}, Rest} ->
+ setup_initial_listeners_(Rest, CompSpec),
+ [setup_initial_listeners_(
+ [{port,P}|inherit_opts([ip], TlsOpts, POpts)], CompSpec)
+ || {P, POpts} <- Ports];
+ false ->
+ setup_initial_listeners_(TlsOpts, CompSpec)
+ end.
+
+inherit_opts(Keys, From, To) ->
+ Pick = [{K,V} || {K, V} <- From,
+ lists:member(K, Keys),
+ not lists:keymember(K, 1, To)],
+ Pick ++ To.
+
+setup_initial_listeners_([], _CompSpec) ->
+ ok;
+setup_initial_listeners_([_|_] = TlsOpts, CompSpec) ->
IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS),
Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT),
+ setup_listener(IP, Port, TlsOpts, CompSpec).
+
+setup_listener(IP, Port, Opts, CompSpec) ->
%% Add listener port.
?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]),
- case dlink_tls_listener:add_listener(IP, Port, CompSpec) of
+ case dlink_tls_listener:add_listener(IP, Port, Opts, CompSpec) of
ok ->
?notice("---- RVI Node External Address: ~s",
[ application:get_env(rvi_core, node_address, undefined)]);
@@ -204,15 +226,16 @@ connect_remote(IP, Port, CompSpec) ->
?info("dlink_tls:connect_remote(): Connecting ~p:~p (TO=~p",
[IP, Port, Timeout]),
log("new connection", [], CompSpec),
- case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), Timeout) of
+ case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(),
+ Timeout) of
{ ok, Sock } ->
?info("dlink_tls:connect_remote(): Connected ~p:~p",
[IP, Port]),
%% Setup a genserver around the new connection.
- {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock,
+ {ok, Pid } = dlink_tls_conn:setup(client, IP, Port, Sock,
?MODULE, handle_socket, CompSpec),
- try dlink_tls_conn:upgrade(Pid, client, CompSpec) of
+ try dlink_tls_conn:upgrade(Pid, client) of
ok ->
?debug("Upgrade result = ~p", [ok]),
%% Send authorize
@@ -520,8 +543,9 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) ->
{ reply, [ Res ], St };
-handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]},
+handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts] = Args},
_From, #st{tid = Tid} = St) ->
+ ?debug("send_data: Args = ~p", [Args]),
%% Resolve connection pid from service
case get_connections_by_service(Service) of
[] ->
@@ -533,7 +557,8 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]},
ConnPid, [{?DLINK_ARG_TRANSACTION_ID, Tid},
{?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE},
{?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1)},
- {?DLINK_ARG_DATA, Data}]),
+ {?DLINK_ARG_DATA, Data}],
+ DataLinkOpts),
{reply, [Res], St#st{tid = Tid + 1}}
end;
@@ -685,10 +710,13 @@ send_authorize(Pid, CompSpec) ->
dlink_tls_conn:send(Pid, rvi_common:pass_log_id(
[{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE},
{?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION},
- {?DLINK_ARG_ADDRESS, LocalIP},
+ {?DLINK_ARG_ADDRESS, bin(LocalIP)},
{?DLINK_ARG_PORT, LocalPort},
{?DLINK_ARG_CREDENTIALS, Creds}], CompSpec)).
+bin(S) ->
+ iolist_to_binary(S).
+
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the connection manager, this is an incoming connection
@@ -780,15 +808,6 @@ get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-%% get_authorize_jwt(CompSpec) ->
-%% case authorize_rpc:get_authorize_jwt(CompSpec) of
-%% [ok, JWT] ->
-%% JWT;
-%% [not_found] ->
-%% ?error("No authorize JWT~n", []),
-%% error(cannot_authorize)
-%% end.
-
get_credentials(CompSpec) ->
case authorize_rpc:get_credentials(CompSpec) of
[ok, Creds] ->
@@ -798,17 +817,6 @@ get_credentials(CompSpec) ->
error(no_credentials_found)
end.
-%% validate_auth_jwt(JWT, Conn, CompSpec) ->
-%% case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of
-%% [ok] ->
-%% true;
-%% [not_found] ->
-%% false
-%% end.
-
-%% term_to_json(Term) ->
-%% binary_to_list(iolist_to_binary(exo_json:encode(Term))).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
diff --git a/components/proto_json/src/proto_json_rpc.erl b/components/proto_json/src/proto_json_rpc.erl
index 9f7ccc0..9f10ee3 100644
--- a/components/proto_json/src/proto_json_rpc.erl
+++ b/components/proto_json/src/proto_json_rpc.erl
@@ -135,18 +135,10 @@ handle_call({rvi, send_message,
{ <<"timeout">>, Timeout },
{ <<"parameters">>, Parameters }
]),
-
- case use_frag(Parameters, DataLinkOpts) of
- {true, Window} ->
- {Res, St1} =
- chunk_message(Window, TID, ServiceName, DataLinkMod,
- DataLinkOpts, iolist_to_binary(Data), St),
- {reply, Res, St1};
- false ->
- Res = DataLinkMod:send_data(
- St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data),
- {reply, Res, St}
- end;
+ RviOpts = rvi_common:rvi_options(Parameters),
+ Res = DataLinkMod:send_data(
+ St#st.cs, ?MODULE, ServiceName, RviOpts ++ DataLinkOpts, Data),
+ {reply, Res, St};
handle_call(Other, _From, St) ->
?warning("proto_json_rpc:handle_call(~p): unknown", [ Other ]),
@@ -157,27 +149,19 @@ handle_cast({rvi, receive_message, [Payload, IP, Port | _LogId]} = Msg, St) ->
?debug("~p:handle_cast(~p)", [?MODULE, Msg]),
Elems = jsx:decode(iolist_to_binary(Payload)),
- case Elems of
- [{<<"frg">>, _}|_] ->
- St1 = handle_frag(Elems, IP, Port, St),
- {noreply, St1};
- _ ->
- [ ServiceName, Timeout, Parameters ] =
- opts([<<"service">>, <<"timeout">>, <<"parameters">>],
- Elems, undefined),
-
- ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
- ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
- ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]),
-
- service_edge_rpc:handle_remote_message(St#st.cs,
- {IP, Port},
- ServiceName,
- Timeout,
- Parameters),
- {noreply, St}
- end;
+ [ ServiceName, Timeout, Parameters ] =
+ opts([<<"service">>, <<"timeout">>, <<"parameters">>],
+ Elems, undefined),
+ ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
+ ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
+ ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]),
+ service_edge_rpc:handle_remote_message(St#st.cs,
+ {IP, Port},
+ ServiceName,
+ Timeout,
+ Parameters),
+ {noreply, St};
handle_cast(Other, St) ->
?warning("proto_json_rpc:handle_cast(~p): unknown", [ Other ]),
@@ -199,44 +183,3 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
[ opt(K, Elems, Def) || K <- Keys].
-
-use_frag(Params, DLinkOpts) ->
- case p_reliable(Params) of
- undefined ->
- d_reliable(DLinkOpts);
- Other ->
- Other
- end.
-
-%% We use reliable send (i.e. fragmentation support) if:
-%% - rvi.max_msg_size is set in the Params (overrides static config)
-%% - rvi.reliable = true in the Params
-%% - max_msg_size is set for the data link
-%% - {reliable, true} defined for the data link
-%%
-%% If {reliable, true} and no max_message_size, we send a single packet
-%% as one fragment (marking it as first and last fragment) and use the
-%% ack mechanism to acknowledge successful delivery.
-%%
-p_reliable([{"rvi.max_msg_size", Sz}|_]) -> {true, Sz};
-p_reliable([{"rvi.reliable", true}|_]) -> {true, infinity};
-p_reliable([{"rvi.reliable", false}|_]) -> false;
-p_reliable([_|T]) -> p_reliable(T);
-p_reliable([]) -> undefined.
-
-d_reliable([{max_msg_size, Sz}|_]) -> {true, Sz};
-d_reliable([{reliable, true}|_]) -> {true, infinity};
-d_reliable([{reliable, false}|_]) -> false;
-d_reliable([_|T]) -> d_reliable(T);
-d_reliable([]) -> false.
-
-chunk_message(Window, TID, _ServiceName, _DLinkMod, _DLinkOpts, Data, St) ->
- _Frag = first_frag(Window, TID, Data),
-
- {ok, St}.
-
-handle_frag(_Elems, _IP, _Port, _St) ->
- error(nyi).
-
-first_frag(_Window, _TID, _Data) ->
- error(nyi).
diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl
index e718e4d..c8b083a 100644
--- a/components/proto_msgpack/src/proto_msgpack_rpc.erl
+++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl
@@ -136,8 +136,9 @@ handle_call({rvi, send_message,
{ <<"service">>, ServiceName },
{ <<"timeout">>, Timeout },
{ <<"parameters">>, Parameters } ], St#st.pack_opts),
+ RviOpts = rvi_common:rvi_options(Parameters),
Res = DataLinkMod:send_data(
- St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data),
+ St#st.cs, ?MODULE, ServiceName, RviOpts ++ DataLinkOpts, Data),
{reply, Res, St};
handle_call(Other, _From, St) ->
diff --git a/components/rvi_common/include/rvi_msgpack_rpc.hrl b/components/rvi_common/include/rvi_msgpack_rpc.hrl
new file mode 100644
index 0000000..91c54d8
--- /dev/null
+++ b/components/rvi_common/include/rvi_msgpack_rpc.hrl
@@ -0,0 +1,17 @@
+%% -*- mode: erlang; indent-tabs-mode: nil; -*-
+%%=============================================================================
+%%
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+
+-define(CONNECT_TIMEOUT, 5000).
+-define(CALL_TIMEOUT, 5000).
+
+-define(TYPE_REQUEST, 0).
+-define(TYPE_RESPONSE, 1).
+-define(TYPE_NOTIFY, 2).
+
+-define(RPC_PORT, 8000).
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index d3287c3..c227815 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -50,12 +50,18 @@
pass_log_id/2]). %% (PropList, CompSpec)
-export([utc_timestamp/0,
utc_timestamp/1]).
--export([bin/1]).
+-export([bin/1,
+ take/2]).
-export([start_json_rpc_server/3,
start_json_rpc_server/4]).
+-export([start_msgpack_rpc/2,
+ start_msgpack_rpc/3]).
-export([extract_json/2,
normalize_json/1,
term_to_json/1]).
+-export([rvi_options/1]).
+-export([save_source_address/3,
+ get_source_address/1]).
-export([announce/1]).
-define(NODE_SERVICE_PREFIX, node_service_prefix).
@@ -266,6 +272,18 @@ send_json_notification(Url,Method, Args) ->
{error, internal}
end.
+rvi_options(Opts) when is_list(Opts) ->
+ [{K,V} || {K,V} <- Opts,
+ is_rvi_opt(K)].
+
+is_rvi_opt(K) ->
+ case re:run(K, <<"^rvi\\.">>, []) of
+ {match, _} ->
+ true;
+ nomatch ->
+ false
+ end.
+
term_to_json(Term) ->
jsx:encode(normalize_json(Term)).
@@ -540,7 +558,10 @@ get_component_specification_() ->
CompList),
protocol = get_component_config_(protocol,
?COMP_SPEC_PROTOCOL_DEFAULT,
- CompList)
+ CompList),
+ rvi_common = get_component_config_(rvi_common,
+ ?COMP_SPEC_RVI_COMMON_DEFAULT,
+ CompList)
}
end.
@@ -577,8 +598,8 @@ get_component_modules(_, _) ->
get_module_specification(Component, Module, CompSpec) ->
case get_component_modules(Component, CompSpec) of
undefined ->
- ?debug("get_module_specification(): Missing: rvi_core:component: ~p",
- [Component]),
+ ?debug("get_module_specification(): Missing: rvi_core:component: ~p~nCS = ~p",
+ [Component, CompSpec]),
undefined;
Modules ->
@@ -687,47 +708,79 @@ get_module_type(Component, Module, CompSpec) ->
get_module_json_rpc_address(Component, Module, CompSpec) ->
%% Dig out the JSON RPC address
+ get_module_rpc_address(json, Component, Module, CompSpec).
+ %% case get_module_config(Component,
+ %% Module,
+ %% json_rpc_address,
+ %% undefined,
+ %% CompSpec) of
+ %% {ok, undefined } ->
+ %% ?debug("get_module_json_rpc_address(): Missing component spec: "
+ %% "rvi_core:component:~p:~p:json_rpc_address, {...}", [Component, Module]),
+ %% {error, {not_found, Component, Module, json_rpc_address}};
+
+ %% {ok, { IP, Port }} ->
+ %% ?debug("get_module_json_rpc_address(~p, ~p) -> ~p:~p",
+ %% [ Component, Module, IP, Port]),
+ %% {ok, bin(IP), Port };
+
+ %% {ok, Port } ->
+ %% ?debug("get_module_json_rpc_address(~p, ~p) -> 127.0.0.1:~p",
+ %% [ Component, Module, Port]),
+ %% {ok, <<"127.0.0.1">>, Port}
+ %% end.
+
+
+get_module_rpc_address(Type, Component, Module, CompSpec)
+ when Type == json; Type == msgpack ->
+ %% Dig out the JSON/MsgPack RPC address
+ Key = case Type of
+ json -> json_rpc_address;
+ msgpack -> msgpack_rpc_address
+ end,
case get_module_config(Component,
Module,
- json_rpc_address,
+ Key,
undefined,
CompSpec) of
{ok, undefined } ->
- ?debug("get_module_json_rpc_address(): Missing component spec: "
- "rvi_core:component:~p:~p:json_rpc_address, {...}", [Component, Module]),
- {error, {not_found, Component, Module, json_rpc_address}};
+ ?debug("get_module_rpc_address(): Missing component spec: "
+ "rvi_core:components:~p:~p:~s, {...}",
+ [Component, Module, Key]),
+ {error, {not_found, Component, Module, Key}};
{ok, { IP, Port }} ->
- ?debug("get_module_json_rpc_address(~p, ~p) -> ~p:~p",
- [ Component, Module, IP, Port]),
+ ?debug("get_module_rpc_address(~p, ~p, ~p) -> ~p:~p",
+ [Type, Component, Module, IP, Port]),
{ok, bin(IP), Port };
{ok, Port } ->
- ?debug("get_module_json_rpc_address(~p, ~p) -> 127.0.0.1:~p",
- [ Component, Module, Port]),
+ ?debug("get_module_rpc_address(~p, ~p, ~p) -> 127.0.0.1:~p",
+ [Type, Component, Module, Port]),
{ok, <<"127.0.0.1">>, Port}
end.
get_module_json_rpc_url(Component, Module, CompSpec) ->
- case get_module_json_rpc_address(Component, Module, CompSpec) of
+ get_module_rpc_url(json, Component, Module, CompSpec).
+
+get_module_rpc_url(Type, Component, Module, CompSpec)
+ when Type == json; Type == msgpack ->
+ case get_module_rpc_address(Type, Component, Module, CompSpec) of
{ ok, IP, Port } when is_integer(Port)->
Res = bin(["http://", IP, ":", integer_to_binary(Port)]),
- ?debug("get_module_json_rpc_url(~p, ~p) ->~p", [ Component, Module, Res ]),
+ ?debug("get_module_rpc_url(~p, ~p, ~p) ->~p", [Type, Component, Module, Res ]),
Res;
-
-
{ ok, IP, Port } when is_list(Port)->
Res = bin(["http://", IP, ":", Port]),
- ?debug("get_module_json_rpc_url(~p, ~p) ->~p", [ Component, Module, Res ]),
+ ?debug("get_module_rpc_url(~p, ~p, ~p) ->~p", [Type, Component, Module, Res ]),
Res;
Err ->
- ?debug("get_module_json_rpc_url(~p, ~p) Failed: ~p", [ Component, Module, Err ]),
+ ?debug("get_module_rpc_url(~p, ~p, ~p) Failed: ~p", [Type, Component, Module, Err ]),
Err
end.
-
get_module_genserver_pid(Component, Module, CompSpec) ->
%% Check that this is a JSON RPC module
case get_module_type(Component, Module, CompSpec) of
@@ -767,6 +820,39 @@ start_json_rpc_server(Component, Module, Supervisor, XOpts) ->
Err
end.
+start_msgpack_rpc(Component, Module) ->
+ start_msgpack_rpc(Component, Module, []).
+
+start_msgpack_rpc(Component, Module, XOpts) ->
+ ?debug("start_msgpack_rpc(~w, ~w, ~p)", [Component, Module, XOpts]),
+ case get_module_rpc_address(msgpack, Component, Module, get_component_specification()) of
+ {ok, {client, Opts}} ->
+ ?debug("starting msgpack_rpc client: ~p", [Opts]),
+ start_msgpack_rpc_client(Component, Module, Opts, XOpts);
+ {ok, {server, Opts}} ->
+ ?debug("starting msgpack_rpc server: ~p", [Opts]),
+ start_msgpack_rpc_server(Component, Module, Opts, XOpts);
+ {ok, {IP, Port}} ->
+ start_msgpack_rpc_server(Component, Module, [{ip, IP}, {port, Port}], XOpts);
+ Error ->
+ ?debug("no recognized msgpack config for ~w:~w (~p)",
+ [Component, Module, Error])
+ end.
+
+start_msgpack_rpc_client(Component, Module, Opts, XOpts) ->
+ Name = {msgpack_rpc_client, Component, Module},
+ rvi_msgpack_rpc:start_link([{gproc, {n,l,Name}}|XOpts] ++ Opts).
+
+start_msgpack_rpc_server(Component, Module, Opts, XOpts) ->
+ Name = {msgpack_rpc_server, Component, Module},
+ [Callback, Rest] = take([{callback, fun() -> msgpack_rpc_cb(Module) end}],
+ XOpts ++ Opts),
+ rvi_msgpack_rpc_server:start_link([{callback, Callback} | Rest]).
+
+msgpack_rpc_cb(Module) ->
+ binary_to_existing_atom(
+ <<(atom_to_binary(Module, latin1))/binary, "_msgpack">>, latin1).
+
utc_timestamp() ->
calendar:datetime_to_gregorian_seconds(
calendar:universal_time()) - seconds_jan_1970().
@@ -907,3 +993,40 @@ announce(Name) ->
?debug("Announce ~p~n", [Name]),
gproc:reg(Name),
ok.
+
+%% inet_ip(IP) when is_binary(IP) ->
+%% inet_ip(binary_to_list(IP));
+%% inet_ip(IP) ->
+%% {ok, Addr} = inet:ip(IP),
+%% Addr.
+
+
+%% take([ Key::atom() | {Key::atom(), Default} ], Opts) -> [Value | Rest]
+take([H|T], Opts) when is_atom(H) ->
+ case lists:keytake(H, 1, Opts) of
+ {value, {_, Value}, Rest} ->
+ [Value | take(T, Rest)];
+ false ->
+ error({required, H})
+ end;
+take([{H,Default}|T], Opts) ->
+ case lists:keytake(H, 1, Opts) of
+ {value, {_, Value}, Rest} ->
+ [Value | take(T, Rest)];
+ false when is_function(Default, 0) ->
+ [Default() | take(T, Opts)];
+ false ->
+ [Default | take(T, Opts)]
+ end;
+take([], Opts) ->
+ [Opts].
+
+save_source_address(client, Socket, CS) ->
+ {ok, {_, _} = Addr} = inet:peername(Socket),
+ set_value(source_address, Addr, CS);
+save_source_address(server, Socket, CS) ->
+ {ok, {_, _} = Addr} = inet:sockname(Socket),
+ set_value(source_address, Addr, CS).
+
+get_source_address(CS) ->
+ get_value(source_address, undefined, CS).
diff --git a/components/rvi_common/src/rvi_frag.erl b/components/rvi_common/src/rvi_frag.erl
index 74c34da..d4000a4 100644
--- a/components/rvi_common/src/rvi_frag.erl
+++ b/components/rvi_common/src/rvi_frag.erl
@@ -1,27 +1,49 @@
-module(rvi_frag).
--compile(export_all).
+-behaviour(gen_server).
+
+-export([send/4, % (Msg, Window, Mod, Opts)
+ maybe_fragment/3]).
+-export([start_link/0]).
+
+-export([init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3]).
+
+-include_lib("lager/include/log.hrl").
+
+-record(st, {msg_id = 0,
+ frags = dict_new()}).
+
+-record(frag, {id,
+ window,
+ msg,
+ tref}).
--record(st, {}).
-define(SERVER, ?MODULE).
-define(TAB, ?MODULE).
+-define(PACKET_MOD, dlink_data_msgpack).
+-define(PACKET_ST, []).
--type msg() :: binary().
--type chunk() :: binary().
--type tid() :: any().
--type offset() :: non_neg_integer().
--type chunk_size() :: non_neg_integer().
--type is_last() :: boolean().
--type frag() :: {offset(), offset(), chunk(), is_last()}.
-
-fragment(TID, {Offset, _, Bin, IsLast}) ->
- fragment(TID, Offset, Bin, IsLast).
+send(Msg, Opts, Mod, SendF) ->
+ ?debug("send(Msg, ~p, ~p, ~p)", [Opts, Mod, SendF]),
+ case use_frag(Msg, Opts) of
+ {true, Window} ->
+ gen_server:call(?SERVER, {send, Msg, Window, Mod, Opts});
+ false ->
+ SendF()
+ end.
--spec fragment(tid(), offset(), msg(), is_last()) ->
- ok
- | {message, msg()}
- | {missing, [{offset(), offset()}]}.
-fragment(TID, Offset, Bin, IsLast) ->
- gen_server:call(?SERVER, {fragment, TID, Offset, Bin, IsLast}).
+maybe_fragment([{<<"frg">>,[_|_] = Info}], Mod, Opts) ->
+ gen_server:call(?SERVER, {fragment_received, Info, Mod, Opts});
+maybe_fragment([{<<"frg-get">>, [_|_] = Args}], Mod, Opts) ->
+ gen_server:call(?SERVER, {frag_get_received, Args, Mod, Opts});
+maybe_fragment([{<<"frg-end">>, [_|_] = Args}], _Mod, _Opts) ->
+ gen_server:call(?SERVER, {frag_end_received, Args});
+maybe_fragment(_, _, _) ->
+ false.
start_link() ->
create_ets(),
@@ -38,37 +60,63 @@ create_ets() ->
init([]) ->
{ok, #st{}}.
--spec first_fragment(msg(), chunk_size()) -> frag().
-first_fragment(Msg, infinity) ->
- {1, byte_size(Msg), Msg, true};
-first_fragment(Msg, ChunkSz) when is_integer(ChunkSz), ChunkSz > 0 ->
- MsgBin = iolist_to_binary(Msg),
- Sz = erlang:min(byte_size(MsgBin), ChunkSz),
- <<Frag:Sz/binary, Rest/binary>> = MsgBin,
- {1, Sz, Frag, Rest =:= <<>>}.
-
--spec next_fragment(msg(), offset() | frag(), chunk_size()) -> done | frag().
-next_fragment(Msg, {_, Last, _PrevFrag, _IsLast}, ChunkSz) ->
- next_fragment(Msg, Last, ChunkSz);
-
-next_fragment(Msg, Last, ChunkSz) ->
+fragment_from_offset(Msg, Offs, ChunkSz) ->
+ ?debug("fragment_from_offset(Msg, ~p, ~p)", [Offs, ChunkSz]),
MsgBin = iolist_to_binary(Msg),
- case byte_size(MsgBin) - Last of
+ ?debug("MsgBin size = ~p", [byte_size(MsgBin)]),
+ case byte_size(MsgBin) - Offs + 1 of
NewSz when NewSz =< 0 ->
- done;
+ ?debug("NewSz = ~p - empty fragment!!", [NewSz]),
+ <<>>;
NewSz ->
Sz = erlang:min(NewSz, ChunkSz),
- <<_:Last/binary, Frag:Sz/binary, Rest/binary>> = MsgBin,
- Start = Last+1,
- Stop = Last + Sz,
- {Start, Stop, Frag, Rest =:= <<>>}
+ Prev = Offs - 1,
+ ?debug("NewSz = ~p, Sz = ~p, Prev = ~p", [NewSz, Sz, Prev]),
+ <<_:Prev/binary, Frag:Sz/binary, _Rest/binary>> = MsgBin,
+ Frag
+ end.
+
+handle_call(Req, From, S) ->
+ try handle_call_(Req, From, S)
+ catch
+ error:R ->
+ {reply, {error, R}, S}
end.
-handle_call({fragment, TID, Offs, Bin, IsLast}, _, S) ->
- End = Offs + byte_size(Bin) -1,
- ets:insert(?TAB, {{TID, Offs, End}, Bin}),
- {reply, check_message(TID, Offs, IsLast), S};
-handle_call(_, _, S) ->
+handle_call_({frag_get_received, Info, Mod, Opts}, {Pid,_},
+ #st{frags = Fs} = S) ->
+ [ID, Offset, Bytes] = Info,
+ TID = {Pid, ID},
+ case dict_find(TID, Fs) of
+ {ok, #frag{msg = Msg}} ->
+ Bin = fragment_from_offset(Msg, Offset, Bytes),
+ Sz = byte_size(Msg),
+ Mod:send_data(Pid, encode_fragment(ID, Sz, Offset, Bin, Opts)),
+ {reply, true, S};
+ error ->
+ %% Ignore, but reflect that it was a fragment message
+ %% (perhaps we should send an error message to the client?)
+ {reply, true, S}
+ end;
+
+handle_call_({send, Msg, Window, Mod, Opts}, {Pid, _}, St) ->
+ try init_frag(Msg, Window, Opts, Mod, Pid, St)
+ catch
+ error:R ->
+ ?error("init_frag ERROR: ~p~n~p", [R, erlang:get_stacktrace()]),
+ {reply, {error, R}, St}
+ end;
+handle_call_({fragment_received, FragInfo, Mod, Opts}, {Pid,_}, S) ->
+ ?debug("fragment_received", []),
+ handle_fragment_received(FragInfo, Mod, Opts, Pid, S);
+
+handle_call_({frag_end_received, FragInfo}, {Pid,_},
+ #st{frags = Fs} = S) ->
+ [ID, ResultCode] = FragInfo,
+ ?debug("fragment-end; ID = ~p; ResultCode = ~p", [ID, ResultCode]),
+ {reply, true, S#st{frags = dict_erase({Pid,ID}, Fs)}};
+
+handle_call_(_, _, S) ->
{reply, error, S}.
handle_cast(_, S) -> {noreply, S}.
@@ -76,15 +124,52 @@ handle_info(_, S) -> {noreply, S}.
terminate(_, _) -> ok.
code_change(_, S, _) -> {ok, S}.
-check_message(TID, Offs, IsLast) ->
+handle_fragment_received(FragInfo, Mod, Opts, Pid, S) ->
+ [ID, Size, Offs, Bin] = FragInfo,
+ FragSz = byte_size(Bin),
+ End = Offs + FragSz - 1,
+ TID = {Pid, ID},
+ case Bin of
+ <<>> ->
+ ?debug("Empty fragment (~p); don't store", [FragInfo]),
+ ok;
+ _ ->
+ ?debug("ID = ~p, Size = ~p, Offs = ~p, End = ~p",
+ [ID, Size, Offs, End]),
+ ets:insert(?TAB, {{TID, Offs, End}, Bin})
+ end,
+ if Offs == 1, End >= Size ->
+ send_msg_complete(ID, 0, Mod, Pid, Opts),
+ {reply, {true, Bin}, S};
+ true ->
+ Check = check_message(TID, Offs, Size),
+ ?debug("check_message() -> ~p", [Check]),
+ case Check of
+ {message, Msg} ->
+ send_msg_complete(ID, 0, Mod, Pid, Opts),
+ {reply, {true, Msg}, S};
+ {missing, [{Start, End}|_]} ->
+ ReqSz = erlang:min(FragSz, End-Start+1),
+ request_fragment(ID, Start, ReqSz, Mod, Pid, Opts),
+ {reply, true, S};
+ ok ->
+ request_fragment(ID, End+1, FragSz, Mod, Pid, Opts),
+ {reply, true, S}
+ end
+ end.
+
+check_message(TID, Offs, Size) ->
Frags = fragments(TID, Offs),
- case find_holes(Frags) of
- [] when IsLast ->
+ case find_holes(Frags, Size) of
+ {[], true} ->
+ ?debug("no holes, complete message", []),
ets:select_delete(?TAB, [{ {{TID,'_','_'},'_'}, [], [true] }]),
{message, join_fragments(Frags)};
- [] ->
+ {[], _} ->
+ ?debug("no holes, not complete", []),
ok;
- Holes ->
+ {Holes, _} ->
+ ?debug("found holes = ~p", [Holes]),
{missing, Holes}
end.
@@ -93,18 +178,21 @@ fragments(TID, Offs) ->
[{'=<', '$1', Offs}],
[{{'$1','$2','$3'}}] }]).
-find_holes(Frags) ->
- {_, Missing} =
+find_holes(Frags, Size) ->
+ {_, Missing, IsLast} =
lists:foldl(
- fun({A, B, _}, {Prev, Acc}) ->
- case A - Prev of
+ fun({Beg, End, _Bin}, {Prev, Acc, IsLast0}) ->
+ ?debug("IsLast0 = ~p, Beg = ~p, End = ~p, Size = ~p",
+ [IsLast0, Beg, End, Size]),
+ IsLast1 = IsLast0 orelse (End >= Size),
+ case Beg - Prev of
1 ->
- {B, Acc};
+ {End, Acc, IsLast1};
Diff when Diff > 1 ->
- {B, [{Prev+1, A-1}|Acc]}
+ {End, [{Prev+1, Beg-1}|Acc], IsLast1}
end
- end, {0, []}, Frags),
- Missing.
+ end, {0, [], false}, Frags),
+ {Missing, IsLast}.
%% Allow fragments to overlap
join_fragments([{1,_,F}|Frags]) ->
@@ -118,3 +206,156 @@ join_fragments([{A,_,F}|Frags], Acc) ->
join_fragments(Frags, <<Prefix/binary, F/binary>>);
join_fragments([], Acc) ->
Acc.
+
+init_frag(Msg, Window, Opts, Mod, Pid, St) ->
+ ?debug("init_frag(Msg, ~p, ~p, ~p, ~p, ~p", [Window,Opts,Mod,Pid,St]),
+ {Id, St1} = next_id(St),
+ TID = {Pid, Id},
+ Sz = byte_size(Msg),
+ AdjWindow = adjust_window(Window, Id, Sz, Opts),
+ ?debug("Adjusted window: ~p", [AdjWindow]),
+ Frag = fragment_from_offset(Msg, 1, AdjWindow),
+ Data = encode_fragment(Id, Sz, 1, Frag, Opts),
+ ?debug("size of encoded fragment (Win=~p): ~p", [Window, size(Data)]),
+ ok = Mod:send_data(Pid, Data),
+ TRef = start_timer(init_timeout, TID, Pid, Opts),
+ {reply, ok, store_frag(TID, #frag{id = Id,
+ window = AdjWindow,
+ msg = Msg,
+ tref = TRef}, St1)}.
+
+next_id(#st{msg_id = Prev} = St) ->
+ Id = Prev+1,
+ {Id, St#st{msg_id = Id}}.
+
+start_timer(Type, Id, Pid, Opts) ->
+ erlang:start_timer(timeout_value(Type, Opts), self(), {Type, Id, Pid}).
+
+store_frag(TID, #frag{} = Frag, #st{frags = Frags} = St) ->
+ St#st{frags = dict_store(TID, Frag, Frags)}.
+
+timeout_value(Type, Opts) ->
+ case lists:keyfind(Type, 1, Opts) of
+ {_, Value} -> Value;
+ false -> timeout_default(Type)
+ end.
+
+timeout_default(init_timeout) -> timer:hours(1);
+timeout_default(request_timeout) -> timer:seconds(30).
+
+
+dict_new() ->
+ orddict:new().
+
+dict_store(Key, Value, Dict) ->
+ orddict:store(Key, Value, Dict).
+
+dict_find(Key, Dict) ->
+ orddict:find(Key, Dict).
+
+dict_erase(Key, Dict) ->
+ orddict:erase(Key, Dict).
+
+adjust_window(Window, Id, Sz, Opts) ->
+ %% Subtract framing size (encoded empty fragment) from Window,
+ %% but arbitrarily set minimum window to 10 (must at least be > 0)
+ Enc = encode_msg([{<<"frg">>, [Id, Sz, Sz, <<>>]}], Opts),
+ ?debug("Empty frag: ~p", [Enc]),
+ erlang:max(10, Window - byte_size(Enc)).
+
+encode_fragment(Id, Sz, Offs, Frag, Opts) ->
+ encode_msg([{<<"frg">>, [Id, Sz, Offs, Frag]}], Opts).
+
+request_fragment(ID, Start, Bytes0, Mod, Pid, Opts) ->
+ Bytes = erlang:max(get_window(Opts), Bytes0),
+ FragInfo = [ID, Start, Bytes],
+ ?debug("request_fragment: ~p", [FragInfo]),
+ Mod:send_data(Pid, encode_msg([{<<"frg-get">>, FragInfo}], Opts)).
+
+send_msg_complete(ID, ResultCode, Mod, Pid, Opts) ->
+ ?debug("send_msg_complete(~p, ~p, ~p, ~p, ...)", [ID, ResultCode,
+ Mod, Pid]),
+ Mod:send_data(Pid, encode_msg([{<<"frg-end">>, [ID, ResultCode]}], Opts)).
+
+encode_msg(Msg, Opts) ->
+ {PMod, PSt} = get_packet_mod(Opts),
+ {ok, Bin, _} = PMod:encode(Msg, PSt),
+ Bin.
+
+get_packet_mod(Opts) ->
+ case lists:keyfind(packet_mod, 1, Opts) of
+ false ->
+ {?PACKET_MOD, ?PACKET_MOD:init([])};
+ {_, {Mod,_} = Res} when is_atom(Mod) ->
+ Res;
+ {_, Mod} when is_atom(Mod) ->
+ case lists:keyfind(packet_st, 1, Opts) of
+ false ->
+ {Mod, Mod:init([])};
+ {_, St} ->
+ {Mod, St}
+ end
+ end.
+
+get_window([{"rvi.max_msg_size", Sz}|_]) -> Sz;
+get_window([{max_msg_size, Sz}|_] ) -> Sz;
+get_window([_|T]) ->
+ get_window(T);
+get_window([]) ->
+ [].
+
+use_frag(Bin, Opts) ->
+ {PR, DR, PW, DW} = frag_opts(Opts),
+ Reliable = case {PR, DR} of
+ {_, _} when is_boolean(PR) -> PR;
+ {undefined, _} when is_boolean(DR) -> DR;
+ _ -> undefined
+ end,
+ Win = calc_window(PW, DW),
+ Sz = byte_size(Bin),
+ case Reliable of
+ true -> {true, Win};
+ false ->
+ case Win of
+ _ when is_integer(Win) ->
+ if Sz < Win -> false;
+ true -> {true, Win}
+ end;
+ infinity ->
+ false
+ end;
+ undefined ->
+ if is_integer(Win) -> {true, Win};
+ true -> false
+ end
+ end.
+
+frag_opts(Opts) ->
+ ?debug("frag_opts(~p)", [Opts]),
+ frag_opts(Opts, undefined, undefined, undefined, undefined).
+
+frag_opts([{"rvi.max_msg_size", PW}|T], PR, DR, _, DW) ->
+ frag_opts(T, PR, DR, PW, DW);
+frag_opts([{<<"rvi.max_msg_size">>, PW}|T], PR, DR, _, DW) ->
+ frag_opts(T, PR, DR, PW, DW);
+frag_opts([{max_msg_size, DW}|T], PR, DR, PW, _) ->
+ frag_opts(T, PR, DR, PW, DW);
+frag_opts([{"rvi.reliable", PR}|T], _, DR, PW, DW) ->
+ frag_opts(T, PR, DR, PW, DW);
+frag_opts([{reliable, DR}|T], PR, _, PW, DW) ->
+ frag_opts(T, PR, DR, PW, DW);
+frag_opts([_|T], PR, DR, PW, DW) ->
+ frag_opts(T, PR, DR, PW, DW);
+frag_opts([], PR, DR, PW, DW) ->
+ {PR, DR, PW, DW}.
+
+calc_window(PW, DW) when is_integer(PW), is_integer(DW) ->
+ erlang:min(PW, DW);
+calc_window(undefined, DW) ->
+ calc_window(DW);
+calc_window(PW, undefined) ->
+ calc_window(PW).
+
+calc_window(undefined) -> infinity;
+calc_window(infinity ) -> infinity;
+calc_window(W) when is_integer(W), W > 0 -> W.
diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl
index 067485c..80040bf 100644
--- a/components/rvi_common/src/rvi_log.erl
+++ b/components/rvi_common/src/rvi_log.erl
@@ -106,11 +106,11 @@ timestamp() ->
-define(ELEM(A), {element, #evt.A, '$_'}).
-define(PROD, {{'$1', ?ELEM(level), ?ELEM(component), ?ELEM(event)}}).
-fetch(Tids) ->
- fetch(Tids, []).
+fetch(Tid) ->
+ fetch(Tid, []).
-fetch(Tids, Args) ->
- TidSet = select_ids(Tids),
+fetch(TidPat, Args) ->
+ TidSet = select_ids(TidPat),
lists:foldr(
fun(Tid, Acc) ->
case match_events(
@@ -267,6 +267,7 @@ handle_rpc(<<"log">>, Args) ->
{ok, [{status, rvi_common:json_rpc_status(ok)}]};
handle_rpc(<<"fetch">>, Args) ->
TIDs = get_json_ids(Args),
+ ?debug("fetch: TIDs = ~p", [TIDs]),
Res = [{TID, fetch(TID)} || TID <- TIDs],
{ok, [{status, rvi_common:json_rpc_status(ok)},
{<<"log">>, format_result(Res)}]};
@@ -383,30 +384,39 @@ valid_id_pat(TP) ->
false
end.
-select_ids(TIDs) ->
+select_ids(TidPat) ->
ets:foldr(
fun({Tid}, Acc) ->
- case match_id(Tid, TIDs) of
+ case match_id(Tid, TidPat) of
true -> [Tid|Acc];
false -> Acc
end
end, [], ?IDS).
-match_id(Tid, [Pat|Pats]) ->
+match_id(Tid, Pat) ->
case re:run(Tid, Pat, []) of
{match, _} -> true;
- nomatch -> match_id(Tid, Pats)
- end;
-match_id(_, []) ->
- false.
+ nomatch -> false
+ end.
format_result(Log) ->
- [{TID, format_events(Es)} || {TID, Es} <- Log].
-
-format_events([{TS, Comp, Evt}|Es]) ->
- [[{<<"ts">>, rvi_common:utc_timestamp(TS)},
+ ?debug("format_result(~p)", [Log]),
+ Events = lists:foldl(
+ fun({_Pat, Matches}, Acc) ->
+ lists:foldl(
+ fun({Id,Es}, D) ->
+ orddict:store(Id, Es, D)
+ end, Acc, Matches)
+ end, orddict:new(), Log),
+ ?debug("Events = ~p", [Events]),
+ [{TID, format_events(Es)} || {TID, Es} <- Events].
+
+format_events([{TS, Level, Comp, Evt} = E|Es]) ->
+ ?debug("format_events(), E = ~p", [E]),
+ [[{<<"ts">>, utc_hr_timestamp(TS)},
+ {<<"lvl">>, bin(Level)},
{<<"cmp">>, bin(Comp)},
- {<<"evt">>, bin(Evt)}] || format_events(Es)];
+ {<<"evt">>, bin(Evt)}] | format_events(Es)];
format_events([]) ->
[].
@@ -415,3 +425,11 @@ bin(B) when is_binary(B) -> B;
bin(L) when is_list(L) -> iolist_to_binary(L);
bin(Other) ->
iolist_to_binary(io_lib:fwrite("~w", [Other])).
+
+
+utc_hr_timestamp({_,_,US} = TS) ->
+ %% The 'rem' op is just a precaution; a properly generated 'now' TS
+ %% should not have US > 1000000, but a derived TS could (since just
+ %% about all operations on such timestamps will work anyway).
+ Secs = rvi_common:utc_timestamp(TS),
+ Secs + (US rem 1000000)/1000000.
diff --git a/components/rvi_common/src/rvi_msgpack_rpc.erl b/components/rvi_common/src/rvi_msgpack_rpc.erl
new file mode 100644
index 0000000..d1f2bf9
--- /dev/null
+++ b/components/rvi_common/src/rvi_msgpack_rpc.erl
@@ -0,0 +1,165 @@
+%% -*- mode: erlang; indent-tabs-mode: nil; -*-
+%%=============================================================================
+%%
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+-module(rvi_msgpack_rpc).
+
+-export([start_link/1,
+ start_link/3,
+ start_link/4,
+ start_link/5]).
+
+-export([call/3,
+ async_call/3,
+ notify/3,
+ join/2]).
+
+-export([control/4]).
+
+-include("rvi_msgpack_rpc.hrl").
+-include_lib("lager/include/log.hrl").
+
+-record(st, {pending = [], id = 1, opts, buf = <<>>,
+ msgpack_opts = []}).
+
+start_link(Opts) ->
+ [IP, Port, ExoOpts, Rest] = rvi_common:take([ip, port, {exo, []}], Opts),
+ start_link(IP, Port, Rest, ExoOpts).
+
+start_link(Host, Port, Opts) ->
+ [Exo, Rest] = rvi_common:take([{exo, []}], Opts),
+ start_link(Host, Port, Rest, Exo).
+
+start_link(Host, Port, Opts, ExoOpts) ->
+ start_link(Host, Port, Opts, protos(ExoOpts), ExoOpts).
+
+start_link(Host, Port, Opts, Protos, ExoOpts) ->
+ ConnectTimeout = opt([connect_timeout, timeout], Opts, ?CONNECT_TIMEOUT),
+ case exo_socket:connect(Host, Port, Protos, ExoOpts, ConnectTimeout) of
+ {ok, Socket} ->
+ {ok, Pid} = exo_socket_session:start_link(Socket, ?MODULE, {Host, Port, Opts}),
+ exo_socket:controlling_process(Socket, Pid),
+ gen_server:cast(Pid, {activate, once}),
+ {ok, Pid};
+ Error ->
+ Error
+ end.
+
+call(Pid, Method, Args) ->
+ call(Pid, Method, Args, ?CALL_TIMEOUT).
+
+call(Pid, Method, Args, Timeout) ->
+ gen_server:call(Pid, {call, Method, Args}, Timeout).
+
+async_call(Pid, Method, Args) ->
+ Ref = erlang:monitor(process, Pid),
+ ok = gen_server:call(Pid, {async_call, {self(), Ref}, Method, Args}),
+ Ref.
+
+notify(Pid, Method, Args) ->
+ gen_server:call(Pid, {notify, Method, Args}).
+
+join(Ref) ->
+ join(Ref, ?CALL_TIMEOUT).
+
+join(Ref, Timeout) ->
+ receive
+ {Ref, Reply} ->
+ erlang:demonitor(Ref),
+ Reply;
+ {'DOWN', Ref, _, _, Reason} ->
+ error(Reason)
+ after Timeout ->
+ error(timeout)
+ end.
+
+init({Host, Port, Opts}) ->
+ MsgPackOpts = opt([msgpack], Opts, rvi_msgpack_rpc_server:msgpack_options()),
+ gproc:reg({p,l,msgpack_rpc_client}, {Host, Port}),
+ case lists:keyfind(gproc, 1, Opts) of
+ {_, Reg} -> gproc:reg(Reg, {Host, Port});
+ false -> ok
+ end,
+ {ok, #st{opts = Opts,
+ msgpack_opts = MsgPackOpts}}.
+
+control(XSocket, Req, From, St) ->
+ try control_(XSocket, Req, From, St)
+ catch
+ error:Reason ->
+ {reply, {error, Reason}, St}
+ end.
+
+control_(XSocket, {call, Method, Args}, From,
+ #st{pending = Pending, id = ID, msgpack_opts = MOpts} = St) ->
+ pack_and_send(XSocket, [?TYPE_REQUEST, ID, Method, Args], MOpts),
+ {noreply, St#st{pending = [{ID, From}|Pending], id = ID+1}};
+control_(XSocket, {async_call, From, Method, Args}, _,
+ #st{pending = Pending, msgpack_opts = MOpts, id = ID} = St) ->
+ pack_and_send(XSocket, [?TYPE_REQUEST, ID, Method, Args], MOpts),
+ {reply, ok, St#st{pending = [{ID, From}|Pending], id = ID+1}};
+control_(XSocket, {notify, Method, Args}, _, #st{msgpack_opts = MOpts} = St) ->
+ pack_and_send(XSocket, [?TYPE_NOTIFY, Method, Args], MOpts),
+ {reply, ok, St};
+control_(_, _, _, St) ->
+ {reply, {error, unsupported}, St}.
+
+pack_and_send(XSocket, Msg, MOpts) ->
+ Data = msgpack:pack(Msg, MOpts),
+ exo_socket:send(XSocket, Data).
+
+data(XSocket, Data, #st{buf = Buf, pending = Pending,
+ msgpack_opts = MOpts} = St) ->
+ Buf1 = <<Buf/binary, Data/binary>>,
+ try msgpack:unpack_stream(Buf1, MOpts) of
+ {[?TYPE_RESPONSE, ID, Error, Result], Rest} ->
+ case lists:keytake(ID, 1, Pending) of
+ {value, {_, From}, Pending1} ->
+ Reply = case Error of
+ null -> {ok, Result};
+ _ -> {error, Error}
+ end,
+ gen_server:reply(From, Reply),
+ {ok, St#st{pending = Pending1, buf = Rest}};
+ false ->
+ {ok, St#st{buf = Rest}}
+ end;
+ {error, incomplete} ->
+ {ok, St#st{buf = Buf1}};
+ {error, Reason} ->
+ {ok, St#st{buf = <<>>}}
+ catch
+ error:Reason ->
+ ?debug("unpack CRASH: ~p", [Reason]),
+ {ok, St#st{buf = <<>>}}
+ end.
+
+opt([H|T], Opts, Default) ->
+ case lists:keyfind(H, 1, Opts) of
+ {_, Value} -> Value;
+ false when T==[] -> Default;
+ false ->
+ opt(T, Opts, Default)
+ end.
+
+
+protos(Opts) ->
+ case [1 || {K,_} <- Opts, lists:member(K, ssl_connect_opts())] of
+ [] ->
+ [tcp];
+ [_|_] ->
+ [tcp, ssl]
+ end.
+
+%% Copied from exo_socket.erl
+ssl_connect_opts() ->
+ [versions, verify, verify_fun,
+ fail_if_no_peer_cert,
+ depth, cert, certfile, key, keyfile,
+ password, cacerts, cacertfile, dh, dhfile, cihpers,
+ debug].
diff --git a/components/rvi_common/src/rvi_msgpack_rpc_server.erl b/components/rvi_common/src/rvi_msgpack_rpc_server.erl
new file mode 100644
index 0000000..cfce66f
--- /dev/null
+++ b/components/rvi_common/src/rvi_msgpack_rpc_server.erl
@@ -0,0 +1,185 @@
+%% -*- mode: erlang; indent-tabs-mode: nil; -*-
+%%=============================================================================
+%%
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+
+-module(rvi_msgpack_rpc_server).
+
+-behaviour(exo_socket_server).
+
+-include("rvi_msgpack_rpc.hrl").
+-include_lib("lager/include/log.hrl").
+
+-record(state,
+ {
+ callback,
+ msgpack_opts = msgpack_options(),
+ buf = <<>>
+ }).
+
+-export([init/2, data/3, close/2, error/3]).
+
+-export([start/1, start/3, start/4]).
+-export([start_link/1,
+ start_link/4]).
+-export([start_ssl/1, start_ssl/3, start_ssl/4]).
+-export([start_link_ssl/4]).
+-export([control/4]).
+
+-export([msgpack_options/0]).
+
+
+msgpack_options() ->
+ [{allow_atom, pack},
+ {enable_str, true},
+ jsx].
+
+start(Callback) ->
+ start(?RPC_PORT, Callback, []).
+
+start(Port, Callback, Options) ->
+ start(Port, Callback, Options, []).
+
+start(Port, Callback, Options, ExoOptions) ->
+ do_start(Port, Callback, Options, ExoOptions, start).
+
+start_link(Opts) ->
+ [Port, Callback, Exo, Rest] = rvi_common:take([port, callback, {exo, []}], Opts),
+ start_link(Port, Callback, Rest, Exo).
+
+start_link(Port, Callback, Options, ExoOptions) ->
+ do_start(Port, Callback, Options, ExoOptions, start_link).
+
+do_start(Port, Callback, Options, ExoOptions, StartF) when StartF==start;
+ StartF==start_link ->
+ ?debug("do_start(~p, ~p, ~p, ~p, ~p)", [Port, Callback, Options, ExoOptions, StartF]),
+ case lists:keymember(ssl, 1, Options) of
+ {_, true} ->
+ start_ssl(Port, Options, ExoOptions);
+ _ ->
+ exo_socket_server:StartF(Port,[tcp],
+ [{active,once},{packet,0},binary,
+ {reuseaddr,true} | ExoOptions],
+ ?MODULE, {Callback, Options})
+ end.
+
+start_ssl(Callback) ->
+ start_ssl(?RPC_PORT, Callback, []).
+
+start_ssl(Port, Callback, Options) ->
+ start_ssl(Port, Callback, Options, []).
+
+start_ssl(Port, Callback, Options, ExoOptions) ->
+ do_start_ssl(Port, Callback, Options, ExoOptions, start).
+
+start_link_ssl(Port, Callback, Options, ExoOptions) ->
+ do_start_ssl(Port, Callback, Options, ExoOptions, start_link).
+
+do_start_ssl(Port, Callback, Options, ExoOptions, StartF) when
+ StartF == start; StartF == start_link ->
+ KeyAndCert = key_and_cert(ExoOptions),
+ Verify = proplists:get_value(verify, ExoOptions, verify_none),
+ Debug = proplists:get_value(debug, ExoOptions, true),
+ exo_socket_server:StartF(Port,[tcp,probe_ssl],
+ KeyAndCert ++
+ [{active,once},{packet,0},binary,
+ {debug, Debug},
+ {verify, Verify}, %% no client cert required
+ {reuseaddr,true} | ExoOptions], ?MODULE, {Callback, Options}).
+
+key_and_cert(Opts) ->
+ Dir = code:priv_dir(rvi_common),
+ [{keyfile, opt(keyfile, Opts, filename:join(Dir, "host.key"))},
+ {certfile, opt(certfile, Opts, filename:join(Dir, "host.cert"))}].
+
+opt(K, Opts, Default) ->
+ case lists:keyfind(K, 1, Opts) of
+ {_, Value} ->
+ Value;
+ false ->
+ Default
+ end.
+
+init(Socket, {Callback, Options}) ->
+ ?debug("init(~p, ~p)", [Socket, {Callback, Options}]),
+ {ok,{IP,Port}} = exo_socket:peername(Socket),
+ ?debug("connection from: ~p : ~p", [IP, Port]),
+ gproc:reg({p,l,msgpack_rpc_server}, {IP,Port, Callback}),
+ case lists:keyfind(gproc, 1, Options) of
+ {_, Reg} ->
+ ?debug("registering with gproc: ~p", [Reg]),
+ gproc:reg(Reg, {IP, Port, Callback});
+ false ->
+ ?debug("not registering name with gproc", []),
+ ok
+ end,
+ MsgPackOpts = opt(msgpack, Options, msgpack_options()),
+ {ok, #state{callback = Callback, msgpack_opts = MsgPackOpts}}.
+
+data(Socket, Data, #state{buf = Buf, msgpack_opts = Opts} = State) ->
+ Buf1 = <<Buf/binary, Data/binary>>,
+ try Dec = msgpack:unpack_stream(Buf1, Opts),
+ ?debug("decoded: ~p", [Dec]),
+ case Dec of
+ {[?TYPE_REQUEST, ID, Method, Args], Rest} ->
+ handle_call_request(Socket, ID, Method, Args, State#state{buf = Rest});
+ {[?TYPE_NOTIFY, Method, Args], Rest} ->
+ handle_notify_request(Socket, Method, Args, State#state{buf = Rest});
+ {error, incomplete} ->
+ {ok, State#state{buf = Buf1}};
+ {error, Reason} ->
+ ?debug("error parsing stream: ~p", [Reason]),
+ {ok, State#state{buf = <<>>}}
+ end
+ catch
+ error:_Error ->
+ ?debug("decode error: ~p", [_Error]),
+ {ok,State}
+ end.
+
+control(_XSocket, _Request, _From, St) ->
+ {reply, {error, unsupported}, St}.
+
+%%
+%% close - retrieve statistics
+%% transport socket SHOULD still be open, but ssl may not handle this!
+%%
+close(Socket, State) ->
+ case exo_socket:getstat(Socket, exo_socket:stats()) of
+ {ok,_Stats} ->
+ ?debug("~w: close, stats=~w", [?MODULE, _Stats]),
+ {ok, State};
+ {error,_Reason} ->
+ ?debug("~w: close, stats error=~w", [?MODULE, _Reason]),
+ {ok, State}
+ end.
+
+error(_Socket,Error,State) ->
+ ?debug("bert_rpc_exec: error = ~p\n", [Error]),
+ {stop, Error, State}.
+
+%%
+%% Internal
+%%
+handle_call_request(Socket, ID, Method, Args,
+ #state{callback = CB,
+ msgpack_opts = Opts} = State) ->
+ try Res = apply(CB, binary_to_existing_atom(Method, latin1), Args),
+ Msg = msgpack:pack([?TYPE_RESPONSE, ID, null, Res], Opts),
+ exo_socket:send(Socket, Msg)
+ catch
+ error:Reason ->
+ ?debug("caught ~s ~p -> error:~p", [Method, Args, Reason]),
+ ReasonStr = lists:flatten(io_lib:fwrite("error:~w", [Reason])),
+ ErrMsg = msgpack:pack([?TYPE_RESPONSE, ID, ReasonStr, null]),
+ exo_socket:send(Socket, ErrMsg)
+ end,
+ {ok, State#state{buf = <<>>}}.
+
+handle_notify_request(_Socket, Method, Args, #state{callback = CB} = State) ->
+ apply(CB, binary_to_existing_atom(Method, latin1), Args),
+ {ok, State}.