summaryrefslogtreecommitdiff
path: root/components/dlink_bt/src/dlink_bt_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/dlink_bt/src/dlink_bt_rpc.erl')
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl296
1 files changed, 124 insertions, 172 deletions
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index ab5f8ac..542fd00 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -34,7 +34,7 @@
-include_lib("lager/include/log.hrl").
-include_lib("rvi_common/include/rvi_common.hrl").
--include_lib("rvi_common/include/rvi_dlink.hrl").
+-include_lib("rvi_common/include/rvi_dlink_bin.hrl").
-define(PERSISTENT_CONNECTIONS, persistent_connections).
-define(DEFAULT_BT_CHANNEL, 1).
@@ -60,6 +60,7 @@
}).
-record(st, {
+ mode = bt, %% tcp | bt
cs = #component_spec{}
}).
@@ -93,9 +94,11 @@ init([]) ->
{ keypos, #connection_entry.connection }]),
CS = rvi_common:get_component_specification(),
+ Mode = get_mode(CS),
service_discovery_rpc:subscribe(CS, ?MODULE),
{ok, #st {
+ mode = Mode,
cs = CS
}
}.
@@ -105,15 +108,19 @@ start_json_server() ->
start_connection_manager() ->
+ ?debug("start_connection_manager()", []),
CompSpec = rvi_common:get_component_specification(),
- {ok, BertOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts,
- [],
- CompSpec),
+ ServerOpts = get_server_opts(CompSpec),
+ start_connection_manager(ServerOpts, CompSpec).
+
+start_connection_manager([], _) ->
+ ?debug("No BT server options set; start only the connection manager", []),
+ bt_connection_manager:start_link();
+start_connection_manager(ServerOpts, CompSpec) ->
%% Retrieve the channel we should use
- Mode = get_mode(BertOpts),
- Channel = get_channel(Mode, BertOpts),
+ ?debug("ServerOpts = ~p", [ServerOpts]),
+ Mode = get_mode(ServerOpts),
+ Channel = get_channel(Mode, ServerOpts),
?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]),
@@ -124,6 +131,7 @@ start_connection_manager() ->
bt:start(),
bt:debug(debug);
tcp ->
+ ?debug("Mode == tcp; not starting bt driver", []),
ok
end,
bt_listener:start_link(Mode),
@@ -146,11 +154,21 @@ start_connection_manager() ->
[],
CompSpec),
- setup_persistent_connections_(PersistentConnections, CompSpec),
+ setup_persistent_connections_(PersistentConnections, Mode, CompSpec),
ok.
-get_mode(Opts) ->
+get_server_opts(CS) when element(1, CS) == component_spec ->
+ {ok, ServerOpts } = rvi_common:get_module_config(data_link,
+ ?MODULE,
+ server_opts,
+ [],
+ CS),
+ ServerOpts.
+
+get_mode(CS) when element(1, CS) == component_spec ->
+ get_mode(get_server_opts(CS));
+get_mode(Opts) when is_list(Opts) ->
proplists:get_value(test_mode, Opts, bt).
get_channel(tcp, Opts) ->
@@ -159,15 +177,15 @@ get_channel(bt, Opts) ->
proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL).
-setup_persistent_connections_([ ], _CompSpec) ->
+setup_persistent_connections_([ ], _, _CompSpec) ->
ok;
-setup_persistent_connections_([ BTAddress | T], CompSpec) ->
+setup_persistent_connections_([ BTAddress | T], Mode, CompSpec) ->
?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]),
- [ BTAddr, Channel] = string:tokens(BTAddress, "-"),
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
- setup_persistent_connections_(T, CompSpec),
+ [ BTAddr, Channel] = string:tokens(BTAddress, "-:"), %% Addr-Chan | IP:Port
+ connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec),
+ setup_persistent_connections_(T, Mode, CompSpec),
ok.
@@ -213,7 +231,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) ->
%%
%% Connect to a remote RVI node.
%%
-connect_remote(BTAddr, Channel, CompSpec) ->
+connect_remote(BTAddr, Channel, Mode, CompSpec) ->
case bt_connection_manager:find_connection_by_address(BTAddr, Channel) of
{ ok, _Pid } ->
already_connected;
@@ -225,7 +243,7 @@ connect_remote(BTAddr, Channel, CompSpec) ->
%%FIXME
%% Setup a genserver around the new connection.
- case bt_connection:connect(BTAddr, Channel,
+ case bt_connection:connect(BTAddr, Channel, Mode,
?MODULE, handle_socket, CompSpec ) of
{ ok, Pid } ->
?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p",
@@ -239,45 +257,33 @@ connect_remote(BTAddr, Channel, CompSpec) ->
end
end.
-
-connect_and_retry_remote( BTAddr, Channel, CompSpec) ->
+connect_and_retry_remote( BTAddr, Channel, Mode, CompSpec) ->
?info("dlink_bt:connect_and_retry_remote(): ~p:~p",
[ BTAddr, Channel]),
- case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of
- ok -> ok;
-
+ CS = start_log(<<"conn">>, "connect ~s:~s", [BTAddr, Channel], CompSpec),
+ case connect_remote(BTAddr, list_to_integer(Channel), Mode, CS) of
+ ok ->
+ ok;
Err -> %% Failed to connect. Sleep and try again
?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p",
- [BTAddr, Channel, Err]),
-
- ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec",
- [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
-
- setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec),
-
+ [BTAddr, Channel, Err]),
+ ?notice("dlink_bt:connect_and_retry_remote(~p:~p):"
+ " Will try again in ~p sec",
+ [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]),
+ setup_reconnect_timer(
+ ?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CS),
not_available
end.
-
-
announce_local_service_(_CompSpec, [], _Service, _Availability) ->
ok;
announce_local_service_(CompSpec,
[ConnPid | T],
Service, Availability) ->
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(Availability, [Service])),
- Res = bt_connection:send(ConnPid,
- term_to_json(
- { struct,
- [
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_TRANSACTION_ID, 3},
- { ?DLINK_ARG_SIGNATURE, JWT }
- ]
- })),
+ Msg = availability_msg(Availability, [Service], CompSpec),
+ Res = bt_connection:send(ConnPid, Msg),
?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p",
[ Availability, Service, ConnPid, Res]),
@@ -299,24 +305,23 @@ announce_local_service_(CompSpec, Service, Availability) ->
process_data(_FromPid, RemoteBTAddr, RemoteChannel, ProtocolMod, Data, CompSpec) ->
?debug("dlink_bt:receive_data(): SetupAddress: {~p, ~p}", [ RemoteBTAddr, RemoteChannel ]),
?debug("dlink_bt:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]),
- Proto = list_to_atom(ProtocolMod),
- Proto:receive_message(CompSpec, base64:decode_to_string(Data)),
+ Proto = list_to_existing_atom(ProtocolMod),
+ Proto:receive_message(CompSpec, {RemoteBTAddr, RemoteChannel}, Data),
ok.
-availability_msg(Availability, Services) ->
- {struct, [{?DLINK_ARG_STATUS, status_string(Availability)},
- {?DLINK_ARG_SERVICES, {array, Services}}]}.
+availability_msg(Availability, Services, CompSpec) ->
+ [{?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE},
+ {?DLINK_ARG_STATUS, status_string(Availability)},
+ {?DLINK_ARG_SERVICES, Services}
+ | log_id_tail(CompSpec)].
status_string(available ) -> ?DLINK_ARG_AVAILABLE;
status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE.
-process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
- {ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg),
- {ok, Svcs} = rvi_common:get_json_element([?DLINK_ARG_SERVICES], Msg),
+process_announce(Avail, Svcs, FromPid, Addr, Channel, CompSpec) ->
?debug("dlink_bt_rpc:service_announce(~p): Address: ~p:~p", [Avail, Addr, Channel]),
- ?debug("dlink_bt_rpc:service_announce(~p): TransactionID: ~p", [Avail, TID]),
?debug("dlink_bt_rpc:service_announce(~p): Services: ~p", [Avail, Svcs]),
case Avail of
?DLINK_ARG_AVAILABLE ->
@@ -327,23 +332,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) ->
service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE)
end.
-process_authorize(FromPid,
- PeerBTAddr,
- PeerBTChannel,
- TransactionID,
- RemoteAddress,
- RemoteChannel,
- Protocol,
- Certificates,
- Signature,
- CompSpec) ->
-
+process_authorize(FromPid, PeerBTAddr, PeerBTChannel,
+ RemoteAddress, RemoteChannel, Protocol,
+ Credentials, CompSpec) ->
?info("dlink_bt:authorize(): Peer Address: ~p:~p", [PeerBTAddr, PeerBTChannel ]),
?info("dlink_bt:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemoteChannel ]),
- ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]),
- ?debug("dlink_bt:authorize(): TransactionID: ~p", [ TransactionID ]),
- ?debug("dlink_bt:authorize(): Certificates: ~p", [ Certificates ]),
- ?debug("dlink_bt:authorize(): Signature: ~p", [ Signature ]),
+ ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]),
+ ?debug("dlink_bt:authorize(): Credentials: ~p", [ Credentials ]),
%% If FromPid (the genserver managing the socket) is not yet registered
%% with the conneciton manager, this is an incoming connection
@@ -351,59 +346,42 @@ process_authorize(FromPid,
%% a service announce
Conn = {RemoteAddress, RemoteChannel},
- case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of
- true ->
- connection_authorized(FromPid, Conn, CompSpec);
- false ->
- %% close connection (how?)
- false
- end.
+ log(result, "auth ~s:~w", [RemoteAddress, RemoteChannel], CompSpec),
+ authorize_rpc:store_creds(CompSpec, Credentials, Conn),
+ connection_authorized(FromPid, Conn, CompSpec).
handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
- Payload, CompSpec) ->
+ Elems, CompSpec) ->
- {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)),
?debug("dlink_bt:data(): Got ~p", [ Elems ]),
+ CS = rvi_common:pick_up_json_log_id(Elems, CompSpec),
+
case opt(?DLINK_ARG_CMD, Elems, undefined) of
?DLINK_CMD_AUTHORIZE ->
- [ TransactionID,
- RemoteAddress,
+ [ RemoteAddress,
RemoteChannel,
RVIProtocol,
- CertificatesTmp,
- Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID,
- ?DLINK_ARG_ADDRESS,
+ Credentials ] =
+ opts([?DLINK_ARG_ADDRESS,
?DLINK_ARG_PORT,
?DLINK_ARG_VERSION,
- ?DLINK_ARG_CERTIFICATES,
- ?DLINK_ARG_SIGNATURE],
+ ?DLINK_ARG_CREDENTIALS],
Elems, undefined),
- Certificates =
- case CertificatesTmp of
- { array, C} -> C;
- undefined -> []
- end,
process_authorize(FromPid, PeerBTAddr, RemoteChannel,
- TransactionID, RemoteAddress, RemoteChannel,
- RVIProtocol, Certificates, Signature, CompSpec);
-
-
+ RemoteAddress, RemoteChannel,
+ RVIProtocol, Credentials, CS);
?DLINK_CMD_SERVICE_ANNOUNCE ->
- Conn = {PeerBTAddr, PeerChannel},
- [ TransactionID, Signature ] =
- opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE],
+ [ Status,
+ Services ] =
+ opts([?DLINK_ARG_STATUS,
+ ?DLINK_ARG_SERVICES],
Elems, undefined),
- case authorize_rpc:validate_message(CompSpec, Signature, Conn) of
- [ok, Msg] ->
- process_availability(
- Msg, FromPid, PeerBTAddr, PeerChannel, TransactionID, CompSpec);
- _ ->
- ?debug("Couldn't validate availability msg from ~p", [Conn])
- end;
+ log("sa from ~s:~w", [PeerBTAddr, PeerChannel], CS),
+ process_announce(Status, Services, FromPid, PeerBTAddr,
+ PeerChannel, CS);
?DLINK_CMD_RECEIVE ->
[ _TransactionID,
@@ -414,14 +392,14 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data,
?DLINK_ARG_DATA],
Elems, undefined),
process_data(FromPid, PeerBTAddr, PeerChannel,
- ProtocolMod, Data, CompSpec);
+ ProtocolMod, Data, CS);
?DLINK_CMD_PING ->
?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]),
ok;
undefined ->
- ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
+ ?error("dlink_bt:data() cmd undefined., ~p", [ Elems ]),
ok
end.
@@ -567,7 +545,7 @@ handle_cast(Other, St) ->
{noreply, St}.
-handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
+handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, #st{mode = Mode} = St) ->
%% Do we already have a connection that supchannel service?
case get_connections_by_service(Service) of
[] -> %% Nope
@@ -580,7 +558,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) ->
Addr ->
[ Address, Channel] = string:tokens(Addr, "-"),
- case connect_remote(Address, list_to_integer(Channel), St#st.cs) of
+ case connect_remote(Address, list_to_integer(Channel), Mode, St#st.cs) of
ok ->
{ reply, [ok, 2000], St }; %% 2 second timeout
@@ -613,27 +591,20 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S
%% FIXME: What to do if we have multiple connections to the same service?
[ConnPid | _T] ->
?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]),
- Res = bt_connection:send(ConnPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
- { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
- { ?DLINK_ARG_DATA, base64:encode_to_string(Data) }
- ]})),
-
+ Res = bt_connection:send(
+ ConnPid,
+ [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
+ { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE },
+ { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) },
+ { ?DLINK_ARG_DATA, Data }
+ ]),
{ reply, [ Res ], St}
end;
-
-
handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) ->
%% Create a timer to handle periodic pings.
- {ok, ServerOpts } = rvi_common:get_module_config(data_link,
- ?MODULE,
- server_opts, [],
- St#st.cs),
+ ServerOpts = get_server_opts(St#st.cs),
Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL),
?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec",
@@ -656,12 +627,7 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
case bt_connection:is_connection_up(Pid) of
true ->
?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]),
- bt_connection:send(Pid, term_to_json(
- { struct,
- [ { ?DLINK_ARG_CMD,
- ?DLINK_CMD_PING
- }]})),
-
+ bt_connection:send(Pid, [ { ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Channel, Timeout });
@@ -671,8 +637,9 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) ->
{noreply, St};
%% Setup static nodes
-handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, St) ->
- connect_and_retry_remote(BTAddr, Channel, CompSpec),
+handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec },
+ #st{mode = Mode} = St) ->
+ connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec),
{ noreply, St };
handle_info(Info, St) ->
@@ -688,17 +655,15 @@ code_change(_OldVsn, St, _Extra) ->
send_authorize(Pid, SetupChannel, CompSpec) ->
{ok,[{address, Address }]} = bt_drv:local_info([address]),
bt_connection:send(Pid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
- { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
- { ?DLINK_ARG_PORT, SetupChannel },
- { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
- { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} },
- { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})).
+ [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE },
+ { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) },
+ { ?DLINK_ARG_PORT, SetupChannel },
+ { ?DLINK_ARG_VERSION, ?DLINK_BT_VER },
+ { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) }
+ | log_id_tail(CompSpec)]).
connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) ->
+ log("authorized: ~s:~p", [RemoteAddress, RemoteChannel], CompSpec),
case bt_connection_manager:find_connection_by_pid(FromPid) of
not_found ->
?info("dlink_bt:authorize(): New connection!"),
@@ -721,15 +686,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
?info("dlink_bt:authorize(): Announcing local services: ~p to remote ~p:~p",
[FilteredServices, RemoteAddress, RemoteChannel]),
- [ ok, JWT ] = authorize_rpc:sign_message(
- CompSpec, availability_msg(available, FilteredServices)),
- bt_connection:send(FromPid,
- term_to_json(
- {struct,
- [ { ?DLINK_ARG_TRANSACTION_ID, 1 },
- { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE },
- { ?DLINK_ARG_SIGNATURE, JWT } ]})),
-
+ AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec),
+ log("sending sa: ~s:~w", [RemoteAddress, RemoteChannel], CompSpec),
+ bt_connection:send(FromPid, AvailabilityMsg),
%% Setup ping interval
gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemoteChannel, FromPid }),
ok.
@@ -828,10 +787,6 @@ get_connections(Key, Acc) ->
get_connections() ->
get_connections(ets:first(?CONNECTION_TABLE), []).
-
-term_to_json(Term) ->
- binary_to_list(iolist_to_binary(exo_json:encode(Term))).
-
opt(K, L, Def) ->
case lists:keyfind(K, 1, L) of
{_, V} -> V;
@@ -841,28 +796,25 @@ opt(K, L, Def) ->
opts(Keys, Elems, Def) ->
[ opt(K, Elems, Def) || K <- Keys].
-get_authorize_jwt(CompSpec) ->
- case authorize_rpc:get_authorize_jwt(CompSpec) of
- [ok, JWT] ->
- JWT;
+get_credentials(CompSpec) ->
+ case authorize_rpc:get_credentials(CompSpec) of
+ [ok, Creds] ->
+ Creds;
[not_found] ->
- ?error("No authorize JWT~n", []),
- error(cannot_authorize)
+ ?error("No credentials found~n", []),
+ error(no_credentials_found)
end.
-get_certificates(CompSpec) ->
- case authorize_rpc:get_certificates(CompSpec) of
- [ok, Certs] ->
- Certs;
- [not_found] ->
- ?error("No certificate found~n", []),
- error(no_certificate_found)
- end.
+start_log(Pfx, Fmt, Args, CS) ->
+ LogId = rvi_log:new_id(Pfx),
+ rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)),
+ rvi_common:set_value(rvi_log_id, LogId, CS).
-validate_auth_jwt(JWT, Certs, Conn, CompSpec) ->
- case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of
- [ok] ->
- true;
- [not_found] ->
- false
- end.
+log(Fmt, Args, CS) ->
+ log(info, Fmt, Args, CS).
+
+log(Lvl, Fmt, Args, CS) ->
+ rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS).
+
+log_id_tail(CompSpec) ->
+ rvi_common:log_id_json_tail(CompSpec).