summaryrefslogtreecommitdiff
path: root/components/dlink_tcp
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
committerUlf Wiger <ulf@feuerlabs.com>2015-12-06 13:54:17 -0800
commit6cfeffca9f8e93e45dd885702a77896e2a1d0951 (patch)
tree620e2dd9006b52df7129d135fa7256d793571df1 /components/dlink_tcp
parent7d098a34b25704dbaa8bea0217ca6b7be37a0e48 (diff)
downloadrvi_core-6cfeffca9f8e93e45dd885702a77896e2a1d0951.tar.gz
new protocol & setup scripts
Diffstat (limited to 'components/dlink_tcp')
-rw-r--r--components/dlink_tcp/src/connection.erl110
-rw-r--r--components/dlink_tcp/src/dlink_tcp.app.src5
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl207
-rw-r--r--components/dlink_tcp/src/listener.erl4
4 files changed, 135 insertions, 191 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index 77300d9..b24215c 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -36,6 +36,7 @@
-define(SERVER, ?MODULE).
+-define(PACKET_MOD, dlink_data_json).
-record(st, {
ip = {0,0,0,0},
@@ -44,7 +45,9 @@
mod = undefined,
func = undefined,
args = undefined,
- pst = undefined %% Payload state
+ packet_mod = ?PACKET_MOD,
+ packet_st = [],
+ cs
}).
%%%===================================================================
@@ -52,8 +55,9 @@
%%%===================================================================
%% MFA is to deliver data received on the socket.
-setup(IP, Port, Sock, Mod, Fun, Arg) ->
- case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, Arg},[]) of
+setup(IP, Port, Sock, Mod, Fun, CS) ->
+ ?debug("setup(~p, ~p, Sock, ~p, ~p, ~p)", [IP, Port, Mod, Fun, CS]),
+ case gen_server:start_link(connection, {IP, Port, Sock, Mod, Fun, CS},[]) of
{ ok, GenSrvPid } = Res ->
gen_tcp:controlling_process(Sock, GenSrvPid),
gen_server:cast(GenSrvPid, {activate_socket, Sock}),
@@ -120,7 +124,7 @@ is_connection_up(IP, Port) ->
%% MFA used to handle socket closed, socket error and received data
%% When data is received, a separate process is spawned to handle
%% the MFA invocation.
-init({IP, Port, Sock, Mod, Fun, Arg}) ->
+init({IP, Port, Sock, Mod, Fun, CompSpec}) ->
case IP of
undefined -> ok;
_ -> connection_manager:add_connection(IP, Port, self())
@@ -131,18 +135,21 @@ init({IP, Port, Sock, Mod, Fun, Arg}) ->
?debug("connection:init(): Sock: ~p", [Sock]),
?debug("connection:init(): Module: ~p", [Mod]),
?debug("connection:init(): Function: ~p", [Fun]),
- ?debug("connection:init(): Arg: ~p", [Arg]),
- %% Grab socket control
+ {ok, PktMod} = get_module_config(packet_mod, ?PACKET_MOD, CompSpec),
+ PktSt = PktMod:init(CompSpec),
{ok, #st{
ip = IP,
port = Port,
sock = Sock,
mod = Mod,
func = Fun,
- args = Arg,
- pst = undefined
+ packet_mod = PktMod,
+ packet_st = PktSt,
+ cs = CompSpec
}}.
+get_module_config(Key, Default, CS) ->
+ rvi_common:get_module_config(dlink_tcp, dlink_tcp_rpc, Key, Default, CS).
%%--------------------------------------------------------------------
%% @private
@@ -182,13 +189,14 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_cast({send, Data}, St) ->
+handle_cast({send, Data}, #st{packet_mod = PMod, packet_st = PSt} = St) ->
?debug("~p:handle_cast(send): Sending: ~p",
[ ?MODULE, Data]),
+ {ok, Encoded, PSt1} = PMod:encode(Data, PSt),
+ ?debug("Encoded = ~p", [Encoded]),
+ gen_tcp:send(St#st.sock, Encoded),
- gen_tcp:send(St#st.sock, Data),
-
- {noreply, St};
+ {noreply, St#st{packet_st = PSt1}};
handle_cast({activate_socket, Sock}, State) ->
Res = inet:setopts(Sock, [{active, once}]),
?debug("connection:activate_socket(): ~p", [Res]),
@@ -221,30 +229,20 @@ handle_info({tcp, Sock, Data},
handle_info({tcp, Sock, Data},
#st { ip = IP,
port = Port,
- mod = Mod,
- func = Fun,
- args = Arg,
- pst = PST} = State) ->
+ packet_mod = PMod,
+ packet_st = PSt} = State) ->
?debug("handle_info(data): From: ~p:~p ", [IP, Port]),
-
- case jsx_decode_stream(Data, PST) of
- { [], NPST } ->
- ?debug("handle_info(data incomplete)", []),
+ case PMod:decode(Data, fun(Elems) ->
+ handle_elements(Elems, State)
+ end, PSt) of
+ {ok, PSt1} ->
inet:setopts(Sock, [{active, once}]),
- {noreply, State#st { pst = NPST} };
-
- { JSONElements, NPST } ->
- ?debug("data complete: Processed: ~p",
- [[authorize_keys:abbrev_payload(E) || E <- JSONElements]]),
- FromPid = self(),
- [Mod:Fun(FromPid, IP, Port, data, SingleElem, Arg)
- || SingleElem <- JSONElements],
- inet:setopts(Sock, [ { active, once } ]),
- {noreply, State#st { pst = NPST} }
+ {noreply, State#st{packet_st = PSt1}};
+ {error, Reason} ->
+ ?error("decode failed, Reason = ~p", [Reason]),
+ {stop, Reason, State}
end;
-
-
handle_info({tcp_closed, Sock},
#st { ip = IP,
port = Port,
@@ -252,7 +250,7 @@ handle_info({tcp_closed, Sock},
func = Fun,
args = Arg } = State) ->
?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]),
- Mod:Fun(self(), IP, Port,closed, Arg),
+ Mod:Fun(self(), IP, Port, closed, Arg),
gen_tcp:close(Sock),
connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -304,15 +302,37 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-jsx_decode_stream(Data, St) ->
- jsx_decode_stream(Data, St, []).
-
-jsx_decode_stream(Data, undefined, Acc) ->
- case jsx:decode(Data, [stream, return_tail]) of
- {incomplete, Cont} ->
- {lists:reverse(Acc), Cont};
- {with_tail, Elems, <<>>} ->
- {lists:reverse([Elems|Acc]), undefined};
- {with_tail, Elems, Rest} ->
- jsx_decode_stream(Rest, undefined, [Elems|Acc])
- end.
+%% jsx_decode_stream(Data, St) ->
+%% jsx_decode_stream(Data, St, []).
+
+%% jsx_decode_stream(Data, undefined, Acc) ->
+%% case jsx:decode(Data, [stream, return_tail]) of
+%% {incomplete, Cont} ->
+%% {lists:reverse(Acc), Cont};
+%% {with_tail, Elems, <<>>} ->
+%% {lists:reverse([Elems|Acc]), undefined};
+%% {with_tail, Elems, Rest} ->
+%% jsx_decode_stream(Rest, undefined, [Elems|Acc])
+%% end.
+
+%% decode(Data, PMod, PSt, Mod, Fun, IP, Port, CS) ->
+%% case PMod:decode(Data, PSt) of
+%% {ok, Elements, PSt1} ->
+%% ?debug("data complete: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% {ok, PSt1};
+%% {more, Elements, Rest, PSt1} ->
+%% ?debug("data complete with Rest: Processed: ~p",
+%% [[authorize_keys:abbrev_payload(E) || E <- Elements]]),
+%% Mod:Fun(self(), IP, Port, data, Elements, CS),
+%% decode(Rest, PMod, PSt1, Mod, Fun, IP, Port, CS);
+%% {more, PSt1} ->
+%% {ok, PSt1};
+%% { ->
+
+handle_elements(Elements, #st{mod = Mod, func = Fun, cs = CS,
+ ip = IP, port = Port}) ->
+ ?debug("data complete: Processed: ~p",
+ [authorize_keys:abbrev(Elements)]),
+ Mod:Fun(self(), IP, Port, data, Elements, CS).
diff --git a/components/dlink_tcp/src/dlink_tcp.app.src b/components/dlink_tcp/src/dlink_tcp.app.src
index 53a32b8..5ba7760 100644
--- a/components/dlink_tcp/src/dlink_tcp.app.src
+++ b/components/dlink_tcp/src/dlink_tcp.app.src
@@ -2,7 +2,7 @@
%% Copyright (C) 2014, Jaguar Land Rover
%%
%% This program is licensed under the terms and conditions of the
-%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License, version 2.0. The full text of the
%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
%%
@@ -16,7 +16,8 @@
{applications, [
kernel,
stdlib,
- rvi_common
+ rvi_common,
+ dlink
]},
{mod, { dlink_tcp_app, []}},
{start_phases, [{json_rpc, []}, {connection_manager, []}, {announce, []}]},
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index b3ec80d..83e0a24 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -44,7 +44,7 @@
-define(DEFAULT_TCP_ADDRESS, "0.0.0.0").
-define(DEFAULT_PING_INTERVAL, 300000). %% Five minutes
-define(SERVER, ?MODULE).
--define(DLINK_TCP_VERSION, "1.0").
+-define(DLINK_TCP_VERSION, "1.1").
-define(CONNECTION_TABLE, rvi_dlink_tcp_connections).
-define(SERVICE_TABLE, rvi_dlink_tcp_services).
@@ -209,23 +209,10 @@ connect_remote(IP, Port, CompSpec) ->
%% Setup a genserver around the new connection.
{ok, Pid } = connection:setup(IP, Port, Sock,
- ?MODULE, handle_socket, [CompSpec] ),
+ ?MODULE, handle_socket, CompSpec ),
%% Send authorize
- { LocalIP, LocalPort} = rvi_common:node_address_tuple(),
- connection:send(
- Pid,
- term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, LocalPort },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES,
- get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ send_authorize(Pid, CompSpec),
ok;
{error, Err } ->
@@ -243,7 +230,7 @@ connect_and_retry_remote( IP, Port, CompSpec) ->
CS = start_log(<<"conn">>, "connect ~s:~s", [IP, Port], CompSpec),
case connect_remote(IP, list_to_integer(Port), CS) of
ok ->
- log("connected", [], CS),
+ log(result, "connected", [], CS),
ok;
Err -> %% Failed to connect. Sleep and try again
@@ -266,16 +253,8 @@ announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = connection:send(
- ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)
- ])),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = connection:send(ConnPid, Msg),
?debug("dlink_tcp:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -302,7 +281,7 @@ handle_socket(FromPid, IP, Port, Event, Payload, Arg) ->
handle_socket_(FromPid, undefined, SetupPort, closed, Arg) ->
handle_socket(FromPid, "0.0.0.0", SetupPort, closed, Arg);
-handle_socket_(FromPid, SetupIP, SetupPort, closed, [CompSpec]) ->
+handle_socket_(FromPid, SetupIP, SetupPort, closed, CompSpec) ->
?info("dlink_tcp:closed(): SetupAddress: {~p, ~p}", [ SetupIP, SetupPort ]),
NetworkAddress = SetupIP ++ ":" ++ integer_to_list(SetupPort),
@@ -348,7 +327,7 @@ handle_socket_(_FromPid, SetupIP, SetupPort, error, _ExtraArgs) ->
log_orphan(<<"sock">>, "socket ERROR ~s:~w", [SetupIP, SetupPort]),
ok.
-handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
+handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
?debug("data(): Elems ~p", [authorize_keys:abbrev_payload(Elems)]),
@@ -357,51 +336,30 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, [CompSpec]) ->
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
?debug("got authorize ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemotePort,
ProtoVersion,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
-
- Certificates =
- case CertificatesTmp of
- C when is_list(C) -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerIP, PeerPort,
- TransactionID, RemoteAddress, RemotePort,
- ProtoVersion, Signature, Certificates, CS);
+ RemoteAddress, RemotePort,
+ ProtoVersion, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
?debug("got service_announce ~s:~w", [PeerIP, PeerPort]),
- [ TransactionID,
- ProtoVersion,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_VERSION,
- ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- Conn = {PeerIP, PeerPort},
log("sa from ~s:~w", [PeerIP, PeerPort], CS),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- ?debug("Service Announce~nMsg = ~p~n", [Msg]),
- process_announce(Msg, FromPid, PeerIP, PeerPort,
- TransactionID, ProtoVersion, CompSpec);
- _ ->
- log("sa INVALID", [], CS),
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ process_announce(Status, Services, FromPid, PeerIP, PeerPort, CompSpec);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -503,10 +461,12 @@ handle_cast( {rvi, service_unavailable, [_SvcName, _]}, St) ->
{noreply, St};
handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) ->
+ ?debug("handle_socket, Arg (CS) = ~p", [Arg]),
try handle_socket_(FromPid, IP, Port, Event, Arg)
catch
C:E ->
?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]),
+ error("Caught ~p:~p", [C, E]),
ok
end,
{noreply, St};
@@ -588,12 +548,11 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
Res = connection:send(ConnPid,
- jsx:encode(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ])),
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
@@ -691,22 +650,21 @@ delete_services(ConnPid, SvcNameList) ->
}) || SvcName <- SvcNameList ],
ok.
-availability_msg(Availability, Services) ->
- [{ ?DLINK_ARG_STATUS, status_string(Availability) },
- { ?DLINK_ARG_SERVICES, Services }].
+availability_msg(Availability, Services, CompSpec) ->
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
+ { ?DLINK_ARG_STATUS, status_string(Availability) },
+ { ?DLINK_ARG_SERVICES, Services }
+ | log_id_tail(CompSpec) ].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
- RemotePort, ProtoVersion, Signature, Certificates, CompSpec) ->
+process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress,
+ RemotePort, ProtoVersion, Credentials, CompSpec) ->
?info("dlink_tcp:authorize(): Peer Address: ~p:~p", [PeerIP, PeerPort ]),
?info("dlink_tcp:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemotePort ]),
?info("dlink_tcp:authorize(): Protocol Ver: ~p", [ ProtoVersion ]),
- ?debug("dlink_tcp:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_tcp:authorize(): Certificates: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Certificates] ]),
- ?debug("dlink_tcp:authorize(): Signature: ~p", [ authorize_keys:abbrev_bin(Signature) ]),
-
+ ?debug("dlink_tcp:authorize(): Credentials: ~p", [ [authorize_keys:abbrev_bin(C) || C <- Credentials] ]),
{NRemoteAddress, NRemotePort} = Conn =
case { RemoteAddress, RemotePort } of
{ "0.0.0.0", 0 } ->
@@ -717,27 +675,19 @@ process_authorize(FromPid, PeerIP, PeerPort, TransactionID, RemoteAddress,
_ -> { RemoteAddress, RemotePort}
end,
- log("auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
- case validate_auth_jwt(Signature, Certificates, {PeerIP, PeerPort}, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [NRemoteAddress, NRemotePort], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
send_authorize(Pid, CompSpec) ->
{LocalIP, LocalPort} = rvi_common:node_address_tuple(),
connection:send(Pid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, LocalIP },
- { ?DLINK_ARG_PORT, integer_to_list(LocalPort) },
- { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
- { ?DLINK_ARG_CERTIFICATES, get_certificates(CompSpec) },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) }
- | rvi_common:log_id_json_tail(CompSpec) ])).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, LocalIP },
+ { ?DLINK_ARG_PORT, integer_to_binary(LocalPort) },
+ { ?DLINK_ARG_VERSION, ?DLINK_TCP_VERSION },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec) ]).
connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% If FromPid (the genserver managing the socket) is not yet registered
@@ -767,16 +717,9 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
?info("dlink_tcp:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteIP, RemotePort]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
log("sending sa: ~s:~w", [RemoteIP, RemotePort], CompSpec),
- connection:send(FromPid,
- rvi_common:term_to_json(
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT }
- | rvi_common:log_id_json_tail(CompSpec)])),
-
+ connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:cast(?SERVER, { setup_initial_ping, RemoteIP, RemotePort, FromPid }),
ok.
@@ -785,23 +728,18 @@ process_data(_FromPid, RemoteIP, RemotePort, ProtocolMod, Data, CompSpec) ->
?debug("dlink_tcp:receive_data(): RemoteAddr: {~p, ~p}", [ RemoteIP, RemotePort ]),
?debug("dlink_tcp:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
Proto = list_to_existing_atom(ProtocolMod),
- Proto:receive_message(CompSpec, {RemoteIP, RemotePort},
- base64:decode_to_string(Data)).
+ Proto:receive_message(CompSpec, {RemoteIP, RemotePort}, Data).
-process_announce(Elems, FromPid, IP, Port, TID, _Vsn, CompSpec) ->
- [ Avail,
- Svcs ] =
- opts([ ?DLINK_ARG_STATUS, ?DLINK_ARG_SERVICES ], Elems, undefined),
+process_announce(Avail, Services, FromPid, IP, Port, CompSpec) ->
?debug("dlink_tcp:service_announce(~p): Address: ~p:~p", [Avail,IP,Port]),
- ?debug("dlink_tcp:service_announce(~p): TransactionID: ~p", [Avail,TID]),
- ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Svcs]),
+ ?debug("dlink_tcp:service_announce(~p): Services: ~p", [Avail,Services]),
case Avail of
?DLINK_ARG_AVAILABLE ->
- add_services(Svcs, FromPid),
- service_discovery_rpc:register_services(CompSpec, Svcs, ?MODULE);
+ add_services(Services, FromPid),
+ service_discovery_rpc:register_services(CompSpec, Services, ?MODULE);
?DLINK_ARG_UNAVAILABLE ->
- delete_services(FromPid, Svcs),
- service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
+ delete_services(FromPid, Services),
+ service_discovery_rpc:unregister_services(CompSpec, Services, ?MODULE)
end,
ok.
@@ -837,36 +775,15 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
-
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
-
-term_to_json(Term) ->
- rvi_common:term_to_json(Term).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -886,4 +803,10 @@ start_log(Pfx, Fmt, Args, CS) ->
rvi_common:set_value(rvi_log_id, LogId, CS).
log(Fmt, Args, CS) ->
- rvi_log:flog(Fmt, Args, <<"dlink_tcp">>, CS).
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).
diff --git a/components/dlink_tcp/src/listener.erl b/components/dlink_tcp/src/listener.erl
index 45c0691..4512a59 100644
--- a/components/dlink_tcp/src/listener.erl
+++ b/components/dlink_tcp/src/listener.erl
@@ -63,7 +63,7 @@ terminate(_Reason, _State) ->
ok.
sock_opts() ->
- [binary, {active, once}, {packet, 4}].
+ [binary, {active, once}, {packet, 0}].
new_connection(IP, Port, Sock, State) ->
?debug("listener:new_connection(): Peer IP: ~p (ignored)", [IP]),
@@ -75,5 +75,5 @@ new_connection(IP, Port, Sock, State) ->
%% Provide component spec as extra arg.
{ok, _P} = connection:setup(undefined, 0, Sock,
dlink_tcp_rpc,
- handle_socket, [gen_nb_server:get_cb_state(State)]),
+ handle_socket, gen_nb_server:get_cb_state(State)),
{ok, State}.