diff options
Diffstat (limited to 'components')
21 files changed, 1367 insertions, 336 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index 593f69c..a7bf84a 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -360,6 +360,8 @@ match_svc_([H|T], [H|T1]) -> match_svc_(T, T1); match_svc_(["+"|T], [_|T1]) -> match_svc_(T, T1); +match_svc_([[]], _) -> + true; match_svc_([], _) -> true; match_svc_(_, _) -> @@ -681,7 +683,6 @@ abbrev_jwt({Hdr, Body} = X) -> abbrev_jwt(X) -> X. - abbrev_pl(#cred{} = Payload) -> list_to_tuple(lists:map(fun(B) when is_binary(B) -> abbrev_bin(B); ([{_,_}|_]=L) -> abbrev_payload(L); diff --git a/components/dlink/src/dlink_data.erl b/components/dlink/src/dlink_data.erl new file mode 100644 index 0000000..d16d923 --- /dev/null +++ b/components/dlink/src/dlink_data.erl @@ -0,0 +1,93 @@ +-module(dlink_data). + +-export([decode/5, + encode/3]). + +-include_lib("lager/include/log.hrl"). + +decode(Data, F, St, Mod, FragOpts) when is_function(F,1) -> + DecodeRes = case St of + <<>> when Data == <<>> -> + {ok, <<>>}; + <<>> -> do_decode(Data); + Rest when is_binary(Rest) -> + do_decode(<<Rest/binary, Data/binary>>); + Cont when is_function(Cont, 1) -> + Cont(Data) + end, + case DecodeRes of + Cont1 when is_function(Cont1, 1) -> + {ok, Cont1}; + {ok, Rest1} -> + {ok, Rest1}; + {ok, Decoded, Rest1} -> + decoded(Decoded, Rest1, F, Mod, FragOpts); + {error, _} = Err -> + Err + end. + +encode(Msg, PMod, PSt) -> + PMod:encode(Msg, PSt). + +do_decode(Data) -> + case Data of + <<8:4,_:4,_/binary>> -> + %% msgpack map + ?debug("detected msgpack map", []), + msgpack_decode(Data); + <<H, _/binary>> when H==16#de; H==16#df -> + %% msgpack map 16 or map 32 + ?debug("detected msgpack map 16 or map 32", []), + msgpack_decode(Data); + _ -> + ?debug("assuming json", []), + jsx_decode(Data) + end. + +decoded(Decoded, Rest, F, Mod, FragOpts) -> + case rvi_frag:maybe_fragment(Decoded, Mod, FragOpts) of + true -> + {ok, Rest}; + {true, Msg} -> + case do_decode(Msg) of + {ok, DecMsg, <<>>} -> + F(DecMsg), + decode(Rest, F, <<>>, Mod, FragOpts); + {error, _} = Err1 -> + Err1 + end; + false -> + F(Decoded), + decode(Rest, F, <<>>, Mod, FragOpts) + end. + +msgpack_decode(Data) -> + case msgpack:unpack_stream(Data, [jsx]) of + {error, incomplete} -> + fun(NewData) -> + msgpack_decode( + <<Data/binary, NewData/binary>>) + end; + {error, E} -> + {error, E}; + {Decoded, Rest} when is_binary(Rest) -> + {ok, Decoded, Rest} + end. + +jsx_decode(Data) -> + try jsx_decode_res(jsx:decode(Data, [stream, return_tail])) + catch + error:E -> + ?error("jsx decode failed: ~p", [E]), + {error, E} + end. + +jsx_decode_res(Res) -> + case Res of + {incomplete, Cont} -> + fun(NewData) -> + jsx_decode_res(Cont(NewData)) + end; + {with_tail, Decoded, Rest} -> + {ok, Decoded, Rest} + end. diff --git a/components/dlink/src/dlink_data_json.erl b/components/dlink/src/dlink_data_json.erl index 6a68e48..2442b1c 100644 --- a/components/dlink/src/dlink_data_json.erl +++ b/components/dlink/src/dlink_data_json.erl @@ -6,7 +6,7 @@ port_options/0]). -init(_Opts) -> +init(_) -> []. port_options() -> diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl index 253da55..14139bc 100644 --- a/components/dlink/src/dlink_data_msgpack.erl +++ b/components/dlink/src/dlink_data_msgpack.erl @@ -1,20 +1,21 @@ -module(dlink_data_msgpack). --export([init/1, +-export([init/0, init/1, decode/3, encode/2]). -export([port_options/0]). -record(st, {opts = [{allow_atom, pack}, - {enable_str, true}, jsx], buf = <<>>}). port_options() -> [binary, {packet, 0}]. -init(_CS) -> +init(_) -> init(). + +init() -> #st{}. decode(Msg0, F, #st{buf = Prev, opts = Opts} = St) when is_function(F, 1) -> diff --git a/components/dlink_bt/src/bt_connection.erl b/components/dlink_bt/src/bt_connection.erl index bcfa199..399e70d 100644 --- a/components/dlink_bt/src/bt_connection.erl +++ b/components/dlink_bt/src/bt_connection.erl @@ -29,6 +29,7 @@ -export([accept/6]). -export([send/2]). -export([send/3]). +-export([send_data/2]). -export([is_connection_up/1]). -export([is_connection_up/2]). -export([terminate_connection/1]). @@ -47,6 +48,8 @@ mode = bt, packet_mod = ?PACKET_MOD, packet_st = [], + decode_st = <<>>, + frag_opts = [], mod, func, args @@ -74,6 +77,8 @@ accept(Channel, ListenRef, Mode, Mod, Fun, Arg) -> send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). +send(Pid, Data, Opts) when is_pid(Pid) -> + gen_server:cast(Pid, {send, Data, Opts}); send(Addr, Channel, Data) -> case bt_connection_manager:find_connection_by_address(Addr, Channel) of {ok, Pid} -> @@ -86,6 +91,9 @@ send(Addr, Channel, Data) -> end. +send_data(Pid, Data) -> + gen_server:cast(Pid, {send_data, Data}). + terminate_connection(Pid) when is_pid(Pid) -> gen_server:call(Pid, terminate_connection). @@ -137,6 +145,10 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) -> gen_server:cast(self(), connect), {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CS), PktSt = PktMod:init(CS), + DefFragMod = dlink_data_msgpack, + DefFragSt = dlink_data_msgpack:init([]), + {ok, FragOpts} = get_module_config( + frag_opts, [{packet_mod, {DefFragMod, DefFragSt}}], CS), {ok, #st{ remote_addr = bt_addr(Mode, BTAddr), channel = Channel, @@ -144,9 +156,10 @@ init({connect, BTAddr, Channel, Mode, Mod, Fun, CS}) -> mode = Mode, mod = Mod, func = Fun, - args = CS, + args = rvi_common:set_value(role, client, CS), packet_mod = PktMod, - packet_st = PktSt + packet_st = PktSt, + frag_opts = FragOpts }}; @@ -182,7 +195,7 @@ init({accept, Channel, ListenRef, Mode, Mod, Fun, CS}) -> mode = Mode, mod = Mod, func = Fun, - args = CS, + args = rvi_common:set_value(role, server, CS), packet_mod = PktMod, packet_st = PktSt }}. @@ -271,13 +284,28 @@ handle_cast({send, Data}, #st{mode = Mode, ?debug("handle_cast(send): Sending: ~p", [Data]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), ?debug("Encoded = ~p", [Encoded]), - Res = case Mode of - bt -> rfcomm:send(Sock, Encoded); - tcp -> gen_tcp:send(Sock, Encoded) - end, + Res = do_send(Mode, Sock, Encoded), ?debug("send Res = ~p", [Res]), {noreply, St#st{packet_st = PSt1}}; +handle_cast({send, Data, Opts}, #st{mode = Mode, rfcomm_ref = Socket, + packet_mod = PMod, + packet_st = PSt, + frag_opts = FragOpts} = St) -> + ?debug("handle_cast({send, Data, ~p}, ...), FragOpts = ~p", + [Opts, FragOpts]), + {ok, Bin, PSt1} = PMod:encode(Data, PSt), + St1 = St#st{packet_st = PSt1}, + rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, + fun() -> + do_send(Mode, Socket, Bin) + end), + {noreply, St1}; + +handle_cast({send_data, Data}, #st{mode = Mode, rfcomm_ref = Socket} = St) -> + do_send(Mode, Socket, Data), + {noreply, St}; + handle_cast(_Msg, State) -> ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]), {noreply, State}. @@ -315,25 +343,24 @@ handle_info({rfcomm, ARef, { accept, BTAddr, _ } }, handle_info({rfcomm, _ConnRef, {data, Data}}, #st { remote_addr = BTAddr, channel = Channel, - packet_mod = PMod, - packet_st = PSt, + decode_st = DSt, + frag_opts = FragOpts, mod = Mod, func = Fun } = State) -> ?debug("~p:handle_info(data): Data: ~p", [ ?MODULE, Data]), ?info("~p:handle_info(data): From: ~p:~p ", [ ?MODULE, BTAddr, Channel]), ?info("~p:handle_info(data): ~p:~p -> ~p:~p", [ ?MODULE, BTAddr, Channel, Mod, Fun]), - case PMod:decode(Data, fun(Elems) -> - handle_elements(Elems, State) - end, PSt) of - {ok, PSt1} -> - {noreply, State#st{packet_st = PSt1}}; + case dlink_data:decode(Data, fun(Elems) -> + got_msg(Elems, State) + end, DSt, ?MODULE, FragOpts) of + {ok, DSt1} -> + {noreply, State#st{decode_st = DSt1}}; {error, Reason} -> - ?error("decode failed: ~p", [Reason]), + ?error("decode failed: Reason = ~p", [Reason]), {stop, Reason, State} end; - handle_info({rfcomm, ConnRef, closed}, #st { remote_addr = BTAddr, channel = Channel, @@ -375,18 +402,21 @@ handle_info({tcp, Sock, Data}, #st{remote_addr = IP, channel = Port, rfcomm_ref = Sock, packet_mod = PMod, - packet_st = PSt} = St) -> - ?debug("handle_info(data): From: ~p:~p", [IP, Port]), - case PMod:decode(Data, fun(Elems) -> - handle_elements(Elems, St) - end, PSt) of - {ok, PSt1} -> + frag_opts = FragOpts, + decode_st = DSt} = St) -> + ?debug("handle_info(Data = ~p): From: ~p:~p", [Data, IP, Port]), + ?debug("PMod = ~p; DSt = ~p", [PMod, DSt]), + case dlink_data:decode(Data, fun(Elems) -> + got_msg(Elems, St) + end, DSt, ?MODULE, FragOpts) of + {ok, DSt1} -> inet:setopts(Sock, [{active, once}]), - {noreply, St#st{packet_st = PSt1}}; + {noreply, St#st{decode_st = DSt1}}; {error, Reason} -> ?error("decode failed, Reason = ~p", [Reason]), {stop, Reason, St} end; + handle_info({inet_async, _L, _Ref, {ok, Sock}} = Msg, #st{mod = Mod, func = Fun, args = Arg} = St) -> @@ -432,14 +462,16 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +do_send(tcp, Sock, Data) -> + gen_tcp:send(Sock, Data); +do_send(bt, Sock, Data) -> + rfcomm:send(Sock, Data). + get_module_config(Key, Default, CS) -> rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS). -handle_elements(Elements, #st{remote_addr = BTAddr, - channel = Channel, - mod = Mod, - func = Fun, - args = Arg}) -> +got_msg(Elements, #st{remote_addr = BTAddr, channel = Channel, + mod = Mod, func = Fun, args = Arg}) -> ?debug("data complete; processed: ~p", [authorize_keys:abbrev(Elements)]), Mod:Fun(self(), BTAddr, Channel, data, Elements, Arg). diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index 75e7926..2fc6587 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -45,7 +45,7 @@ -define(CONNECTION_TABLE, rvi_dlink_bt_connections). -define(SERVICE_TABLE, rvi_dlink_bt_services). --define(DLINK_BT_VER, "1.0"). +-define(DLINK_BT_VER, <<"1.0">>). %% Multiple registrations of the same service, each with a different connection, %% is possible. @@ -579,7 +579,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> { reply, [ Res ], St }; -handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) -> +handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) -> %% Resolve connection pid from service case get_connections_by_service(Service) of @@ -595,7 +595,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, { ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) }, { ?DLINK_ARG_DATA, Data } - ]), + ], DataLinkOpts), { reply, [ Res ], St} end; @@ -652,7 +652,7 @@ code_change(_OldVsn, St, _Extra) -> send_authorize(Pid, SetupChannel, CompSpec) -> {Address, Channel} = - case Mode = get_mode(CompSpec) of + case get_mode(CompSpec) of bt -> {ok,[{address, Addr}]} = bt_drv:local_info([address]), {bt_address_to_string(Addr), SetupChannel}; diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 7229b3c..1d4753e 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -26,13 +26,15 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([setup/6]). +-export([setup/7]). -export([send/2]). -export([send/3]). +-export([send_data/2]). -export([is_connection_up/1]). -export([is_connection_up/2]). -export([terminate_connection/1]). -export([terminate_connection/2]). +-export([get_source_address/1]). -define(SERVER, ?MODULE). @@ -47,6 +49,8 @@ args = undefined, packet_mod = ?PACKET_MOD, packet_st = [], + decode_st = <<>>, + frag_opts = [], cs }). @@ -55,9 +59,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, CS) -> +setup(Role, IP, Port, Sock, Mod, Fun, CS) when Role==client; Role==server -> ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]), - case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of + case gen_server:start_link(connection, {Role, IP, Port, Sock, Mod, Fun, CS},[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), gen_server:cast(GenSrvPid, {activate_socket, Sock}), @@ -70,6 +74,8 @@ setup(IP, Port, Sock, Mod, Fun, CS) -> send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). +send(Pid, Data, Opts) when is_pid(Pid) -> + gen_server:cast(Pid, {send, Data, Opts}); send(IP, Port, Data) -> case connection_manager:find_connection_by_address(IP, Port) of {ok, Pid} -> @@ -82,6 +88,9 @@ send(IP, Port, Data) -> end. +send_data(Pid, Data) -> + gen_server:cast(Pid, {send_data, Data}). + terminate_connection(Pid) when is_pid(Pid) -> gen_server:call(Pid, terminate_connection). @@ -106,6 +115,9 @@ is_connection_up(IP, Port) -> false end. +get_source_address(Pid) -> + gen_server:call(Pid, get_source_address). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -124,11 +136,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, CompSpec}) -> - case IP of - undefined -> ok; - _ -> connection_manager:add_connection(IP, Port, self()) - end, +init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): self(): ~p", [self()]), ?debug("connection:init(): IP: ~p", [IP]), ?debug("connection:init(): Port: ~p", [Port]), @@ -137,6 +145,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> ?debug("connection:init(): Function: ~p", [Fun]), {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), PktSt = PktMod:init(CompSpec), + {ok, FragOpts} = get_module_config( + frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec), {ok, #st{ ip = IP, port = Port, @@ -145,7 +155,9 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> func = Fun, packet_mod = PktMod, packet_st = PktSt, - cs = CompSpec + frag_opts = FragOpts, + cs = rvi_common:save_source_address( + Role, Sock, rvi_common:set_value(role, Role, CompSpec)) }}. get_module_config(Key, Default, CS) -> @@ -166,8 +178,9 @@ get_module_config(Key, Default, CS) -> %% @end %%-------------------------------------------------------------------- - -handle_call(terminate_connection, _From, St) -> +handle_call(get_source_address, _, #st{cs = CS} = St) -> + {reply, rvi_common:get_source_address(CS), St}; +handle_call(terminate_connection, _From, St) -> ?debug("~p:handle_call(terminate_connection): Terminating: ~p", [ ?MODULE, {St#st.ip, St#st.port}]), @@ -193,10 +206,25 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, Data]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), - ?debug("Encoded = ~p", [Encoded]), gen_tcp:send(St#st.sock, Encoded), {noreply, St#st{packet_st = PSt1}}; +handle_cast({send, Data, Opts}, #st{sock = Socket, + packet_mod = PMod, + packet_st = PSt, + frag_opts = FragOpts} = St) -> + ?debug("handle_cast({send, Data, ~p, ...), FragOpts = ~p", + [Opts, FragOpts]), + {ok, Bin, PSt1} = PMod:encode(Data, PSt), + St1 = St#st{packet_st = PSt1}, + rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, + fun() -> + gen_tcp:send(Socket, Bin) + end), + {noreply, St1}; +handle_cast({send_data, Data}, #st{sock = Sock} = St) -> + gen_tcp:send(Sock, Data), + {noreply, St}; handle_cast({activate_socket, Sock}, State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), @@ -230,18 +258,31 @@ handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, packet_mod = PMod, - packet_st = PSt} = State) -> - ?debug("handle_info(data, PMod=~p): From: ~p:~p ", [PMod, IP, Port]), - case PMod:decode(Data, fun(Elems) -> - handle_elements(Elems, State) - end, PSt) of - {ok, PSt1} -> + packet_st = PSt, + decode_st = DSt, + frag_opts = FragOpts} = State) -> + ?debug("handle_info(~p, PMod=~p, St=~p): From: ~p:~p ", + [Data, PMod, PSt, IP, Port]), + case dlink_data:decode(Data, fun(Elems) -> + got_msg(Elems, State) + end, DSt, ?MODULE, FragOpts) of + {ok, DSt1} = Ok -> inet:setopts(Sock, [{active, once}]), - {noreply, State#st{packet_st = PSt1}}; + {noreply, State#st{decode_st = DSt1}}; {error, Reason} -> ?error("decode failed, Reason = ~p", [Reason]), {stop, Reason, State} end; + %% case PMod:decode(Data, fun(Elems) -> + %% handle_elements(Elems, State) + %% end, PSt) of + %% {ok, PSt1} -> + %% inet:setopts(Sock, [{active, once}]), + %% {noreply, State#st{packet_st = PSt1}}; + %% {error, Reason} -> + %% ?error("decode failed, Reason = ~p", [Reason]), + %% {stop, Reason, State} + %% end; handle_info({tcp_closed, Sock}, #st { ip = IP, @@ -331,8 +372,24 @@ code_change(_OldVsn, State, _Extra) -> %% {ok, PSt1}; %% { -> -handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS, - ip = IP, port = Port}) -> +%% handle_elements(Elements, #st{frag_opts = FragOpts} = St) -> +%% MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts), +%% ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]), +%% case MaybeF of +%% true -> +%% %% It was a fragment, but not a complete message yet +%% St; +%% {true, Msg} -> +%% #st{packet_mod = PMod, packet_st = PSt} = St, +%% PMod:decode(Msg, fun(Elems) -> +%% got_msg(Elems, St) +%% end, PSt); +%% false -> +%% got_msg(Elements, St) +%% end. + +got_msg(Elements, #st{ip = IP, port = Port, + mod = Mod, func = Fun, cs = CS}) -> ?debug("data complete: Processed: ~p", [authorize_keys:abbrev(Elements)]), Mod:Fun(self(), IP, Port, data, Elements, CS). diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 330985a..78de04a 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -44,7 +44,7 @@ -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). --define(DLINK_TCP_VERSION, "1.1"). +-define(DLINK_TCP_VERSION, <<"1.1">>). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -208,11 +208,12 @@ connect_remote(IP, Port, CompSpec) -> [IP, Port]), %% Setup a genserver around the new connection. - {ok, Pid } = connection:setup(IP, Port, Sock, + {ok, Pid } = connection:setup(client, IP, Port, Sock, ?MODULE, handle_socket, CompSpec ), %% Send authorize send_authorize(Pid, CompSpec), + connection_manager:add_connection(IP, Port, Pid), ok; {error, Err } -> @@ -283,6 +284,7 @@ handle_socket_(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), + ?debug("CompSpec = ~p", [CompSpec]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -538,7 +540,7 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> { reply, [ Res ], St }; -handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, St) -> +handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}, _From, St) -> %% Resolve connection pid from service case get_connections_by_service(Service) of @@ -552,7 +554,7 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, { ?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1) }, { ?DLINK_ARG_DATA, Data } - ]), + ], DataLinkOpts), { reply, [ Res ], St} end; @@ -569,7 +571,7 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> case connection:is_connection_up(Pid) of true -> ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]), - connection:send(Pid, jsx:encode([{?DLINK_ARG_CMD, ?DLINK_CMD_PING}])), + connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }); @@ -659,22 +661,53 @@ availability_msg(Availability, Services, CompSpec) -> status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. +bin(S) -> + iolist_to_binary(S). + process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, ProtoVersion, Credentials, CompSpec) -> ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]), - {NRemoteAddress, NRemotePort} = Conn = - case { RemoteAddress, RemotePort } of - { <<"0.0.0.0">>, 0 } -> - ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p", - [ PeerIP, PeerPort]), - { PeerIP, PeerPort }; - _ -> { RemoteAddress, RemotePort} - end, + F = fun() -> + process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, ProtoVersion, Credentials, CompSpec) + end, + case connection_manager:find_connection_by_address(PeerIP, PeerPort) of + not_found -> F(); + BPid -> + deconflict_conns(FromPid, BPid, CompSpec, F) + end. +deconflict_conns(APid, BPid, CsA, F) -> + {_, _} = ASrc = rvi_common:get_value(source_address, undefined, CsA), + case connection:get_source_address(BPid) of + undefined -> + ?debug("Deconflict - BSrc = undefined, kill BPid (~p)", [BPid]), + exit(BPid, deconflict), + F(); + {_,_} = BSrc when BSrc > ASrc -> + ?debug("Deconflict - kill BPid (~p): ASrc = ~p, BSrc = ~p", [BPid, ASrc, BSrc]), + exit(BPid, deconflict), + F(); + BSrc -> + ?debug("Deconflict - kill APid (~p - self): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]), + exit(deconflict) + end. + + +process_authorize_(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, + _ProtoVersion, Credentials, CompSpec) -> + {NRemoteAddress, NRemotePort} = Conn = + case { RemoteAddress, RemotePort } of + { "0.0.0.0", 0 } -> + ?info("dlink_tcp:authorize(): Remote is behind firewall. Will use ~p:~p", + [ PeerIP, PeerPort]), + { PeerIP, PeerPort }; + _ -> { RemoteAddress, RemotePort} + end, log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), authorize_rpc:store_creds(CompSpec, Credentials, Conn), connection_authorized(FromPid, Conn, CompSpec). @@ -683,7 +716,7 @@ send_authorize(Pid, CompSpec) -> {LocalIP, LocalPort} = rvi_common:node_address_tuple(), connection:send(Pid, [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_ADDRESS, bin(LocalIP) }, { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) }, { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } diff --git a/components/dlink_tcp/src/gen_nb_server.erl b/components/dlink_tcp/src/gen_nb_server.erl index ae0a605..16693ae 100644 --- a/components/dlink_tcp/src/gen_nb_server.erl +++ b/components/dlink_tcp/src/gen_nb_server.erl @@ -90,7 +90,8 @@ store_cb_state(CBState, State) when is_record(State, state) -> %% @doc Adds a new listener socket to be managed by gen_nb_server %% NOTE: Should only be called by the submodule -spec add_listen_socket({string(), integer()}, #state{}) -> {ok, #state{}} | {error, any()}. -add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) -> +add_listen_socket({IpAddr0, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}=State) -> + IpAddr = str(IpAddr0), Key = {IpAddr, Port}, case dict:find(Key, Socks) of {ok, _} -> @@ -108,7 +109,8 @@ add_listen_socket({IpAddr, Port}, #state{cb=Callback, addrs=Addrs, socks=Socks}= %% @doc Removes a new listener socket to be managed by gen_nb_server %% NOTE: Should only be called by the submodule -spec remove_listen_socket({string(), integer()}, #state{}) -> {error, not_listening} | {ok, #state{}}. -remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) -> +remove_listen_socket({IpAddr0, Port}, #state{socks=Socks, addrs=Addrs}=State) -> + IpAddr = str(IpAddr0), Key = {IpAddr, Port}, case dict:find(Key, Socks) of error -> @@ -119,6 +121,11 @@ remove_listen_socket({IpAddr, Port}, #state{socks=Socks, addrs=Addrs}=State) -> addrs=dict:erase(Sock, Addrs)}} end. +str(Addr) when is_list(Addr) -> + Addr; +str(Addr) when is_binary(Addr) -> + binary_to_list(Addr). + %% @doc Returns the callback module's state -spec init([atom()|any()]) -> {ok, #state{}} | {error, bad_init_state} | {error, any()}. init([CallbackModule, InitParams]) -> diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index 6def59a..88c3d23 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -101,7 +101,7 @@ new_connection(IP, Port, Sock, State) -> %% IP and Port are garbage. We'll grab peername when we get our %% first data. %% Provide component spec as extra arg. - {ok, _P} = connection:setup(undefined, 0, Sock, + {ok, _P} = connection:setup(server, undefined, 0, Sock, dlink_tcp_rpc, handle_socket, gen_nb_server:get_cb_state(State)), {ok, State}. diff --git a/components/dlink_tls/src/dlink_tls_conn.erl b/components/dlink_tls/src/dlink_tls_conn.erl index c051afc..447581d 100644 --- a/components/dlink_tls/src/dlink_tls_conn.erl +++ b/components/dlink_tls/src/dlink_tls_conn.erl @@ -27,11 +27,12 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([setup/6]). --export([upgrade/3, - async_upgrade/3]). +-export([setup/7]). +-export([upgrade/2, + async_upgrade/2]). -export([send/2]). -export([send/3]). +-export([send_data/2]). -export([is_connection_up/1]). -export([is_connection_up/2]). -export([terminate_connection/1]). @@ -40,7 +41,6 @@ -define(SERVER, ?MODULE). -define(PACKET_MOD, dlink_data_msgpack). --define(MAX_MSG_SIZE, infinity). -record(st, { ip = {0,0,0,0}, @@ -49,11 +49,11 @@ mode = tcp :: tcp | tls, packet_mod = ?PACKET_MOD, packet_st = [], + frag_opts = [], mod = undefined, func = undefined, cs, - role = server :: client | server, - msg_size = ?MAX_MSG_SIZE :: infinity | pos_integer() + role = server :: client | server }). %%%=================================================================== @@ -61,8 +61,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, CompSpec) -> - Params = {IP, Port, Sock, Mod, Fun, CompSpec}, +setup(Role, IP, Port, Sock, Mod, Fun, CompSpec) when Role==client; + Role==server -> + Params = {Role, IP, Port, Sock, Mod, Fun, CompSpec}, ?debug("setup() IP = ~p; Port = ~p; Mod = ~p; Fun = ~p", [IP, Port, Mod, Fun]), ?debug("CompSpec = ~p", [CompSpec]), case gen_server:start_link(?MODULE, Params ,[]) of @@ -75,16 +76,18 @@ setup(IP, Port, Sock, Mod, Fun, CompSpec) -> Err end. -upgrade(Pid, Role, CompSpec) when Role==client; Role==server -> - gen_server:call(Pid, {upgrade, Role, CompSpec}). +upgrade(Pid, Role) when Role==client; Role==server -> + gen_server:call(Pid, {upgrade, Role}). -async_upgrade(Pid, Role, CompSpec) when Role==client; - Role==server -> - gen_server:cast(Pid, {upgrade, Role, CompSpec}). +async_upgrade(Pid, Role) when Role==client; + Role==server -> + gen_server:cast(Pid, {upgrade, Role}). send(Pid, Data) when is_pid(Pid) -> gen_server:cast(Pid, {send, Data}). +send(Pid, Data, Opts) when is_pid(Pid) -> + gen_server:cast(Pid, {send, Data, Opts}); send(IP, Port, Data) -> case dlink_tls_connmgr:find_connection_by_address(IP, Port) of {ok, Pid} -> @@ -97,6 +100,9 @@ send(IP, Port, Data) -> end. +send_data(Pid, Data) -> + gen_server:cast(Pid, {send_data, Data}). + terminate_connection(Pid) when is_pid(Pid) -> gen_server:call(Pid, terminate_connection). @@ -139,7 +145,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, CompSpec}) -> +init({Role, IP, Port, Sock, Mod, Fun, CompSpec}) -> case IP of undefined -> ok; _ -> dlink_tls_connmgr:add_connection(IP, Port, self()) @@ -153,6 +159,8 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), ?debug("packet_mod = ~p", [PktMod]), PktSt = PktMod:init(CompSpec), + {ok, FragOpts} = get_module_config( + frag_opts, [{packet_mod, {PktMod, PktSt}}], CompSpec), {ok, #st{ ip = IP, port = Port, @@ -160,12 +168,27 @@ init({IP, Port, Sock, Mod, Fun, CompSpec}) -> mod = Mod, packet_mod = PktMod, packet_st = PktSt, + frag_opts = FragOpts, func = Fun, - cs = CompSpec + cs = rvi_common:set_value(role, Role, CompSpec) }}. get_module_config(Key, Default, CS) -> - rvi_common:get_module_config(data_link, dlink_tls_rpc, Key, Default, CS). + ModConf = fun() -> + rvi_common:get_module_config( + data_link, dlink_tls_rpc, Key, Default, CS) + end, + case rvi_common:get_value(tls_opts, undefined, CS) of + undefined -> ModConf(); + Opts -> + case lists:keyfind(Key, 1, Opts) of + false -> + ModConf(); + {_, Val} -> + Val + end + end. + %%-------------------------------------------------------------------- %% @private @@ -183,18 +206,18 @@ get_module_config(Key, Default, CS) -> %%-------------------------------------------------------------------- -handle_call(terminate_connection, _From, St) -> +handle_call(terminate_connection, _From, #st{} = St) -> ?debug("~p:handle_call(terminate_connection): Terminating: ~p", [ ?MODULE, {St#st.ip, St#st.port}]), {stop, Reason, NSt} = handle_info({tcp_closed, St#st.sock}, St), {stop, Reason, ok, NSt}; -handle_call({upgrade, Role, CompSpec} = Req, _From, #st{sock = S} = St) -> +handle_call({upgrade, Role} = Req, _From, #st{cs = CS} = St) -> ?debug("~p:handle_call(~p)~n", [?MODULE, Req]), %% deliberately crash (for now) if upgrade fails. - {Reply, St1} = handle_upgrade(Role, CompSpec, St), + {Reply, #st{} = St1} = handle_upgrade(Role, CS, St), {reply, Reply, St1}; -handle_call(_Request, _From, State) -> +handle_call(_Request, _From, #st{} = State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), Reply = ok, {reply, Reply, State}. @@ -209,11 +232,11 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({upgrade, Role, CompSpec}, St) -> - {_, St1} = handle_upgrade(Role, CompSpec, St), +handle_cast({upgrade, Role}, #st{cs = CS} = St) -> + {_, #st{} = St1} = handle_upgrade(Role, CS, St), {noreply, St1}; handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> - ?debug("~p:handle_call(send): Sending: ~p", + ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, abbrev(Data)]), {ok, Encoded, PSt1} = PMod:encode(Data, PSt), ?debug("Encoded~n~s", [Encoded]), @@ -222,15 +245,29 @@ handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> tls -> ssl:send(St#st.sock, Encoded) end, {noreply, St#st{packet_st = PSt1}}; - -handle_cast({activate_socket, Sock}, State) -> +handle_cast({send, Data, Opts} = Req, #st{packet_mod = PMod, + packet_st = PSt, + frag_opts = FragOpts} = St) -> + ?debug("handle_cast(~p, ...), FragOpts = ~p", [Req, FragOpts]), + {ok, Bin, PSt1} = PMod:encode(Data, PSt), + St1 = St#st{packet_st = PSt1}, + rvi_frag:send(Bin, Opts ++ FragOpts, ?MODULE, fun() -> + do_send(Bin, St1) + end), + {noreply, St1}; +handle_cast({send_data, Data}, #st{} = St) -> + %% don't encode; just send + ?debug("send_data, ~w", [authorize_keys:abbrev_bin(Data)]), + do_send(Data, St), + {noreply, St}; +handle_cast({activate_socket, Sock}, #st{} = State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), {noreply, State}; -handle_cast(_Msg, State) -> - ?warning("~p:handle_cast(): Unknown call: ~p", [ ?MODULE, _Msg]), +handle_cast(_Msg, #st{} = State) -> + ?warning("~p:handle_cast(): Unknown cast: ~p~nSt=~p", [ ?MODULE, _Msg, State]), {noreply, State}. %%-------------------------------------------------------------------- @@ -255,6 +292,7 @@ handle_info({ssl, Sock, Data}, #st{ip = IP, port = Port, packet_mod = PMod, packet_st = PSt} = State) -> ?debug("handle_info(data): Data: ~p", [abbrev(Data)]), ?debug("handle_info(data): From: ~p:~p ", [ IP, Port]), + ?debug("handle_info(data): PMod: ~p", [PMod]), case PMod:decode(Data, fun(Elems) -> handle_elems(Elems, State) end, PSt) of @@ -283,15 +321,18 @@ handle_info({tcp, Sock, Data}, {stop, Reason, State} end; -handle_info({tcp_closed, Sock}, +handle_info({Evt, Sock}, #st { ip = IP, port = Port, mod = Mod, func = Fun, - cs = CS} = State) -> - ?debug("~p:handle_info(tcp_closed): Address: ~p:~p ", [ ?MODULE, IP, Port]), + cs = CS} = State) when Evt==tcp_closed; Evt==ssl_closed -> + ?debug("~p:handle_info(~w): Address: ~p:~p ", [ ?MODULE, Evt, IP, Port]), Mod:Fun(self(), IP, Port,closed, CS), - gen_tcp:close(Sock), + case Evt of + tcp_closed -> gen_tcp:close(Sock); + ssl_closed -> ssl:close(Sock) + end, dlink_tls_connmgr:delete_connection_by_pid(self()), {stop, normal, State}; @@ -308,7 +349,7 @@ handle_info({tcp_error, _Sock}, {stop, normal, State}; -handle_info(_Info, State) -> +handle_info(_Info, #st{} = State) -> ?warning("~p:handle_cast(): Unknown info: ~p", [ ?MODULE, _Info]), {noreply, State}. @@ -341,15 +382,20 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +do_send(Bin, #st{sock = Sock, mode = tcp}) -> + gen_tcp:send(Sock, Bin); +do_send(Bin, #st{sock = Sock, mode = tls}) -> + ssl:send(Sock, Bin). + handle_upgrade(Role, CompSpec, #st{sock = S} = St) -> - %% {ok, [{active, Last}]} = inet:getopts(S, [active]), inet:setopts(S, [{active, false}]), case do_upgrade(S, Role, CompSpec) of - {ok, NewS} -> + {DoVerify, {ok, NewS}} -> ?debug("upgrade to TLS succcessful~n", []), ssl:setopts(NewS, [{active, once}]), {ok, {IP, Port}} = ssl:peername(NewS), - {ok, PeerCert} = ssl:peercert(NewS), + PeerCert = get_peercert(DoVerify, NewS), ?debug("SSL PeerCert=~w", [abbrev(PeerCert)]), NewCS = rvi_common:set_value( dlink_tls_role, Role, @@ -357,35 +403,73 @@ handle_upgrade(Role, CompSpec, #st{sock = S} = St) -> {ok, St#st{sock = NewS, mode = tls, role = Role, ip = inet_parse:ntoa(IP), port = Port, cs = NewCS}}; - Error -> + {_, Error} -> ?error("Cannot upgrade to TLS: ~p~n", [Error]), error({cannot_upgrade, Error}) end. +get_peercert(DoVerify, S) -> + case ssl:peercert(S) of + {ok, PeerCert} -> + PeerCert; + {error, _} when DoVerify == false -> + undefined + end. + do_upgrade(Sock, client, CompSpec) -> - Opts = tls_opts(client, CompSpec), + {DoVerify, Opts} = tls_opts(client, CompSpec), ?debug("TLS Opts = ~p", [Opts]), - ssl:connect(Sock, Opts); + {DoVerify, ssl:connect(Sock, Opts)}; do_upgrade(Sock, server, CompSpec) -> - Opts = tls_opts(client, CompSpec), + {DoVerify, Opts} = tls_opts(client, CompSpec), ?debug("TLS Opts = ~p", [Opts]), - ssl:ssl_accept(Sock, Opts). - -%% FIXME: For now, use the example certs delivered with the OTP SSL appl. -tls_opts(Role, _CompSpec) -> - {ok, DevCert} = setup:get_env(rvi_core, device_cert), - {ok, DevKey} = setup:get_env(rvi_core, device_key), - {ok, CACert} = setup:get_env(rvi_core, root_cert), - [ - {verify, verify_peer}, - {certfile, DevCert}, - {keyfile, DevKey}, - {cacertfile, CACert}, - {verify_fun, {fun verify_fun/3, public_root_key()}}, - {partial_chain, fun(X) -> - partial_chain(Role, X) - end} - ]. + {DoVerify, ssl:ssl_accept(Sock, Opts)}. + +tls_opts(Role, CompSpec) -> + TlsOpts = rvi_common:get_value(tls_opts, [], CompSpec), + Opt = fun(K) -> opt(K, TlsOpts, + fun() -> + ok(setup:get_env(rvi_core, K)) + end) + end, + case VOpt = lists:keyfind(verify, 1, TlsOpts) of + {verify, false} when Role == server -> + {false, [ + {verify, verify_none}, + {certfile, Opt(device_cert)}, + {keyfile, Opt(device_key)}, + {cacertfile, Opt(root_cert)} + ]}; + {verify, false} -> + {false, [ + {verify, verify_none} + ]}; + _ when VOpt==false; VOpt == {verify, true} -> % {verify,true} default + {true, [ + {verify, verify_peer}, + {certfile, Opt(device_cert)}, + {keyfile, Opt(device_key)}, + {cacertfile, Opt(root_cert)}, + {verify_fun, opt(verify_fun, TlsOpts, + {fun verify_fun/3, public_root_key()})}, + {partial_chain, opt(partial_chain, TlsOpts, + fun(X) -> + partial_chain(Role, X) + end)} + ]} + end. + +opt(Key, Opts, Def) -> + case lists:keyfind(Key, 1, Opts) of + false when is_function(Def, 0) -> Def(); + false -> Def; + {_, V} -> V + end. + +ok({ok, V}) -> + V; +ok(Other) -> + error({badmatch, Other}). public_root_key() -> authorize_keys:provisioning_key(). @@ -425,11 +509,26 @@ partial_chain(_, Certs) -> ?debug("partial_chain: ~p", [[lager:pr(Dec) || Dec <- Decoded]]), {trusted_ca, hd(Certs)}. -handle_elems(Elements, #st{mod = Mod, func = Fun, cs = CS, - ip = IP, port = Port}) -> +handle_elems(Elements, #st{frag_opts = FragOpts} = St) -> + MaybeF = rvi_frag:maybe_fragment(Elements, ?MODULE, FragOpts), + ?debug("maybe_fragment(~p) -> ~p", [Elements, MaybeF]), + case MaybeF of + true -> + %% It was a fragment, but not a complete message yet + St; + {true, Msg} -> + #st{packet_mod = PMod, packet_st = PSt} = St, + PMod:decode(Msg, fun(Elems) -> + got_msg(Elems, St) + end, PSt); + false -> + got_msg(Elements, St) + end. + +got_msg(Elements, #st{ip = IP, port = Port, mod = Mod, func = Fun, cs = CS} = St) -> ?debug("handle_info(data complete): Processed: ~p", [abbrev(Elements)]), Mod:Fun(self(), IP, Port, data, Elements, CS), - ok. + St. verify_cert_sig(#'OTPCertificate'{tbsCertificate = TBS, signature = Sig}, PubKey) -> diff --git a/components/dlink_tls/src/dlink_tls_listener.erl b/components/dlink_tls/src/dlink_tls_listener.erl index 7d3f45e..82e6b5a 100644 --- a/components/dlink_tls/src/dlink_tls_listener.erl +++ b/components/dlink_tls/src/dlink_tls_listener.erl @@ -14,7 +14,7 @@ -include_lib("lager/include/log.hrl"). -export([start_link/0, - add_listener/3, + add_listener/4, remove_listener/2]). -export([init/2, handle_call/3, handle_cast/2, handle_info/2]). @@ -28,8 +28,8 @@ start_link() -> create_tabs(), gen_nb_server:start_link({local, ?MODULE}, ?MODULE, []). -add_listener(IpAddr, Port, CompSpec) -> - gen_server:call(?MODULE, {add_listener, IpAddr, Port, CompSpec}). +add_listener(IpAddr, Port, Opts, CompSpec) -> + gen_server:call(?MODULE, {add_listener, IpAddr, Port, Opts, CompSpec}). remove_listener(IpAddr, Port) -> gen_server:call(?MODULE, {remove_listener, IpAddr, Port}). @@ -37,10 +37,11 @@ remove_listener(IpAddr, Port) -> init([], State) -> State1 = lists:foldl( - fun({{_,_}} = Addr, Acc) -> + fun({{{_,_} = Addr, Opts}}, Acc) -> + ?debug("Addr = ~p", [Addr]), case gen_nb_server:add_listen_socket(Addr, Acc) of {ok, Acc1} -> - ets_insert(?TAB, {Addr}), + ets_insert(?TAB, {Addr, Opts}), Acc1; _Error -> ets_delete(?TAB, Addr), @@ -60,11 +61,12 @@ create_tabs() -> ?TAB end. -handle_call({add_listener, IpAddr, Port, CompSpec}, _From, State) -> +handle_call({add_listener, IpAddr, Port, Opts, CompSpec}, _From, State) -> + ?debug("add_listener: IpAddr=~p, Port=~p", [IpAddr, Port]), ets_insert(?TAB, {cs, CompSpec}), case gen_nb_server:add_listen_socket({IpAddr, Port}, State) of {ok, State1} -> - ets_insert(?TAB, {{IpAddr, Port}}), + ets_insert(?TAB, {{IpAddr, Port}, Opts}), {reply, ok, gen_nb_server:store_cb_state( CompSpec, State1 )}; Error -> @@ -104,17 +106,22 @@ new_connection(IP, Port, Sock, State) -> %% first data. %% Provide component spec as extra arg. CompSpec = gen_nb_server:get_cb_state(State), + [{_, Opts}] = ets_lookup(?TAB, {IP, Port}), + CS = rvi_common:set_value(tls_opts, Opts, CompSpec), {ok, P} = dlink_tls_conn:setup( - undefined, 0, Sock, + server, undefined, 0, Sock, dlink_tls_rpc, - handle_socket, CompSpec), - dlink_tls_conn:async_upgrade(P, server, CompSpec), + handle_socket, CS), + dlink_tls_conn:async_upgrade(P, server), {ok, State}. ets_insert(Tab, Obj) -> ets:insert(Tab, Obj). +ets_lookup(Tab, Key) -> + ets:lookup(Tab, Key). + ets_delete(Tab, Key) -> ets:delete(Tab, Key). diff --git a/components/dlink_tls/src/dlink_tls_rpc.erl b/components/dlink_tls/src/dlink_tls_rpc.erl index 14d580a..632008a 100644 --- a/components/dlink_tls/src/dlink_tls_rpc.erl +++ b/components/dlink_tls/src/dlink_tls_rpc.erl @@ -121,11 +121,33 @@ start_connection_manager() -> setup_initial_listeners([], _CompSpec) -> ?debug("no initial listeners", []); setup_initial_listeners([_|_] = TlsOpts, CompSpec) -> + case lists:keytake(ports, 1, TlsOpts) of + {value, {_, Ports}, Rest} -> + setup_initial_listeners_(Rest, CompSpec), + [setup_initial_listeners_( + [{port,P}|inherit_opts([ip], TlsOpts, POpts)], CompSpec) + || {P, POpts} <- Ports]; + false -> + setup_initial_listeners_(TlsOpts, CompSpec) + end. + +inherit_opts(Keys, From, To) -> + Pick = [{K,V} || {K, V} <- From, + lists:member(K, Keys), + not lists:keymember(K, 1, To)], + Pick ++ To. + +setup_initial_listeners_([], _CompSpec) -> + ok; +setup_initial_listeners_([_|_] = TlsOpts, CompSpec) -> IP = proplists:get_value(ip, TlsOpts, ?DEFAULT_TCP_ADDRESS), Port = proplists:get_value(port, TlsOpts, ?DEFAULT_TCP_PORT), + setup_listener(IP, Port, TlsOpts, CompSpec). + +setup_listener(IP, Port, Opts, CompSpec) -> %% Add listener port. ?info("dlink_tls:init_rvi_component(): Adding listener ~p:~p", [ IP, Port ]), - case dlink_tls_listener:add_listener(IP, Port, CompSpec) of + case dlink_tls_listener:add_listener(IP, Port, Opts, CompSpec) of ok -> ?notice("---- RVI Node External Address: ~s", [ application:get_env(rvi_core, node_address, undefined)]); @@ -204,15 +226,16 @@ connect_remote(IP, Port, CompSpec) -> ?info("dlink_tls:connect_remote(): Connecting ~p:~p (TO=~p", [IP, Port, Timeout]), log("new connection", [], CompSpec), - case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), Timeout) of + case gen_tcp:connect(IP, Port, dlink_tls_listener:sock_opts(), + Timeout) of { ok, Sock } -> ?info("dlink_tls:connect_remote(): Connected ~p:~p", [IP, Port]), %% Setup a genserver around the new connection. - {ok, Pid } = dlink_tls_conn:setup(IP, Port, Sock, + {ok, Pid } = dlink_tls_conn:setup(client, IP, Port, Sock, ?MODULE, handle_socket, CompSpec), - try dlink_tls_conn:upgrade(Pid, client, CompSpec) of + try dlink_tls_conn:upgrade(Pid, client) of ok -> ?debug("Upgrade result = ~p", [ok]), %% Send authorize @@ -520,8 +543,9 @@ handle_call({rvi, disconnect_data_link, [NetworkAddress] }, _From, St) -> { reply, [ Res ], St }; -handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, +handle_call({rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts] = Args}, _From, #st{tid = Tid} = St) -> + ?debug("send_data: Args = ~p", [Args]), %% Resolve connection pid from service case get_connections_by_service(Service) of [] -> @@ -533,7 +557,8 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, ConnPid, [{?DLINK_ARG_TRANSACTION_ID, Tid}, {?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE}, {?DLINK_ARG_MODULE, atom_to_binary(ProtoMod, latin1)}, - {?DLINK_ARG_DATA, Data}]), + {?DLINK_ARG_DATA, Data}], + DataLinkOpts), {reply, [Res], St#st{tid = Tid + 1}} end; @@ -685,10 +710,13 @@ send_authorize(Pid, CompSpec) -> dlink_tls_conn:send(Pid, rvi_common:pass_log_id( [{?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE}, {?DLINK_ARG_VERSION, ?DLINK_TLS_VERSION}, - {?DLINK_ARG_ADDRESS, LocalIP}, + {?DLINK_ARG_ADDRESS, bin(LocalIP)}, {?DLINK_ARG_PORT, LocalPort}, {?DLINK_ARG_CREDENTIALS, Creds}], CompSpec)). +bin(S) -> + iolist_to_binary(S). + connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered %% with the connection manager, this is an incoming connection @@ -780,15 +808,6 @@ get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). -%% get_authorize_jwt(CompSpec) -> -%% case authorize_rpc:get_authorize_jwt(CompSpec) of -%% [ok, JWT] -> -%% JWT; -%% [not_found] -> -%% ?error("No authorize JWT~n", []), -%% error(cannot_authorize) -%% end. - get_credentials(CompSpec) -> case authorize_rpc:get_credentials(CompSpec) of [ok, Creds] -> @@ -798,17 +817,6 @@ get_credentials(CompSpec) -> error(no_credentials_found) end. -%% validate_auth_jwt(JWT, Conn, CompSpec) -> -%% case authorize_rpc:validate_authorization(CompSpec, JWT, Conn) of -%% [ok] -> -%% true; -%% [not_found] -> -%% false -%% end. - -%% term_to_json(Term) -> -%% binary_to_list(iolist_to_binary(exo_json:encode(Term))). - opt(K, L, Def) -> case lists:keyfind(K, 1, L) of {_, V} -> V; diff --git a/components/proto_json/src/proto_json_rpc.erl b/components/proto_json/src/proto_json_rpc.erl index 9f7ccc0..9f10ee3 100644 --- a/components/proto_json/src/proto_json_rpc.erl +++ b/components/proto_json/src/proto_json_rpc.erl @@ -135,18 +135,10 @@ handle_call({rvi, send_message, { <<"timeout">>, Timeout }, { <<"parameters">>, Parameters } ]), - - case use_frag(Parameters, DataLinkOpts) of - {true, Window} -> - {Res, St1} = - chunk_message(Window, TID, ServiceName, DataLinkMod, - DataLinkOpts, iolist_to_binary(Data), St), - {reply, Res, St1}; - false -> - Res = DataLinkMod:send_data( - St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data), - {reply, Res, St} - end; + RviOpts = rvi_common:rvi_options(Parameters), + Res = DataLinkMod:send_data( + St#st.cs, ?MODULE, ServiceName, RviOpts ++ DataLinkOpts, Data), + {reply, Res, St}; handle_call(Other, _From, St) -> ?warning("proto_json_rpc:handle_call(~p): unknown", [ Other ]), @@ -157,27 +149,19 @@ handle_cast({rvi, receive_message, [Payload, IP, Port | _LogId]} = Msg, St) -> ?debug("~p:handle_cast(~p)", [?MODULE, Msg]), Elems = jsx:decode(iolist_to_binary(Payload)), - case Elems of - [{<<"frg">>, _}|_] -> - St1 = handle_frag(Elems, IP, Port, St), - {noreply, St1}; - _ -> - [ ServiceName, Timeout, Parameters ] = - opts([<<"service">>, <<"timeout">>, <<"parameters">>], - Elems, undefined), - - ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), - ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), - ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]), - - service_edge_rpc:handle_remote_message(St#st.cs, - {IP, Port}, - ServiceName, - Timeout, - Parameters), - {noreply, St} - end; + [ ServiceName, Timeout, Parameters ] = + opts([<<"service">>, <<"timeout">>, <<"parameters">>], + Elems, undefined), + ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]), + ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]), + ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]), + service_edge_rpc:handle_remote_message(St#st.cs, + {IP, Port}, + ServiceName, + Timeout, + Parameters), + {noreply, St}; handle_cast(Other, St) -> ?warning("proto_json_rpc:handle_cast(~p): unknown", [ Other ]), @@ -199,44 +183,3 @@ opt(K, L, Def) -> opts(Keys, Elems, Def) -> [ opt(K, Elems, Def) || K <- Keys]. - -use_frag(Params, DLinkOpts) -> - case p_reliable(Params) of - undefined -> - d_reliable(DLinkOpts); - Other -> - Other - end. - -%% We use reliable send (i.e. fragmentation support) if: -%% - rvi.max_msg_size is set in the Params (overrides static config) -%% - rvi.reliable = true in the Params -%% - max_msg_size is set for the data link -%% - {reliable, true} defined for the data link -%% -%% If {reliable, true} and no max_message_size, we send a single packet -%% as one fragment (marking it as first and last fragment) and use the -%% ack mechanism to acknowledge successful delivery. -%% -p_reliable([{"rvi.max_msg_size", Sz}|_]) -> {true, Sz}; -p_reliable([{"rvi.reliable", true}|_]) -> {true, infinity}; -p_reliable([{"rvi.reliable", false}|_]) -> false; -p_reliable([_|T]) -> p_reliable(T); -p_reliable([]) -> undefined. - -d_reliable([{max_msg_size, Sz}|_]) -> {true, Sz}; -d_reliable([{reliable, true}|_]) -> {true, infinity}; -d_reliable([{reliable, false}|_]) -> false; -d_reliable([_|T]) -> d_reliable(T); -d_reliable([]) -> false. - -chunk_message(Window, TID, _ServiceName, _DLinkMod, _DLinkOpts, Data, St) -> - _Frag = first_frag(Window, TID, Data), - - {ok, St}. - -handle_frag(_Elems, _IP, _Port, _St) -> - error(nyi). - -first_frag(_Window, _TID, _Data) -> - error(nyi). diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl index e718e4d..c8b083a 100644 --- a/components/proto_msgpack/src/proto_msgpack_rpc.erl +++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl @@ -136,8 +136,9 @@ handle_call({rvi, send_message, { <<"service">>, ServiceName }, { <<"timeout">>, Timeout }, { <<"parameters">>, Parameters } ], St#st.pack_opts), + RviOpts = rvi_common:rvi_options(Parameters), Res = DataLinkMod:send_data( - St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data), + St#st.cs, ?MODULE, ServiceName, RviOpts ++ DataLinkOpts, Data), {reply, Res, St}; handle_call(Other, _From, St) -> diff --git a/components/rvi_common/include/rvi_msgpack_rpc.hrl b/components/rvi_common/include/rvi_msgpack_rpc.hrl new file mode 100644 index 0000000..91c54d8 --- /dev/null +++ b/components/rvi_common/include/rvi_msgpack_rpc.hrl @@ -0,0 +1,17 @@ +%% -*- mode: erlang; indent-tabs-mode: nil; -*- +%%============================================================================= +%% +%% Copyright (C) 2015, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ + +-define(CONNECT_TIMEOUT, 5000). +-define(CALL_TIMEOUT, 5000). + +-define(TYPE_REQUEST, 0). +-define(TYPE_RESPONSE, 1). +-define(TYPE_NOTIFY, 2). + +-define(RPC_PORT, 8000). diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index d3287c3..c227815 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -50,12 +50,18 @@ pass_log_id/2]). %% (PropList, CompSpec) -export([utc_timestamp/0, utc_timestamp/1]). --export([bin/1]). +-export([bin/1, + take/2]). -export([start_json_rpc_server/3, start_json_rpc_server/4]). +-export([start_msgpack_rpc/2, + start_msgpack_rpc/3]). -export([extract_json/2, normalize_json/1, term_to_json/1]). +-export([rvi_options/1]). +-export([save_source_address/3, + get_source_address/1]). -export([announce/1]). -define(NODE_SERVICE_PREFIX, node_service_prefix). @@ -266,6 +272,18 @@ send_json_notification(Url,Method, Args) -> {error, internal} end. +rvi_options(Opts) when is_list(Opts) -> + [{K,V} || {K,V} <- Opts, + is_rvi_opt(K)]. + +is_rvi_opt(K) -> + case re:run(K, <<"^rvi\\.">>, []) of + {match, _} -> + true; + nomatch -> + false + end. + term_to_json(Term) -> jsx:encode(normalize_json(Term)). @@ -540,7 +558,10 @@ get_component_specification_() -> CompList), protocol = get_component_config_(protocol, ?COMP_SPEC_PROTOCOL_DEFAULT, - CompList) + CompList), + rvi_common = get_component_config_(rvi_common, + ?COMP_SPEC_RVI_COMMON_DEFAULT, + CompList) } end. @@ -577,8 +598,8 @@ get_component_modules(_, _) -> get_module_specification(Component, Module, CompSpec) -> case get_component_modules(Component, CompSpec) of undefined -> - ?debug("get_module_specification(): Missing: rvi_core:component: ~p", - [Component]), + ?debug("get_module_specification(): Missing: rvi_core:component: ~p~nCS = ~p", + [Component, CompSpec]), undefined; Modules -> @@ -687,47 +708,79 @@ get_module_type(Component, Module, CompSpec) -> get_module_json_rpc_address(Component, Module, CompSpec) -> %% Dig out the JSON RPC address + get_module_rpc_address(json, Component, Module, CompSpec). + %% case get_module_config(Component, + %% Module, + %% json_rpc_address, + %% undefined, + %% CompSpec) of + %% {ok, undefined } -> + %% ?debug("get_module_json_rpc_address(): Missing component spec: " + %% "rvi_core:component:~p:~p:json_rpc_address, {...}", [Component, Module]), + %% {error, {not_found, Component, Module, json_rpc_address}}; + + %% {ok, { IP, Port }} -> + %% ?debug("get_module_json_rpc_address(~p, ~p) -> ~p:~p", + %% [ Component, Module, IP, Port]), + %% {ok, bin(IP), Port }; + + %% {ok, Port } -> + %% ?debug("get_module_json_rpc_address(~p, ~p) -> 127.0.0.1:~p", + %% [ Component, Module, Port]), + %% {ok, <<"127.0.0.1">>, Port} + %% end. + + +get_module_rpc_address(Type, Component, Module, CompSpec) + when Type == json; Type == msgpack -> + %% Dig out the JSON/MsgPack RPC address + Key = case Type of + json -> json_rpc_address; + msgpack -> msgpack_rpc_address + end, case get_module_config(Component, Module, - json_rpc_address, + Key, undefined, CompSpec) of {ok, undefined } -> - ?debug("get_module_json_rpc_address(): Missing component spec: " - "rvi_core:component:~p:~p:json_rpc_address, {...}", [Component, Module]), - {error, {not_found, Component, Module, json_rpc_address}}; + ?debug("get_module_rpc_address(): Missing component spec: " + "rvi_core:components:~p:~p:~s, {...}", + [Component, Module, Key]), + {error, {not_found, Component, Module, Key}}; {ok, { IP, Port }} -> - ?debug("get_module_json_rpc_address(~p, ~p) -> ~p:~p", - [ Component, Module, IP, Port]), + ?debug("get_module_rpc_address(~p, ~p, ~p) -> ~p:~p", + [Type, Component, Module, IP, Port]), {ok, bin(IP), Port }; {ok, Port } -> - ?debug("get_module_json_rpc_address(~p, ~p) -> 127.0.0.1:~p", - [ Component, Module, Port]), + ?debug("get_module_rpc_address(~p, ~p, ~p) -> 127.0.0.1:~p", + [Type, Component, Module, Port]), {ok, <<"127.0.0.1">>, Port} end. get_module_json_rpc_url(Component, Module, CompSpec) -> - case get_module_json_rpc_address(Component, Module, CompSpec) of + get_module_rpc_url(json, Component, Module, CompSpec). + +get_module_rpc_url(Type, Component, Module, CompSpec) + when Type == json; Type == msgpack -> + case get_module_rpc_address(Type, Component, Module, CompSpec) of { ok, IP, Port } when is_integer(Port)-> Res = bin(["http://", IP, ":", integer_to_binary(Port)]), - ?debug("get_module_json_rpc_url(~p, ~p) ->~p", [ Component, Module, Res ]), + ?debug("get_module_rpc_url(~p, ~p, ~p) ->~p", [Type, Component, Module, Res ]), Res; - - { ok, IP, Port } when is_list(Port)-> Res = bin(["http://", IP, ":", Port]), - ?debug("get_module_json_rpc_url(~p, ~p) ->~p", [ Component, Module, Res ]), + ?debug("get_module_rpc_url(~p, ~p, ~p) ->~p", [Type, Component, Module, Res ]), Res; Err -> - ?debug("get_module_json_rpc_url(~p, ~p) Failed: ~p", [ Component, Module, Err ]), + ?debug("get_module_rpc_url(~p, ~p, ~p) Failed: ~p", [Type, Component, Module, Err ]), Err end. - get_module_genserver_pid(Component, Module, CompSpec) -> %% Check that this is a JSON RPC module case get_module_type(Component, Module, CompSpec) of @@ -767,6 +820,39 @@ start_json_rpc_server(Component, Module, Supervisor, XOpts) -> Err end. +start_msgpack_rpc(Component, Module) -> + start_msgpack_rpc(Component, Module, []). + +start_msgpack_rpc(Component, Module, XOpts) -> + ?debug("start_msgpack_rpc(~w, ~w, ~p)", [Component, Module, XOpts]), + case get_module_rpc_address(msgpack, Component, Module, get_component_specification()) of + {ok, {client, Opts}} -> + ?debug("starting msgpack_rpc client: ~p", [Opts]), + start_msgpack_rpc_client(Component, Module, Opts, XOpts); + {ok, {server, Opts}} -> + ?debug("starting msgpack_rpc server: ~p", [Opts]), + start_msgpack_rpc_server(Component, Module, Opts, XOpts); + {ok, {IP, Port}} -> + start_msgpack_rpc_server(Component, Module, [{ip, IP}, {port, Port}], XOpts); + Error -> + ?debug("no recognized msgpack config for ~w:~w (~p)", + [Component, Module, Error]) + end. + +start_msgpack_rpc_client(Component, Module, Opts, XOpts) -> + Name = {msgpack_rpc_client, Component, Module}, + rvi_msgpack_rpc:start_link([{gproc, {n,l,Name}}|XOpts] ++ Opts). + +start_msgpack_rpc_server(Component, Module, Opts, XOpts) -> + Name = {msgpack_rpc_server, Component, Module}, + [Callback, Rest] = take([{callback, fun() -> msgpack_rpc_cb(Module) end}], + XOpts ++ Opts), + rvi_msgpack_rpc_server:start_link([{callback, Callback} | Rest]). + +msgpack_rpc_cb(Module) -> + binary_to_existing_atom( + <<(atom_to_binary(Module, latin1))/binary, "_msgpack">>, latin1). + utc_timestamp() -> calendar:datetime_to_gregorian_seconds( calendar:universal_time()) - seconds_jan_1970(). @@ -907,3 +993,40 @@ announce(Name) -> ?debug("Announce ~p~n", [Name]), gproc:reg(Name), ok. + +%% inet_ip(IP) when is_binary(IP) -> +%% inet_ip(binary_to_list(IP)); +%% inet_ip(IP) -> +%% {ok, Addr} = inet:ip(IP), +%% Addr. + + +%% take([ Key::atom() | {Key::atom(), Default} ], Opts) -> [Value | Rest] +take([H|T], Opts) when is_atom(H) -> + case lists:keytake(H, 1, Opts) of + {value, {_, Value}, Rest} -> + [Value | take(T, Rest)]; + false -> + error({required, H}) + end; +take([{H,Default}|T], Opts) -> + case lists:keytake(H, 1, Opts) of + {value, {_, Value}, Rest} -> + [Value | take(T, Rest)]; + false when is_function(Default, 0) -> + [Default() | take(T, Opts)]; + false -> + [Default | take(T, Opts)] + end; +take([], Opts) -> + [Opts]. + +save_source_address(client, Socket, CS) -> + {ok, {_, _} = Addr} = inet:peername(Socket), + set_value(source_address, Addr, CS); +save_source_address(server, Socket, CS) -> + {ok, {_, _} = Addr} = inet:sockname(Socket), + set_value(source_address, Addr, CS). + +get_source_address(CS) -> + get_value(source_address, undefined, CS). diff --git a/components/rvi_common/src/rvi_frag.erl b/components/rvi_common/src/rvi_frag.erl index 74c34da..d4000a4 100644 --- a/components/rvi_common/src/rvi_frag.erl +++ b/components/rvi_common/src/rvi_frag.erl @@ -1,27 +1,49 @@ -module(rvi_frag). --compile(export_all). +-behaviour(gen_server). + +-export([send/4, % (Msg, Window, Mod, Opts) + maybe_fragment/3]). +-export([start_link/0]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include_lib("lager/include/log.hrl"). + +-record(st, {msg_id = 0, + frags = dict_new()}). + +-record(frag, {id, + window, + msg, + tref}). --record(st, {}). -define(SERVER, ?MODULE). -define(TAB, ?MODULE). +-define(PACKET_MOD, dlink_data_msgpack). +-define(PACKET_ST, []). --type msg() :: binary(). --type chunk() :: binary(). --type tid() :: any(). --type offset() :: non_neg_integer(). --type chunk_size() :: non_neg_integer(). --type is_last() :: boolean(). --type frag() :: {offset(), offset(), chunk(), is_last()}. - -fragment(TID, {Offset, _, Bin, IsLast}) -> - fragment(TID, Offset, Bin, IsLast). +send(Msg, Opts, Mod, SendF) -> + ?debug("send(Msg, ~p, ~p, ~p)", [Opts, Mod, SendF]), + case use_frag(Msg, Opts) of + {true, Window} -> + gen_server:call(?SERVER, {send, Msg, Window, Mod, Opts}); + false -> + SendF() + end. --spec fragment(tid(), offset(), msg(), is_last()) -> - ok - | {message, msg()} - | {missing, [{offset(), offset()}]}. -fragment(TID, Offset, Bin, IsLast) -> - gen_server:call(?SERVER, {fragment, TID, Offset, Bin, IsLast}). +maybe_fragment([{<<"frg">>,[_|_] = Info}], Mod, Opts) -> + gen_server:call(?SERVER, {fragment_received, Info, Mod, Opts}); +maybe_fragment([{<<"frg-get">>, [_|_] = Args}], Mod, Opts) -> + gen_server:call(?SERVER, {frag_get_received, Args, Mod, Opts}); +maybe_fragment([{<<"frg-end">>, [_|_] = Args}], _Mod, _Opts) -> + gen_server:call(?SERVER, {frag_end_received, Args}); +maybe_fragment(_, _, _) -> + false. start_link() -> create_ets(), @@ -38,37 +60,63 @@ create_ets() -> init([]) -> {ok, #st{}}. --spec first_fragment(msg(), chunk_size()) -> frag(). -first_fragment(Msg, infinity) -> - {1, byte_size(Msg), Msg, true}; -first_fragment(Msg, ChunkSz) when is_integer(ChunkSz), ChunkSz > 0 -> - MsgBin = iolist_to_binary(Msg), - Sz = erlang:min(byte_size(MsgBin), ChunkSz), - <<Frag:Sz/binary, Rest/binary>> = MsgBin, - {1, Sz, Frag, Rest =:= <<>>}. - --spec next_fragment(msg(), offset() | frag(), chunk_size()) -> done | frag(). -next_fragment(Msg, {_, Last, _PrevFrag, _IsLast}, ChunkSz) -> - next_fragment(Msg, Last, ChunkSz); - -next_fragment(Msg, Last, ChunkSz) -> +fragment_from_offset(Msg, Offs, ChunkSz) -> + ?debug("fragment_from_offset(Msg, ~p, ~p)", [Offs, ChunkSz]), MsgBin = iolist_to_binary(Msg), - case byte_size(MsgBin) - Last of + ?debug("MsgBin size = ~p", [byte_size(MsgBin)]), + case byte_size(MsgBin) - Offs + 1 of NewSz when NewSz =< 0 -> - done; + ?debug("NewSz = ~p - empty fragment!!", [NewSz]), + <<>>; NewSz -> Sz = erlang:min(NewSz, ChunkSz), - <<_:Last/binary, Frag:Sz/binary, Rest/binary>> = MsgBin, - Start = Last+1, - Stop = Last + Sz, - {Start, Stop, Frag, Rest =:= <<>>} + Prev = Offs - 1, + ?debug("NewSz = ~p, Sz = ~p, Prev = ~p", [NewSz, Sz, Prev]), + <<_:Prev/binary, Frag:Sz/binary, _Rest/binary>> = MsgBin, + Frag + end. + +handle_call(Req, From, S) -> + try handle_call_(Req, From, S) + catch + error:R -> + {reply, {error, R}, S} end. -handle_call({fragment, TID, Offs, Bin, IsLast}, _, S) -> - End = Offs + byte_size(Bin) -1, - ets:insert(?TAB, {{TID, Offs, End}, Bin}), - {reply, check_message(TID, Offs, IsLast), S}; -handle_call(_, _, S) -> +handle_call_({frag_get_received, Info, Mod, Opts}, {Pid,_}, + #st{frags = Fs} = S) -> + [ID, Offset, Bytes] = Info, + TID = {Pid, ID}, + case dict_find(TID, Fs) of + {ok, #frag{msg = Msg}} -> + Bin = fragment_from_offset(Msg, Offset, Bytes), + Sz = byte_size(Msg), + Mod:send_data(Pid, encode_fragment(ID, Sz, Offset, Bin, Opts)), + {reply, true, S}; + error -> + %% Ignore, but reflect that it was a fragment message + %% (perhaps we should send an error message to the client?) + {reply, true, S} + end; + +handle_call_({send, Msg, Window, Mod, Opts}, {Pid, _}, St) -> + try init_frag(Msg, Window, Opts, Mod, Pid, St) + catch + error:R -> + ?error("init_frag ERROR: ~p~n~p", [R, erlang:get_stacktrace()]), + {reply, {error, R}, St} + end; +handle_call_({fragment_received, FragInfo, Mod, Opts}, {Pid,_}, S) -> + ?debug("fragment_received", []), + handle_fragment_received(FragInfo, Mod, Opts, Pid, S); + +handle_call_({frag_end_received, FragInfo}, {Pid,_}, + #st{frags = Fs} = S) -> + [ID, ResultCode] = FragInfo, + ?debug("fragment-end; ID = ~p; ResultCode = ~p", [ID, ResultCode]), + {reply, true, S#st{frags = dict_erase({Pid,ID}, Fs)}}; + +handle_call_(_, _, S) -> {reply, error, S}. handle_cast(_, S) -> {noreply, S}. @@ -76,15 +124,52 @@ handle_info(_, S) -> {noreply, S}. terminate(_, _) -> ok. code_change(_, S, _) -> {ok, S}. -check_message(TID, Offs, IsLast) -> +handle_fragment_received(FragInfo, Mod, Opts, Pid, S) -> + [ID, Size, Offs, Bin] = FragInfo, + FragSz = byte_size(Bin), + End = Offs + FragSz - 1, + TID = {Pid, ID}, + case Bin of + <<>> -> + ?debug("Empty fragment (~p); don't store", [FragInfo]), + ok; + _ -> + ?debug("ID = ~p, Size = ~p, Offs = ~p, End = ~p", + [ID, Size, Offs, End]), + ets:insert(?TAB, {{TID, Offs, End}, Bin}) + end, + if Offs == 1, End >= Size -> + send_msg_complete(ID, 0, Mod, Pid, Opts), + {reply, {true, Bin}, S}; + true -> + Check = check_message(TID, Offs, Size), + ?debug("check_message() -> ~p", [Check]), + case Check of + {message, Msg} -> + send_msg_complete(ID, 0, Mod, Pid, Opts), + {reply, {true, Msg}, S}; + {missing, [{Start, End}|_]} -> + ReqSz = erlang:min(FragSz, End-Start+1), + request_fragment(ID, Start, ReqSz, Mod, Pid, Opts), + {reply, true, S}; + ok -> + request_fragment(ID, End+1, FragSz, Mod, Pid, Opts), + {reply, true, S} + end + end. + +check_message(TID, Offs, Size) -> Frags = fragments(TID, Offs), - case find_holes(Frags) of - [] when IsLast -> + case find_holes(Frags, Size) of + {[], true} -> + ?debug("no holes, complete message", []), ets:select_delete(?TAB, [{ {{TID,'_','_'},'_'}, [], [true] }]), {message, join_fragments(Frags)}; - [] -> + {[], _} -> + ?debug("no holes, not complete", []), ok; - Holes -> + {Holes, _} -> + ?debug("found holes = ~p", [Holes]), {missing, Holes} end. @@ -93,18 +178,21 @@ fragments(TID, Offs) -> [{'=<', '$1', Offs}], [{{'$1','$2','$3'}}] }]). -find_holes(Frags) -> - {_, Missing} = +find_holes(Frags, Size) -> + {_, Missing, IsLast} = lists:foldl( - fun({A, B, _}, {Prev, Acc}) -> - case A - Prev of + fun({Beg, End, _Bin}, {Prev, Acc, IsLast0}) -> + ?debug("IsLast0 = ~p, Beg = ~p, End = ~p, Size = ~p", + [IsLast0, Beg, End, Size]), + IsLast1 = IsLast0 orelse (End >= Size), + case Beg - Prev of 1 -> - {B, Acc}; + {End, Acc, IsLast1}; Diff when Diff > 1 -> - {B, [{Prev+1, A-1}|Acc]} + {End, [{Prev+1, Beg-1}|Acc], IsLast1} end - end, {0, []}, Frags), - Missing. + end, {0, [], false}, Frags), + {Missing, IsLast}. %% Allow fragments to overlap join_fragments([{1,_,F}|Frags]) -> @@ -118,3 +206,156 @@ join_fragments([{A,_,F}|Frags], Acc) -> join_fragments(Frags, <<Prefix/binary, F/binary>>); join_fragments([], Acc) -> Acc. + +init_frag(Msg, Window, Opts, Mod, Pid, St) -> + ?debug("init_frag(Msg, ~p, ~p, ~p, ~p, ~p", [Window,Opts,Mod,Pid,St]), + {Id, St1} = next_id(St), + TID = {Pid, Id}, + Sz = byte_size(Msg), + AdjWindow = adjust_window(Window, Id, Sz, Opts), + ?debug("Adjusted window: ~p", [AdjWindow]), + Frag = fragment_from_offset(Msg, 1, AdjWindow), + Data = encode_fragment(Id, Sz, 1, Frag, Opts), + ?debug("size of encoded fragment (Win=~p): ~p", [Window, size(Data)]), + ok = Mod:send_data(Pid, Data), + TRef = start_timer(init_timeout, TID, Pid, Opts), + {reply, ok, store_frag(TID, #frag{id = Id, + window = AdjWindow, + msg = Msg, + tref = TRef}, St1)}. + +next_id(#st{msg_id = Prev} = St) -> + Id = Prev+1, + {Id, St#st{msg_id = Id}}. + +start_timer(Type, Id, Pid, Opts) -> + erlang:start_timer(timeout_value(Type, Opts), self(), {Type, Id, Pid}). + +store_frag(TID, #frag{} = Frag, #st{frags = Frags} = St) -> + St#st{frags = dict_store(TID, Frag, Frags)}. + +timeout_value(Type, Opts) -> + case lists:keyfind(Type, 1, Opts) of + {_, Value} -> Value; + false -> timeout_default(Type) + end. + +timeout_default(init_timeout) -> timer:hours(1); +timeout_default(request_timeout) -> timer:seconds(30). + + +dict_new() -> + orddict:new(). + +dict_store(Key, Value, Dict) -> + orddict:store(Key, Value, Dict). + +dict_find(Key, Dict) -> + orddict:find(Key, Dict). + +dict_erase(Key, Dict) -> + orddict:erase(Key, Dict). + +adjust_window(Window, Id, Sz, Opts) -> + %% Subtract framing size (encoded empty fragment) from Window, + %% but arbitrarily set minimum window to 10 (must at least be > 0) + Enc = encode_msg([{<<"frg">>, [Id, Sz, Sz, <<>>]}], Opts), + ?debug("Empty frag: ~p", [Enc]), + erlang:max(10, Window - byte_size(Enc)). + +encode_fragment(Id, Sz, Offs, Frag, Opts) -> + encode_msg([{<<"frg">>, [Id, Sz, Offs, Frag]}], Opts). + +request_fragment(ID, Start, Bytes0, Mod, Pid, Opts) -> + Bytes = erlang:max(get_window(Opts), Bytes0), + FragInfo = [ID, Start, Bytes], + ?debug("request_fragment: ~p", [FragInfo]), + Mod:send_data(Pid, encode_msg([{<<"frg-get">>, FragInfo}], Opts)). + +send_msg_complete(ID, ResultCode, Mod, Pid, Opts) -> + ?debug("send_msg_complete(~p, ~p, ~p, ~p, ...)", [ID, ResultCode, + Mod, Pid]), + Mod:send_data(Pid, encode_msg([{<<"frg-end">>, [ID, ResultCode]}], Opts)). + +encode_msg(Msg, Opts) -> + {PMod, PSt} = get_packet_mod(Opts), + {ok, Bin, _} = PMod:encode(Msg, PSt), + Bin. + +get_packet_mod(Opts) -> + case lists:keyfind(packet_mod, 1, Opts) of + false -> + {?PACKET_MOD, ?PACKET_MOD:init([])}; + {_, {Mod,_} = Res} when is_atom(Mod) -> + Res; + {_, Mod} when is_atom(Mod) -> + case lists:keyfind(packet_st, 1, Opts) of + false -> + {Mod, Mod:init([])}; + {_, St} -> + {Mod, St} + end + end. + +get_window([{"rvi.max_msg_size", Sz}|_]) -> Sz; +get_window([{max_msg_size, Sz}|_] ) -> Sz; +get_window([_|T]) -> + get_window(T); +get_window([]) -> + []. + +use_frag(Bin, Opts) -> + {PR, DR, PW, DW} = frag_opts(Opts), + Reliable = case {PR, DR} of + {_, _} when is_boolean(PR) -> PR; + {undefined, _} when is_boolean(DR) -> DR; + _ -> undefined + end, + Win = calc_window(PW, DW), + Sz = byte_size(Bin), + case Reliable of + true -> {true, Win}; + false -> + case Win of + _ when is_integer(Win) -> + if Sz < Win -> false; + true -> {true, Win} + end; + infinity -> + false + end; + undefined -> + if is_integer(Win) -> {true, Win}; + true -> false + end + end. + +frag_opts(Opts) -> + ?debug("frag_opts(~p)", [Opts]), + frag_opts(Opts, undefined, undefined, undefined, undefined). + +frag_opts([{"rvi.max_msg_size", PW}|T], PR, DR, _, DW) -> + frag_opts(T, PR, DR, PW, DW); +frag_opts([{<<"rvi.max_msg_size">>, PW}|T], PR, DR, _, DW) -> + frag_opts(T, PR, DR, PW, DW); +frag_opts([{max_msg_size, DW}|T], PR, DR, PW, _) -> + frag_opts(T, PR, DR, PW, DW); +frag_opts([{"rvi.reliable", PR}|T], _, DR, PW, DW) -> + frag_opts(T, PR, DR, PW, DW); +frag_opts([{reliable, DR}|T], PR, _, PW, DW) -> + frag_opts(T, PR, DR, PW, DW); +frag_opts([_|T], PR, DR, PW, DW) -> + frag_opts(T, PR, DR, PW, DW); +frag_opts([], PR, DR, PW, DW) -> + {PR, DR, PW, DW}. + +calc_window(PW, DW) when is_integer(PW), is_integer(DW) -> + erlang:min(PW, DW); +calc_window(undefined, DW) -> + calc_window(DW); +calc_window(PW, undefined) -> + calc_window(PW). + +calc_window(undefined) -> infinity; +calc_window(infinity ) -> infinity; +calc_window(W) when is_integer(W), W > 0 -> W. diff --git a/components/rvi_common/src/rvi_log.erl b/components/rvi_common/src/rvi_log.erl index 067485c..80040bf 100644 --- a/components/rvi_common/src/rvi_log.erl +++ b/components/rvi_common/src/rvi_log.erl @@ -106,11 +106,11 @@ timestamp() -> -define(ELEM(A), {element, #evt.A, '$_'}). -define(PROD, {{'$1', ?ELEM(level), ?ELEM(component), ?ELEM(event)}}). -fetch(Tids) -> - fetch(Tids, []). +fetch(Tid) -> + fetch(Tid, []). -fetch(Tids, Args) -> - TidSet = select_ids(Tids), +fetch(TidPat, Args) -> + TidSet = select_ids(TidPat), lists:foldr( fun(Tid, Acc) -> case match_events( @@ -267,6 +267,7 @@ handle_rpc(<<"log">>, Args) -> {ok, [{status, rvi_common:json_rpc_status(ok)}]}; handle_rpc(<<"fetch">>, Args) -> TIDs = get_json_ids(Args), + ?debug("fetch: TIDs = ~p", [TIDs]), Res = [{TID, fetch(TID)} || TID <- TIDs], {ok, [{status, rvi_common:json_rpc_status(ok)}, {<<"log">>, format_result(Res)}]}; @@ -383,30 +384,39 @@ valid_id_pat(TP) -> false end. -select_ids(TIDs) -> +select_ids(TidPat) -> ets:foldr( fun({Tid}, Acc) -> - case match_id(Tid, TIDs) of + case match_id(Tid, TidPat) of true -> [Tid|Acc]; false -> Acc end end, [], ?IDS). -match_id(Tid, [Pat|Pats]) -> +match_id(Tid, Pat) -> case re:run(Tid, Pat, []) of {match, _} -> true; - nomatch -> match_id(Tid, Pats) - end; -match_id(_, []) -> - false. + nomatch -> false + end. format_result(Log) -> - [{TID, format_events(Es)} || {TID, Es} <- Log]. - -format_events([{TS, Comp, Evt}|Es]) -> - [[{<<"ts">>, rvi_common:utc_timestamp(TS)}, + ?debug("format_result(~p)", [Log]), + Events = lists:foldl( + fun({_Pat, Matches}, Acc) -> + lists:foldl( + fun({Id,Es}, D) -> + orddict:store(Id, Es, D) + end, Acc, Matches) + end, orddict:new(), Log), + ?debug("Events = ~p", [Events]), + [{TID, format_events(Es)} || {TID, Es} <- Events]. + +format_events([{TS, Level, Comp, Evt} = E|Es]) -> + ?debug("format_events(), E = ~p", [E]), + [[{<<"ts">>, utc_hr_timestamp(TS)}, + {<<"lvl">>, bin(Level)}, {<<"cmp">>, bin(Comp)}, - {<<"evt">>, bin(Evt)}] || format_events(Es)]; + {<<"evt">>, bin(Evt)}] | format_events(Es)]; format_events([]) -> []. @@ -415,3 +425,11 @@ bin(B) when is_binary(B) -> B; bin(L) when is_list(L) -> iolist_to_binary(L); bin(Other) -> iolist_to_binary(io_lib:fwrite("~w", [Other])). + + +utc_hr_timestamp({_,_,US} = TS) -> + %% The 'rem' op is just a precaution; a properly generated 'now' TS + %% should not have US > 1000000, but a derived TS could (since just + %% about all operations on such timestamps will work anyway). + Secs = rvi_common:utc_timestamp(TS), + Secs + (US rem 1000000)/1000000. diff --git a/components/rvi_common/src/rvi_msgpack_rpc.erl b/components/rvi_common/src/rvi_msgpack_rpc.erl new file mode 100644 index 0000000..d1f2bf9 --- /dev/null +++ b/components/rvi_common/src/rvi_msgpack_rpc.erl @@ -0,0 +1,165 @@ +%% -*- mode: erlang; indent-tabs-mode: nil; -*- +%%============================================================================= +%% +%% Copyright (C) 2015, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ +%% +-module(rvi_msgpack_rpc). + +-export([start_link/1, + start_link/3, + start_link/4, + start_link/5]). + +-export([call/3, + async_call/3, + notify/3, + join/2]). + +-export([control/4]). + +-include("rvi_msgpack_rpc.hrl"). +-include_lib("lager/include/log.hrl"). + +-record(st, {pending = [], id = 1, opts, buf = <<>>, + msgpack_opts = []}). + +start_link(Opts) -> + [IP, Port, ExoOpts, Rest] = rvi_common:take([ip, port, {exo, []}], Opts), + start_link(IP, Port, Rest, ExoOpts). + +start_link(Host, Port, Opts) -> + [Exo, Rest] = rvi_common:take([{exo, []}], Opts), + start_link(Host, Port, Rest, Exo). + +start_link(Host, Port, Opts, ExoOpts) -> + start_link(Host, Port, Opts, protos(ExoOpts), ExoOpts). + +start_link(Host, Port, Opts, Protos, ExoOpts) -> + ConnectTimeout = opt([connect_timeout, timeout], Opts, ?CONNECT_TIMEOUT), + case exo_socket:connect(Host, Port, Protos, ExoOpts, ConnectTimeout) of + {ok, Socket} -> + {ok, Pid} = exo_socket_session:start_link(Socket, ?MODULE, {Host, Port, Opts}), + exo_socket:controlling_process(Socket, Pid), + gen_server:cast(Pid, {activate, once}), + {ok, Pid}; + Error -> + Error + end. + +call(Pid, Method, Args) -> + call(Pid, Method, Args, ?CALL_TIMEOUT). + +call(Pid, Method, Args, Timeout) -> + gen_server:call(Pid, {call, Method, Args}, Timeout). + +async_call(Pid, Method, Args) -> + Ref = erlang:monitor(process, Pid), + ok = gen_server:call(Pid, {async_call, {self(), Ref}, Method, Args}), + Ref. + +notify(Pid, Method, Args) -> + gen_server:call(Pid, {notify, Method, Args}). + +join(Ref) -> + join(Ref, ?CALL_TIMEOUT). + +join(Ref, Timeout) -> + receive + {Ref, Reply} -> + erlang:demonitor(Ref), + Reply; + {'DOWN', Ref, _, _, Reason} -> + error(Reason) + after Timeout -> + error(timeout) + end. + +init({Host, Port, Opts}) -> + MsgPackOpts = opt([msgpack], Opts, rvi_msgpack_rpc_server:msgpack_options()), + gproc:reg({p,l,msgpack_rpc_client}, {Host, Port}), + case lists:keyfind(gproc, 1, Opts) of + {_, Reg} -> gproc:reg(Reg, {Host, Port}); + false -> ok + end, + {ok, #st{opts = Opts, + msgpack_opts = MsgPackOpts}}. + +control(XSocket, Req, From, St) -> + try control_(XSocket, Req, From, St) + catch + error:Reason -> + {reply, {error, Reason}, St} + end. + +control_(XSocket, {call, Method, Args}, From, + #st{pending = Pending, id = ID, msgpack_opts = MOpts} = St) -> + pack_and_send(XSocket, [?TYPE_REQUEST, ID, Method, Args], MOpts), + {noreply, St#st{pending = [{ID, From}|Pending], id = ID+1}}; +control_(XSocket, {async_call, From, Method, Args}, _, + #st{pending = Pending, msgpack_opts = MOpts, id = ID} = St) -> + pack_and_send(XSocket, [?TYPE_REQUEST, ID, Method, Args], MOpts), + {reply, ok, St#st{pending = [{ID, From}|Pending], id = ID+1}}; +control_(XSocket, {notify, Method, Args}, _, #st{msgpack_opts = MOpts} = St) -> + pack_and_send(XSocket, [?TYPE_NOTIFY, Method, Args], MOpts), + {reply, ok, St}; +control_(_, _, _, St) -> + {reply, {error, unsupported}, St}. + +pack_and_send(XSocket, Msg, MOpts) -> + Data = msgpack:pack(Msg, MOpts), + exo_socket:send(XSocket, Data). + +data(XSocket, Data, #st{buf = Buf, pending = Pending, + msgpack_opts = MOpts} = St) -> + Buf1 = <<Buf/binary, Data/binary>>, + try msgpack:unpack_stream(Buf1, MOpts) of + {[?TYPE_RESPONSE, ID, Error, Result], Rest} -> + case lists:keytake(ID, 1, Pending) of + {value, {_, From}, Pending1} -> + Reply = case Error of + null -> {ok, Result}; + _ -> {error, Error} + end, + gen_server:reply(From, Reply), + {ok, St#st{pending = Pending1, buf = Rest}}; + false -> + {ok, St#st{buf = Rest}} + end; + {error, incomplete} -> + {ok, St#st{buf = Buf1}}; + {error, Reason} -> + {ok, St#st{buf = <<>>}} + catch + error:Reason -> + ?debug("unpack CRASH: ~p", [Reason]), + {ok, St#st{buf = <<>>}} + end. + +opt([H|T], Opts, Default) -> + case lists:keyfind(H, 1, Opts) of + {_, Value} -> Value; + false when T==[] -> Default; + false -> + opt(T, Opts, Default) + end. + + +protos(Opts) -> + case [1 || {K,_} <- Opts, lists:member(K, ssl_connect_opts())] of + [] -> + [tcp]; + [_|_] -> + [tcp, ssl] + end. + +%% Copied from exo_socket.erl +ssl_connect_opts() -> + [versions, verify, verify_fun, + fail_if_no_peer_cert, + depth, cert, certfile, key, keyfile, + password, cacerts, cacertfile, dh, dhfile, cihpers, + debug]. diff --git a/components/rvi_common/src/rvi_msgpack_rpc_server.erl b/components/rvi_common/src/rvi_msgpack_rpc_server.erl new file mode 100644 index 0000000..cfce66f --- /dev/null +++ b/components/rvi_common/src/rvi_msgpack_rpc_server.erl @@ -0,0 +1,185 @@ +%% -*- mode: erlang; indent-tabs-mode: nil; -*- +%%============================================================================= +%% +%% Copyright (C) 2015, Jaguar Land Rover +%% +%% This program is licensed under the terms and conditions of the +%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ + +-module(rvi_msgpack_rpc_server). + +-behaviour(exo_socket_server). + +-include("rvi_msgpack_rpc.hrl"). +-include_lib("lager/include/log.hrl"). + +-record(state, + { + callback, + msgpack_opts = msgpack_options(), + buf = <<>> + }). + +-export([init/2, data/3, close/2, error/3]). + +-export([start/1, start/3, start/4]). +-export([start_link/1, + start_link/4]). +-export([start_ssl/1, start_ssl/3, start_ssl/4]). +-export([start_link_ssl/4]). +-export([control/4]). + +-export([msgpack_options/0]). + + +msgpack_options() -> + [{allow_atom, pack}, + {enable_str, true}, + jsx]. + +start(Callback) -> + start(?RPC_PORT, Callback, []). + +start(Port, Callback, Options) -> + start(Port, Callback, Options, []). + +start(Port, Callback, Options, ExoOptions) -> + do_start(Port, Callback, Options, ExoOptions, start). + +start_link(Opts) -> + [Port, Callback, Exo, Rest] = rvi_common:take([port, callback, {exo, []}], Opts), + start_link(Port, Callback, Rest, Exo). + +start_link(Port, Callback, Options, ExoOptions) -> + do_start(Port, Callback, Options, ExoOptions, start_link). + +do_start(Port, Callback, Options, ExoOptions, StartF) when StartF==start; + StartF==start_link -> + ?debug("do_start(~p, ~p, ~p, ~p, ~p)", [Port, Callback, Options, ExoOptions, StartF]), + case lists:keymember(ssl, 1, Options) of + {_, true} -> + start_ssl(Port, Options, ExoOptions); + _ -> + exo_socket_server:StartF(Port,[tcp], + [{active,once},{packet,0},binary, + {reuseaddr,true} | ExoOptions], + ?MODULE, {Callback, Options}) + end. + +start_ssl(Callback) -> + start_ssl(?RPC_PORT, Callback, []). + +start_ssl(Port, Callback, Options) -> + start_ssl(Port, Callback, Options, []). + +start_ssl(Port, Callback, Options, ExoOptions) -> + do_start_ssl(Port, Callback, Options, ExoOptions, start). + +start_link_ssl(Port, Callback, Options, ExoOptions) -> + do_start_ssl(Port, Callback, Options, ExoOptions, start_link). + +do_start_ssl(Port, Callback, Options, ExoOptions, StartF) when + StartF == start; StartF == start_link -> + KeyAndCert = key_and_cert(ExoOptions), + Verify = proplists:get_value(verify, ExoOptions, verify_none), + Debug = proplists:get_value(debug, ExoOptions, true), + exo_socket_server:StartF(Port,[tcp,probe_ssl], + KeyAndCert ++ + [{active,once},{packet,0},binary, + {debug, Debug}, + {verify, Verify}, %% no client cert required + {reuseaddr,true} | ExoOptions], ?MODULE, {Callback, Options}). + +key_and_cert(Opts) -> + Dir = code:priv_dir(rvi_common), + [{keyfile, opt(keyfile, Opts, filename:join(Dir, "host.key"))}, + {certfile, opt(certfile, Opts, filename:join(Dir, "host.cert"))}]. + +opt(K, Opts, Default) -> + case lists:keyfind(K, 1, Opts) of + {_, Value} -> + Value; + false -> + Default + end. + +init(Socket, {Callback, Options}) -> + ?debug("init(~p, ~p)", [Socket, {Callback, Options}]), + {ok,{IP,Port}} = exo_socket:peername(Socket), + ?debug("connection from: ~p : ~p", [IP, Port]), + gproc:reg({p,l,msgpack_rpc_server}, {IP,Port, Callback}), + case lists:keyfind(gproc, 1, Options) of + {_, Reg} -> + ?debug("registering with gproc: ~p", [Reg]), + gproc:reg(Reg, {IP, Port, Callback}); + false -> + ?debug("not registering name with gproc", []), + ok + end, + MsgPackOpts = opt(msgpack, Options, msgpack_options()), + {ok, #state{callback = Callback, msgpack_opts = MsgPackOpts}}. + +data(Socket, Data, #state{buf = Buf, msgpack_opts = Opts} = State) -> + Buf1 = <<Buf/binary, Data/binary>>, + try Dec = msgpack:unpack_stream(Buf1, Opts), + ?debug("decoded: ~p", [Dec]), + case Dec of + {[?TYPE_REQUEST, ID, Method, Args], Rest} -> + handle_call_request(Socket, ID, Method, Args, State#state{buf = Rest}); + {[?TYPE_NOTIFY, Method, Args], Rest} -> + handle_notify_request(Socket, Method, Args, State#state{buf = Rest}); + {error, incomplete} -> + {ok, State#state{buf = Buf1}}; + {error, Reason} -> + ?debug("error parsing stream: ~p", [Reason]), + {ok, State#state{buf = <<>>}} + end + catch + error:_Error -> + ?debug("decode error: ~p", [_Error]), + {ok,State} + end. + +control(_XSocket, _Request, _From, St) -> + {reply, {error, unsupported}, St}. + +%% +%% close - retrieve statistics +%% transport socket SHOULD still be open, but ssl may not handle this! +%% +close(Socket, State) -> + case exo_socket:getstat(Socket, exo_socket:stats()) of + {ok,_Stats} -> + ?debug("~w: close, stats=~w", [?MODULE, _Stats]), + {ok, State}; + {error,_Reason} -> + ?debug("~w: close, stats error=~w", [?MODULE, _Reason]), + {ok, State} + end. + +error(_Socket,Error,State) -> + ?debug("bert_rpc_exec: error = ~p\n", [Error]), + {stop, Error, State}. + +%% +%% Internal +%% +handle_call_request(Socket, ID, Method, Args, + #state{callback = CB, + msgpack_opts = Opts} = State) -> + try Res = apply(CB, binary_to_existing_atom(Method, latin1), Args), + Msg = msgpack:pack([?TYPE_RESPONSE, ID, null, Res], Opts), + exo_socket:send(Socket, Msg) + catch + error:Reason -> + ?debug("caught ~s ~p -> error:~p", [Method, Args, Reason]), + ReasonStr = lists:flatten(io_lib:fwrite("error:~w", [Reason])), + ErrMsg = msgpack:pack([?TYPE_RESPONSE, ID, ReasonStr, null]), + exo_socket:send(Socket, ErrMsg) + end, + {ok, State#state{buf = <<>>}}. + +handle_notify_request(_Socket, Method, Args, #state{callback = CB} = State) -> + apply(CB, binary_to_existing_atom(Method, latin1), Args), + {ok, State}. |