diff options
Diffstat (limited to 'components/dlink_bt/src/dlink_bt_rpc.erl')
-rw-r--r-- | components/dlink_bt/src/dlink_bt_rpc.erl | 296 |
1 files changed, 124 insertions, 172 deletions
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index ab5f8ac..542fd00 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -34,7 +34,7 @@ -include_lib("lager/include/log.hrl"). -include_lib("rvi_common/include/rvi_common.hrl"). --include_lib("rvi_common/include/rvi_dlink.hrl"). +-include_lib("rvi_common/include/rvi_dlink_bin.hrl"). -define(PERSISTENT_CONNECTIONS, persistent_connections). -define(DEFAULT_BT_CHANNEL, 1). @@ -60,6 +60,7 @@ }). -record(st, { + mode = bt, %% tcp | bt cs = #component_spec{} }). @@ -93,9 +94,11 @@ init([]) -> { keypos, #connection_entry.connection }]), CS = rvi_common:get_component_specification(), + Mode = get_mode(CS), service_discovery_rpc:subscribe(CS, ?MODULE), {ok, #st { + mode = Mode, cs = CS } }. @@ -105,15 +108,19 @@ start_json_server() -> start_connection_manager() -> + ?debug("start_connection_manager()", []), CompSpec = rvi_common:get_component_specification(), - {ok, BertOpts } = rvi_common:get_module_config(data_link, - ?MODULE, - server_opts, - [], - CompSpec), + ServerOpts = get_server_opts(CompSpec), + start_connection_manager(ServerOpts, CompSpec). + +start_connection_manager([], _) -> + ?debug("No BT server options set; start only the connection manager", []), + bt_connection_manager:start_link(); +start_connection_manager(ServerOpts, CompSpec) -> %% Retrieve the channel we should use - Mode = get_mode(BertOpts), - Channel = get_channel(Mode, BertOpts), + ?debug("ServerOpts = ~p", [ServerOpts]), + Mode = get_mode(ServerOpts), + Channel = get_channel(Mode, ServerOpts), ?info("dlink_bt:init_rvi_component(~p): Starting listener.", [self()]), @@ -124,6 +131,7 @@ start_connection_manager() -> bt:start(), bt:debug(debug); tcp -> + ?debug("Mode == tcp; not starting bt driver", []), ok end, bt_listener:start_link(Mode), @@ -146,11 +154,21 @@ start_connection_manager() -> [], CompSpec), - setup_persistent_connections_(PersistentConnections, CompSpec), + setup_persistent_connections_(PersistentConnections, Mode, CompSpec), ok. -get_mode(Opts) -> +get_server_opts(CS) when element(1, CS) == component_spec -> + {ok, ServerOpts } = rvi_common:get_module_config(data_link, + ?MODULE, + server_opts, + [], + CS), + ServerOpts. + +get_mode(CS) when element(1, CS) == component_spec -> + get_mode(get_server_opts(CS)); +get_mode(Opts) when is_list(Opts) -> proplists:get_value(test_mode, Opts, bt). get_channel(tcp, Opts) -> @@ -159,15 +177,15 @@ get_channel(bt, Opts) -> proplists:get_value(channel, Opts, ?DEFAULT_BT_CHANNEL). -setup_persistent_connections_([ ], _CompSpec) -> +setup_persistent_connections_([ ], _, _CompSpec) -> ok; -setup_persistent_connections_([ BTAddress | T], CompSpec) -> +setup_persistent_connections_([ BTAddress | T], Mode, CompSpec) -> ?debug("~p: Will persistently connect connect : ~p", [self(), BTAddress]), - [ BTAddr, Channel] = string:tokens(BTAddress, "-"), - connect_and_retry_remote(BTAddr, Channel, CompSpec), - setup_persistent_connections_(T, CompSpec), + [ BTAddr, Channel] = string:tokens(BTAddress, "-:"), %% Addr-Chan | IP:Port + connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec), + setup_persistent_connections_(T, Mode, CompSpec), ok. @@ -213,7 +231,7 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) -> %% %% Connect to a remote RVI node. %% -connect_remote(BTAddr, Channel, CompSpec) -> +connect_remote(BTAddr, Channel, Mode, CompSpec) -> case bt_connection_manager:find_connection_by_address(BTAddr, Channel) of { ok, _Pid } -> already_connected; @@ -225,7 +243,7 @@ connect_remote(BTAddr, Channel, CompSpec) -> %%FIXME %% Setup a genserver around the new connection. - case bt_connection:connect(BTAddr, Channel, + case bt_connection:connect(BTAddr, Channel, Mode, ?MODULE, handle_socket, CompSpec ) of { ok, Pid } -> ?info("dlink_bt:connect_remote(): Connection in progress ~p:~p - Proc ~p", @@ -239,45 +257,33 @@ connect_remote(BTAddr, Channel, CompSpec) -> end end. - -connect_and_retry_remote( BTAddr, Channel, CompSpec) -> +connect_and_retry_remote( BTAddr, Channel, Mode, CompSpec) -> ?info("dlink_bt:connect_and_retry_remote(): ~p:~p", [ BTAddr, Channel]), - case connect_remote(BTAddr, list_to_integer(Channel), CompSpec) of - ok -> ok; - + CS = start_log(<<"conn">>, "connect ~s:~s", [BTAddr, Channel], CompSpec), + case connect_remote(BTAddr, list_to_integer(Channel), Mode, CS) of + ok -> + ok; Err -> %% Failed to connect. Sleep and try again ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Failed: ~p", - [BTAddr, Channel, Err]), - - ?notice("dlink_bt:connect_and_retry_remote(~p:~p): Will try again in ~p sec", - [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]), - - setup_reconnect_timer(?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CompSpec), - + [BTAddr, Channel, Err]), + ?notice("dlink_bt:connect_and_retry_remote(~p:~p):" + " Will try again in ~p sec", + [BTAddr, Channel, ?DEFAULT_RECONNECT_INTERVAL]), + setup_reconnect_timer( + ?DEFAULT_RECONNECT_INTERVAL, BTAddr, Channel, CS), not_available end. - - announce_local_service_(_CompSpec, [], _Service, _Availability) -> ok; announce_local_service_(CompSpec, [ConnPid | T], Service, Availability) -> - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(Availability, [Service])), - Res = bt_connection:send(ConnPid, - term_to_json( - { struct, - [ - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_TRANSACTION_ID, 3}, - { ?DLINK_ARG_SIGNATURE, JWT } - ] - })), + Msg = availability_msg(Availability, [Service], CompSpec), + Res = bt_connection:send(ConnPid, Msg), ?debug("dlink_bt:announce_local_service(~p: ~p) -> ~p Res: ~p", [ Availability, Service, ConnPid, Res]), @@ -299,24 +305,23 @@ announce_local_service_(CompSpec, Service, Availability) -> process_data(_FromPid, RemoteBTAddr, RemoteChannel, ProtocolMod, Data, CompSpec) -> ?debug("dlink_bt:receive_data(): SetupAddress: {~p, ~p}", [ RemoteBTAddr, RemoteChannel ]), ?debug("dlink_bt:receive_data(): ~p:receive_message(~p)", [ ProtocolMod, Data ]), - Proto = list_to_atom(ProtocolMod), - Proto:receive_message(CompSpec, base64:decode_to_string(Data)), + Proto = list_to_existing_atom(ProtocolMod), + Proto:receive_message(CompSpec, {RemoteBTAddr, RemoteChannel}, Data), ok. -availability_msg(Availability, Services) -> - {struct, [{?DLINK_ARG_STATUS, status_string(Availability)}, - {?DLINK_ARG_SERVICES, {array, Services}}]}. +availability_msg(Availability, Services, CompSpec) -> + [{?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE}, + {?DLINK_ARG_STATUS, status_string(Availability)}, + {?DLINK_ARG_SERVICES, Services} + | log_id_tail(CompSpec)]. status_string(available ) -> ?DLINK_ARG_AVAILABLE; status_string(unavailable) -> ?DLINK_ARG_UNAVAILABLE. -process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) -> - {ok, Avail} = rvi_common:get_json_element([?DLINK_ARG_STATUS], Msg), - {ok, Svcs} = rvi_common:get_json_element([?DLINK_ARG_SERVICES], Msg), +process_announce(Avail, Svcs, FromPid, Addr, Channel, CompSpec) -> ?debug("dlink_bt_rpc:service_announce(~p): Address: ~p:~p", [Avail, Addr, Channel]), - ?debug("dlink_bt_rpc:service_announce(~p): TransactionID: ~p", [Avail, TID]), ?debug("dlink_bt_rpc:service_announce(~p): Services: ~p", [Avail, Svcs]), case Avail of ?DLINK_ARG_AVAILABLE -> @@ -327,23 +332,13 @@ process_availability(Msg, FromPid, Addr, Channel, TID, CompSpec) -> service_discovery_rpc:unregister_services(CompSpec, Svcs, ?MODULE) end. -process_authorize(FromPid, - PeerBTAddr, - PeerBTChannel, - TransactionID, - RemoteAddress, - RemoteChannel, - Protocol, - Certificates, - Signature, - CompSpec) -> - +process_authorize(FromPid, PeerBTAddr, PeerBTChannel, + RemoteAddress, RemoteChannel, Protocol, + Credentials, CompSpec) -> ?info("dlink_bt:authorize(): Peer Address: ~p:~p", [PeerBTAddr, PeerBTChannel ]), ?info("dlink_bt:authorize(): Remote Address: ~p~p", [ RemoteAddress, RemoteChannel ]), - ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]), - ?debug("dlink_bt:authorize(): TransactionID: ~p", [ TransactionID ]), - ?debug("dlink_bt:authorize(): Certificates: ~p", [ Certificates ]), - ?debug("dlink_bt:authorize(): Signature: ~p", [ Signature ]), + ?info("dlink_bt:authorize(): Protocol: ~p", [ Protocol ]), + ?debug("dlink_bt:authorize(): Credentials: ~p", [ Credentials ]), %% If FromPid (the genserver managing the socket) is not yet registered %% with the conneciton manager, this is an incoming connection @@ -351,59 +346,42 @@ process_authorize(FromPid, %% a service announce Conn = {RemoteAddress, RemoteChannel}, - case validate_auth_jwt(Signature, Certificates, Conn, CompSpec) of - true -> - connection_authorized(FromPid, Conn, CompSpec); - false -> - %% close connection (how?) - false - end. + log(result, "auth ~s:~w", [RemoteAddress, RemoteChannel], CompSpec), + authorize_rpc:store_creds(CompSpec, Credentials, Conn), + connection_authorized(FromPid, Conn, CompSpec). handle_socket(FromPid, PeerBTAddr, PeerChannel, data, - Payload, CompSpec) -> + Elems, CompSpec) -> - {ok, {struct, Elems}} = exo_json:decode_string(binary_to_list(Payload)), ?debug("dlink_bt:data(): Got ~p", [ Elems ]), + CS = rvi_common:pick_up_json_log_id(Elems, CompSpec), + case opt(?DLINK_ARG_CMD, Elems, undefined) of ?DLINK_CMD_AUTHORIZE -> - [ TransactionID, - RemoteAddress, + [ RemoteAddress, RemoteChannel, RVIProtocol, - CertificatesTmp, - Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, - ?DLINK_ARG_ADDRESS, + Credentials ] = + opts([?DLINK_ARG_ADDRESS, ?DLINK_ARG_PORT, ?DLINK_ARG_VERSION, - ?DLINK_ARG_CERTIFICATES, - ?DLINK_ARG_SIGNATURE], + ?DLINK_ARG_CREDENTIALS], Elems, undefined), - Certificates = - case CertificatesTmp of - { array, C} -> C; - undefined -> [] - end, process_authorize(FromPid, PeerBTAddr, RemoteChannel, - TransactionID, RemoteAddress, RemoteChannel, - RVIProtocol, Certificates, Signature, CompSpec); - - + RemoteAddress, RemoteChannel, + RVIProtocol, Credentials, CS); ?DLINK_CMD_SERVICE_ANNOUNCE -> - Conn = {PeerBTAddr, PeerChannel}, - [ TransactionID, Signature ] = - opts([?DLINK_ARG_TRANSACTION_ID, ?DLINK_ARG_SIGNATURE], + [ Status, + Services ] = + opts([?DLINK_ARG_STATUS, + ?DLINK_ARG_SERVICES], Elems, undefined), - case authorize_rpc:validate_message(CompSpec, Signature, Conn) of - [ok, Msg] -> - process_availability( - Msg, FromPid, PeerBTAddr, PeerChannel, TransactionID, CompSpec); - _ -> - ?debug("Couldn't validate availability msg from ~p", [Conn]) - end; + log("sa from ~s:~w", [PeerBTAddr, PeerChannel], CS), + process_announce(Status, Services, FromPid, PeerBTAddr, + PeerChannel, CS); ?DLINK_CMD_RECEIVE -> [ _TransactionID, @@ -414,14 +392,14 @@ handle_socket(FromPid, PeerBTAddr, PeerChannel, data, ?DLINK_ARG_DATA], Elems, undefined), process_data(FromPid, PeerBTAddr, PeerChannel, - ProtocolMod, Data, CompSpec); + ProtocolMod, Data, CS); ?DLINK_CMD_PING -> ?info("dlink_bt:ping(): Pinged from: ~p:~p", [ PeerBTAddr, PeerChannel]), ok; undefined -> - ?warning("dlink_bt:data() cmd undefined., ~p", [ Elems ]), + ?error("dlink_bt:data() cmd undefined., ~p", [ Elems ]), ok end. @@ -567,7 +545,7 @@ handle_cast(Other, St) -> {noreply, St}. -handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> +handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, #st{mode = Mode} = St) -> %% Do we already have a connection that supchannel service? case get_connections_by_service(Service) of [] -> %% Nope @@ -580,7 +558,7 @@ handle_call({rvi, setup_data_link, [ Service, Opts ]}, _From, St) -> Addr -> [ Address, Channel] = string:tokens(Addr, "-"), - case connect_remote(Address, list_to_integer(Channel), St#st.cs) of + case connect_remote(Address, list_to_integer(Channel), Mode, St#st.cs) of ok -> { reply, [ok, 2000], St }; %% 2 second timeout @@ -613,27 +591,20 @@ handle_call({rvi, send_data, [ProtoMod, Service, Data, _DataLinkOpts]}, _From, S %% FIXME: What to do if we have multiple connections to the same service? [ConnPid | _T] -> ?debug("dlink_bt:send(~p): ~s", [ProtoMod, Data]), - Res = bt_connection:send(ConnPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, - { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, - { ?DLINK_ARG_DATA, base64:encode_to_string(Data) } - ]})), - + Res = bt_connection:send( + ConnPid, + [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, + { ?DLINK_ARG_CMD, ?DLINK_CMD_RECEIVE }, + { ?DLINK_ARG_MODULE, atom_to_list(ProtoMod) }, + { ?DLINK_ARG_DATA, Data } + ]), { reply, [ Res ], St} end; - - handle_call({setup_initial_ping, Address, Channel, Pid}, _From, St) -> %% Create a timer to handle periodic pings. - {ok, ServerOpts } = rvi_common:get_module_config(data_link, - ?MODULE, - server_opts, [], - St#st.cs), + ServerOpts = get_server_opts(St#st.cs), Timeout = proplists:get_value(ping_interval, ServerOpts, ?DEFAULT_PING_INTERVAL), ?info("dlink_bt:setup_ping(): ~p:~p will be pinged every ~p msec", @@ -656,12 +627,7 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) -> case bt_connection:is_connection_up(Pid) of true -> ?info("dlink_bt:ping(): Pinging: ~p:~p", [Address, Channel]), - bt_connection:send(Pid, term_to_json( - { struct, - [ { ?DLINK_ARG_CMD, - ?DLINK_CMD_PING - }]})), - + bt_connection:send(Pid, [ { ?DLINK_ARG_CMD, ?DLINK_CMD_PING }]), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Channel, Timeout }); @@ -671,8 +637,9 @@ handle_info({ rvi_ping, Pid, Address, Channel, Timeout}, St) -> {noreply, St}; %% Setup static nodes -handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, St) -> - connect_and_retry_remote(BTAddr, Channel, CompSpec), +handle_info({ rvi_setup_persistent_connection, BTAddr, Channel, CompSpec }, + #st{mode = Mode} = St) -> + connect_and_retry_remote(BTAddr, Channel, Mode, CompSpec), { noreply, St }; handle_info(Info, St) -> @@ -688,17 +655,15 @@ code_change(_OldVsn, St, _Extra) -> send_authorize(Pid, SetupChannel, CompSpec) -> {ok,[{address, Address }]} = bt_drv:local_info([address]), bt_connection:send(Pid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, - { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, - { ?DLINK_ARG_PORT, SetupChannel }, - { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, - { ?DLINK_ARG_CERTIFICATES, {array, get_certificates(CompSpec)} }, - { ?DLINK_ARG_SIGNATURE, get_authorize_jwt(CompSpec) } ]})). + [{ ?DLINK_ARG_CMD, ?DLINK_CMD_AUTHORIZE }, + { ?DLINK_ARG_ADDRESS, bt_address_to_string(Address) }, + { ?DLINK_ARG_PORT, SetupChannel }, + { ?DLINK_ARG_VERSION, ?DLINK_BT_VER }, + { ?DLINK_ARG_CREDENTIALS, get_credentials(CompSpec) } + | log_id_tail(CompSpec)]). connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) -> + log("authorized: ~s:~p", [RemoteAddress, RemoteChannel], CompSpec), case bt_connection_manager:find_connection_by_pid(FromPid) of not_found -> ?info("dlink_bt:authorize(): New connection!"), @@ -721,15 +686,9 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) ?info("dlink_bt:authorize(): Announcing local services: ~p to remote ~p:~p", [FilteredServices, RemoteAddress, RemoteChannel]), - [ ok, JWT ] = authorize_rpc:sign_message( - CompSpec, availability_msg(available, FilteredServices)), - bt_connection:send(FromPid, - term_to_json( - {struct, - [ { ?DLINK_ARG_TRANSACTION_ID, 1 }, - { ?DLINK_ARG_CMD, ?DLINK_CMD_SERVICE_ANNOUNCE }, - { ?DLINK_ARG_SIGNATURE, JWT } ]})), - + AvailabilityMsg = availability_msg(available, FilteredServices, CompSpec), + log("sending sa: ~s:~w", [RemoteAddress, RemoteChannel], CompSpec), + bt_connection:send(FromPid, AvailabilityMsg), %% Setup ping interval gen_server:call(?SERVER, { setup_initial_ping, RemoteAddress, RemoteChannel, FromPid }), ok. @@ -828,10 +787,6 @@ get_connections(Key, Acc) -> get_connections() -> get_connections(ets:first(?CONNECTION_TABLE), []). - -term_to_json(Term) -> - binary_to_list(iolist_to_binary(exo_json:encode(Term))). - opt(K, L, Def) -> case lists:keyfind(K, 1, L) of {_, V} -> V; @@ -841,28 +796,25 @@ opt(K, L, Def) -> opts(Keys, Elems, Def) -> [ opt(K, Elems, Def) || K <- Keys]. -get_authorize_jwt(CompSpec) -> - case authorize_rpc:get_authorize_jwt(CompSpec) of - [ok, JWT] -> - JWT; +get_credentials(CompSpec) -> + case authorize_rpc:get_credentials(CompSpec) of + [ok, Creds] -> + Creds; [not_found] -> - ?error("No authorize JWT~n", []), - error(cannot_authorize) + ?error("No credentials found~n", []), + error(no_credentials_found) end. -get_certificates(CompSpec) -> - case authorize_rpc:get_certificates(CompSpec) of - [ok, Certs] -> - Certs; - [not_found] -> - ?error("No certificate found~n", []), - error(no_certificate_found) - end. +start_log(Pfx, Fmt, Args, CS) -> + LogId = rvi_log:new_id(Pfx), + rvi_log:log(LogId, <<"dlink_tcp">>, rvi_log:format(Fmt, Args)), + rvi_common:set_value(rvi_log_id, LogId, CS). -validate_auth_jwt(JWT, Certs, Conn, CompSpec) -> - case authorize_rpc:validate_authorization(CompSpec, JWT, Certs, Conn) of - [ok] -> - true; - [not_found] -> - false - end. +log(Fmt, Args, CS) -> + log(info, Fmt, Args, CS). + +log(Lvl, Fmt, Args, CS) -> + rvi_log:flog(Lvl, Fmt, Args, <<"dlink_tcp">>, CS). + +log_id_tail(CompSpec) -> + rvi_common:log_id_json_tail(CompSpec). |