diff options
author | Ulf Wiger <ulf@feuerlabs.com> | 2015-12-06 13:54:17 -0800 |
---|---|---|
committer | Ulf Wiger <ulf@feuerlabs.com> | 2015-12-06 13:54:17 -0800 |
commit | 6cfeffca9f8e93e45dd885702a77896e2a1d0951 (patch) | |
tree | 620e2dd9006b52df7129d135fa7256d793571df1 /components/dlink_tcp | |
parent | 7d098a34b25704dbaa8bea0217ca6b7be37a0e48 (diff) | |
download | rvi_core-6cfeffca9f8e93e45dd885702a77896e2a1d0951.tar.gz |
new protocol & setup scripts
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r-- | components/dlink_tcp/src/connection.erl | 110 | ||||
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp.app.src | 5 | ||||
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 207 | ||||
-rw-r--r-- | components/dlink_tcp/src/listener.erl | 4 |
4 files changed, 135 insertions, 191 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 77300d9..b24215c 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -36,6 +36,7 @@ -define(SERVER, ?MODULE). +-define(PACKET_MOD, dlink_data_json). -record(st, { ip = {0,0,0,0}, @@ -44,7 +45,9 @@ mod = undefined, func = undefined, args = undefined, - pst = undefined %% Payload state + packet_mod = ?PACKET_MOD, + packet_st = [], + cs }). %%%=================================================================== @@ -52,8 +55,9 @@ %%%=================================================================== %% MFA is to deliver data received on the socket. -setup(IP, Port, Sock, Mod, Fun, Arg) -> - case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of +setup(IP, Port, Sock, Mod, Fun, CS) -> + ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]), + case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of { ok, GenSrvPid } = Res -> gen_tcp:controlling_process(Sock, GenSrvPid), gen_server:cast(GenSrvPid, {activate_socket, Sock}), @@ -120,7 +124,7 @@ is_connection_up(IP, Port) -> %% MFA used to handle socket closed, socket error and received data %% When data is received, a separate process is spawned to handle %% the MFA invocation. -init({IP, Port, Sock, Mod, Fun, Arg}) -> +init({IP, Port, Sock, Mod, Fun, CompSpec}) -> case IP of undefined -> ok; _ -> connection_manager:add_connection(IP, Port, self()) @@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) -> ?debug("connection:init(): Sock: ~p", [Sock]), ?debug("connection:init(): Module: ~p", [Mod]), ?debug("connection:init(): Function: ~p", [Fun]), - ?debug("connection:init(): Arg: ~p", [Arg]), - %% Grab socket control + {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec), + PktSt = PktMod:init(CompSpec), {ok, #st{ ip = IP, port = Port, sock = Sock, mod = Mod, func = Fun, - args = Arg, - pst = undefined + packet_mod = PktMod, + packet_st = PktSt, + cs = CompSpec }}. +get_module_config(Key, Default, CS) -> + rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS). %%-------------------------------------------------------------------- %% @private @@ -182,13 +189,14 @@ handle_call(_Request, _From, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({send, Data}, St) -> +handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) -> ?debug("~p:handle_cast(send): Sending: ~p", [ ?MODULE, Data]), + {ok, Encoded, PSt1} = PMod:encode(Data, PSt), + ?debug("Encoded = ~p", [Encoded]), + gen_tcp:send(St#st.sock, Encoded), - gen_tcp:send(St#st.sock, Data), - - {noreply, St}; + {noreply, St#st{packet_st = PSt1}}; handle_cast({activate_socket, Sock}, State) -> Res = inet:setopts(Sock, [{active, once}]), ?debug("connection:activate_socket(): ~p", [Res]), @@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data}, handle_info({tcp, Sock, Data}, #st { ip = IP, port = Port, - mod = Mod, - func = Fun, - args = Arg, - pst = PST} = State) -> + packet_mod = PMod, + packet_st = PSt} = State) -> ?debug("handle_info(data): From: ~p:~p ", [IP, Port]), - - case jsx_decode_stream(Data, PST) of - { [], NPST } -> - ?debug("handle_info(data incomplete)", []), + case PMod:decode(Data, fun(Elems) -> + handle_elements(Elems, State) + end, PSt) of + {ok, PSt1} -> inet:setopts(Sock, [{active, once}]), - {noreply, State#st { pst = NPST} }; - - { JSONElements, NPST } -> - ?debug("data complete: Processed: ~p", - [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]), - FromPid = self(), - [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg) - || SingleElem <- JSONElements], - inet:setopts(Sock, [ { active, once } ]), - {noreply, State#st { pst = NPST} } + {noreply, State#st{packet_st = PSt1}}; + {error, Reason} -> + ?error("decode failed, Reason = ~p", [Reason]), + {stop, Reason, State} end; - - handle_info({tcp_closed, Sock}, #st { ip = IP, port = Port, @@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock}, func = Fun, args = Arg } = State) -> ?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]), - Mod:Fun(self(), IP, Port,closed, Arg), + Mod:Fun(self(), IP, Port, closed, Arg), gen_tcp:close(Sock), connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; @@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -jsx_decode_stream(Data, St) -> - jsx_decode_stream(Data, St, []). - -jsx_decode_stream(Data, undefined, Acc) -> - case jsx:decode(Data, [stream, return_tail]) of - {incomplete, Cont} -> - {lists:reverse(Acc), Cont}; - {with_tail, Elems, <<>>} -> - {lists:reverse([Elems|Acc]), undefined}; - {with_tail, Elems, Rest} -> - jsx_decode_stream(Rest, undefined, [Elems|Acc]) - end. +%% jsx_decode_stream(Data, St) -> +%% jsx_decode_stream(Data, St, []). + +%% jsx_decode_stream(Data, undefined, Acc) -> +%% case jsx:decode(Data, [stream, return_tail]) of +%% {incomplete, Cont} -> +%% {lists:reverse(Acc), Cont}; +%% {with_tail, Elems, <<>>} -> +%% {lists:reverse([Elems|Acc]), undefined}; +%% {with_tail, Elems, Rest} -> +%% jsx_decode_stream(Rest, undefined, [Elems|Acc]) +%% end. + +%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) -> +%% case PMod:decode(Data, PSt) of +%% {ok, Elements, PSt1} -> +%% ?debug("data complete: Processed: ~p", +%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]), +%% Mod:Fun(self(), IP, Port, data, Elements, CS), +%% {ok, PSt1}; +%% {more, Elements, Rest, PSt1} -> +%% ?debug("data complete with Rest: Processed: ~p", +%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]), +%% Mod:Fun(self(), IP, Port, data, Elements, CS), +%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS); +%% {more, PSt1} -> +%% {ok, PSt1}; +%% { -> + +handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS, + ip = IP, port = Port}) -> + ?debug("data complete: Processed: ~p", + [authorize_keys:abbrev(Elements)]), + Mod:Fun(self(), IP, Port, data, Elements, CS). diff --git a/components/dlink_tcp/src/dlink_tcp.app.src b/components/dlink_tcp/src/dlink_tcp.app.src index 53a32b8..5ba7760 100644 --- a/components/dlink_tcp/src/dlink_tcp.app.src +++ b/components/dlink_tcp/src/dlink_tcp.app.src @@ -2,7 +2,7 @@ %% Copyright (C) 2014, Jaguar Land Rover %% %% This program is licensed under the terms and conditions of the -%% Mozilla Public License, version 2.0. The full text of the +%% Mozilla Public License, version 2.0. The full text of the %% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/ %% @@ -16,7 +16,8 @@ {applications, [ kernel, stdlib, - rvi_common + rvi_common, + dlink ]}, {mod, { dlink_tcp_app, []}}, {start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]}, diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index b3ec80d..83e0a24 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -44,7 +44,7 @@ -define(DEFAULT_TCP_ADDRESS, "0.0.0.0"). -define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes -define(SERVER, ?MODULE). --define(DLINK_TCP_VERSION, "1.0"). +-define(DLINK_TCP_VERSION, "1.1"). -define(CONNECTION_TABLE, rvi_dlink_tcp_connections). -define(SERVICE_TABLE, rvi_dlink_tcp_services). @@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) -> %% Setup a genserver around the new connection. {ok, Pid } = connection:setup(IP, Port, Sock, - ?MODULE, handle_socket, [CompSpec] ), + ?MODULE, handle_socket, CompSpec ), %% Send authorize - { LocalIP, LocalPort} = rvi_common:node_address_tuple(), - connection:send( - Pid, - term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, LocalPort }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATES, - get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - | rvi_common:log_id_json_tail(CompSpec) - ])), + send_authorize(Pid, CompSpec), ok; {error, Err } -> @@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) -> CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec), case connect_remote(IP, list_to_integer(Port), CS) of ok -> - log("connected", [], CS), + log(result, "connected", [], CS), ok; Err -> %% Failed to connect. Sleep and try again @@ -266,16 +253,8 @@ announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), - Res = connection:send( - ConnPid, - jsx:encode( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - | rvi_common:log_id_json_tail(CompSpec) - ])), + Msg = availability_msg(Availability, [Service], CompSpec), + Res = connection:send(ConnPid, Msg), ?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) -> handle_socket_(FromPid, undefined, SetupPort, closed, Arg) -> handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg); -handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) -> +handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) -> ?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]), NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort), @@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) -> log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]), ok. -handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> +handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]), @@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) -> case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> ?debug("got authorize ~s:~w", [PeerIP, PeerPort]), - [ TransactionID, - RemoteAddress, + [ RemoteAddress, RemotePort, ProtoVersion, - CertificatesTmp, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + Credentials ] = + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, - ?DLINK_ARG_SIGNATURE], + ?DLINK_ARG_CREDENTIALS], Elems, undefined), - - Certificates = - case CertificatesTmp of - C when is_list(C) -> C; - undefined -> [] - end, process_authorize(FromPid, PeerIP, PeerPort, - TransactionID, RemoteAddress, RemotePort, - ProtoVersion, Signature, Certificates, CS); + RemoteAddress, RemotePort, + ProtoVersion, Credentials, CS); ?DLINK_CMD_SERVICE_ANNOUNCE -> ?debug("got service_announce ~s:~w", [PeerIP, PeerPort]), - [ TransactionID, - ProtoVersion, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_VERSION, - ?DLINK_ARG_SIGNATURE], + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), - Conn = {PeerIP, PeerPort}, log("sa from ~s:~w", [PeerIP, PeerPort], CS), - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - ?debug("Service Announce~nMsg = ~p~n", [Msg]), - process_announce(Msg, FromPid, PeerIP, PeerPort, - TransactionID, ProtoVersion, CompSpec); - _ -> - log("sa INVALID", [], CS), - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) -> {noreply, St}; handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) -> + ?debug("handle_socket, Arg (CS) = ~p", [Arg]), try handle_socket_(FromPid, IP, Port, Event, Arg) catch C:E -> ?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), + error("Caught ~p:~p", [C, E]), ok end, {noreply, St}; @@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S %% FIXME: What to do if we have multiple connections to the same service? [ConnPid | _T] -> Res = connection:send(ConnPid, - jsx:encode( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, - { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, - { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } - ])), + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, + { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, + { ?DLINK_ARG_DATA, Data } + ]), { reply, [ Res ], St} end; @@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) -> }) || SvcName <- SvcNameList ], ok. -availability_msg(Availability, Services) -> - [{ ?DLINK_ARG_STATUS, status_string(Availability) }, - { ?DLINK_ARG_SERVICES, Services }]. +availability_msg(Availability, Services, CompSpec) -> + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, + { ?DLINK_ARG_STATUS, status_string(Availability) }, + { ?DLINK_ARG_SERVICES, Services } + | log_id_tail(CompSpec) ]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, - RemotePort, ProtoVersion, Signature, Certificates, CompSpec) -> +process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, + RemotePort, ProtoVersion, Credentials, CompSpec) -> ?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]), ?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]), ?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]), - ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]), - ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]), - + ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]), {NRemoteAddress, NRemotePort} = Conn = case { RemoteAddress, RemotePort } of { "0.0.0.0", 0 } -> @@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress, _ -> { RemoteAddress, RemotePort} end, - log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), - case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of - true -> - connection_authorized(FromPid, Conn, CompSpec); - false -> - %% close connection (how?) - false - end. + log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec), + authorize_rpc:store_creds(CompSpec, Credentials, Conn), + connection_authorized(FromPid, Conn, CompSpec). send_authorize(Pid, CompSpec) -> {LocalIP, LocalPort} = rvi_common:node_address_tuple(), connection:send(Pid, - rvi_common:term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, LocalIP }, - { ?DLINK_ARG_PORT, integer_to_list(LocalPort) }, - { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, - { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } - | rvi_common:log_id_json_tail(CompSpec) ])). + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, LocalIP }, + { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) }, + { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION }, + { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } + | log_id_tail(CompSpec) ]). connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% If FromPid (the genserver managing the socket) is not yet registered @@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> ?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteIP, RemotePort]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), + AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec), log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec), - connection:send(FromPid, - rvi_common:term_to_json( - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } - | rvi_common:log_id_json_tail(CompSpec)])), - + connection:send(FromPid, AvailabilityMsg), %% Setup ping interval gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }), ok. @@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) -> ?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]), ?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), Proto = list_to_existing_atom(ProtocolMod), - Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, - base64:decode_to_string(Data)). + Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data). -process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) -> - [ Avail, - Svcs ] = - opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined), +process_announce(Avail, Services, FromPid, IP, Port, CompSpec) -> ?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]), - ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]), - ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]), + ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]), case Avail of ?DLINK_ARG_AVAILABLE -> - add_services(Svcs, FromPid), - service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE); + add_services(Services, FromPid), + service_discovery_rpc:register_services(CompSpec, Services, ?MODULE); ?DLINK_ARG_UNAVAILABLE -> - delete_services(FromPid, Svcs), - service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) + delete_services(FromPid, Services), + service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE) end, ok. @@ -837,36 +775,15 @@ get_connections(Key, Acc) -> get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). - -get_authorize_jwt(CompSpec) -> - case authorize_rpc:get_authorize_jwt(CompSpec) of - [ok, JWT] -> - JWT; +get_credentials(CompSpec) -> + case authorize_rpc:get_credentials(CompSpec) of + [ok, Creds] -> + Creds; [not_found] -> - ?error("No authorize JWT~n", []), - error(cannot_authorize) + ?error("No credentials found~n", []), + error(no_credentials_found) end. -get_certificates(CompSpec) -> - case authorize_rpc:get_certificates(CompSpec) of - [ok, Certs] -> - Certs; - [not_found] -> - ?error("No certificate found~n", []), - error(no_certificate_found) - end. - -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of - [ok] -> - true; - [not_found] -> - false - end. - -term_to_json(Term) -> - rvi_common:term_to_json(Term). - opt(K, L, Def) -> case lists:keyfind(K, 1, L) of {_, V} -> V; @@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) -> rvi_common:set_value(rvi_log_id, LogId, CS). log(Fmt, Args, CS) -> - rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS). + log(info, Fmt, Args, CS). + +log(Lvl, Fmt, Args, CS) -> + rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS). + +log_id_tail(CompSpec) -> + rvi_common:log_id_json_tail(CompSpec). diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl index 45c0691..4512a59 100644 --- a/components/dlink_tcp/src/listener.erl +++ b/components/dlink_tcp/src/listener.erl @@ -63,7 +63,7 @@ terminate(_Reason, _State) -> ok. sock_opts() -> - [binary, {active, once}, {packet, 4}]. + [binary, {active, once}, {packet, 0}]. new_connection(IP, Port, Sock, State) -> ?debug("listener:new_connection(): Peer IP: ~p (ignored)", [IP]), @@ -75,5 +75,5 @@ new_connection(IP, Port, Sock, State) -> %% Provide component spec as extra arg. {ok, _P} = connection:setup(undefined, 0, Sock, dlink_tcp_rpc, - handle_socket, [gen_nb_server:get_cb_state(State)]), + handle_socket, gen_nb_server:get_cb_state(State)), {ok, State}. |