From 2dd27e3c33de6c2b31e4cdc30a345b3c64e3040a Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Fri, 14 Aug 2015 19:32:22 +0200 Subject: added svc reg+call tests + bug fixes --- components/authorize/src/authorize_keys.erl | 100 ++++++++++++--------- components/authorize/src/authorize_rpc.erl | 24 ++--- components/dlink_bt/src/dlink_bt_rpc.erl | 2 +- components/dlink_sms/src/dlink_sms_rpc.erl | 2 +- components/dlink_tcp/src/connection_manager.erl | 5 ++ components/dlink_tcp/src/dlink_tcp_rpc.erl | 11 ++- components/rvi_common/src/exoport_exo_http.erl | 11 ++- components/rvi_common/src/rvi_common.erl | 35 ++++++-- .../src/service_discovery_rpc.erl | 27 ++++-- components/service_edge/src/service_edge_rpc.erl | 2 +- 10 files changed, 141 insertions(+), 78 deletions(-) (limited to 'components') diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index f68776a..5edb3cb 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -12,8 +12,8 @@ -export([get_certificates/0, get_certificates/1]). -export([validate_message/2]). --export([filter_by_destination/2, - find_cert_by_destination/1]). +-export([filter_by_service/2, + find_cert_by_service/1]). -export([public_key_to_json/1, json_to_public_key/1]). @@ -35,8 +35,8 @@ authorize_jwt}). -record(cert, {id, - sources = [], - destinations = [], + register = [], + invoke = [], validity = [], jwt, cert}). @@ -107,11 +107,11 @@ get_certificates() -> get_certificates(Conn) -> gen_server:call(?MODULE, {get_certificates, Conn}). -filter_by_destination(Services, Conn) -> - gen_server:call(?MODULE, {filter_by_destination, Services, Conn}). +filter_by_service(Services, Conn) -> + gen_server:call(?MODULE, {filter_by_service, Services, Conn}). -find_cert_by_destination(Service) -> - gen_server:call(?MODULE, {find_cert_by_destination, Service}). +find_cert_by_service(Service) -> + gen_server:call(?MODULE, {find_cert_by_service, Service}). provisioning_key() -> gen_server:call(?MODULE, provisioning_key). @@ -178,14 +178,14 @@ handle_call_({save_cert, Cert, JWT, Conn}, _, S) -> ets:insert(?CERTS, {{Conn, C#cert.id}, C}), {reply, ok, S} end; -handle_call_({filter_by_destination, Services, Conn} =R, _From, State) -> +handle_call_({filter_by_service, Services, Conn} =R, _From, State) -> ?debug("authorize_keys:handle_call(~p,...)~n", [R]), - Filtered = filter_by_destination_(Services, Conn), + Filtered = filter_by_service_(Services, Conn), ?debug("Filtered = ~p~n", [Filtered]), {reply, Filtered, State}; -handle_call_({find_cert_by_destination, Service} = R, _From, State) -> +handle_call_({find_cert_by_service, Service} = R, _From, State) -> ?debug("authorize_keys:handle_call(~p,...)~n", [R]), - Res = find_cert_by_destination_(Service), + Res = find_cert_by_service_(Service), ?debug("Res = ~p~n", [Res]), {reply, Res, State}; handle_call_(_, _, S) -> @@ -215,34 +215,39 @@ certs_by_conn(Conn) -> ?debug("rough selection: ~p~n", [Certs]), [C || {C,V} <- Certs, check_validity(V, UTC)]. -filter_by_destination_(Services, Conn) -> - Dests = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{destinations = '$1', +filter_by_service_(Services, Conn) -> + ?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]), + Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1', _ = '_'}}, [], ['$1'] }]), - ?debug("Dests by conn (~p) -> ~p~n", [Conn, Dests]), - filter_svcs_by_dest_(Services, Dests). + ?debug("Services by conn (~p) -> ~p~n", [Conn, Invoke]), + filter_svcs_(Services, Invoke). -filter_svcs_by_dest_([S|Svcs], Dests) -> +filter_svcs_([S|Svcs], Invoke) -> case lists:any(fun(Ds) -> lists:any( fun(D) -> - match_dest(D, S) + match_svc(D, S) end, Ds) - end, Dests) of + end, Invoke) of true -> - [S|filter_svcs_by_dest_(Svcs, Dests)]; + [S|filter_svcs_(Svcs, Invoke)]; false -> - filter_svcs_by_dest_(Svcs, Dests) + filter_svcs_(Svcs, Invoke) end; -filter_svcs_by_dest_([], _) -> +filter_svcs_([], _) -> []. -find_cert_by_destination_(Service) -> +find_cert_by_service_(Service) -> SvcParts = split_path(strip_prot(Service)), LocalCerts = ets:select(?CERTS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]), + ?debug("find_cert_by_service(~p~nLocalCerts = ~p~n", + [Service, [{Id,Reg,Inv} || #cert{id = Id, + invoke = Inv, + register = Reg} <- LocalCerts]]), case lists:foldl( - fun(#cert{destinations = Dests} = C, {Max, _} = Acc) -> - case match_length(Dests, SvcParts) of + fun(#cert{register = Register} = C, {Max, _} = Acc) -> + case match_length(Register, SvcParts) of L when L > Max -> {L, C}; _ -> @@ -255,12 +260,12 @@ find_cert_by_destination_(Service) -> {ok, Found#cert.jwt} end. -match_length(Dests, Svc) -> +match_length(Invoke, Svc) -> R = lists:foldl(fun(D, Max) -> DParts = split_path(strip_prot(D)), erlang:max(match_length_(DParts, Svc), Max) - end, 0, Dests), - ?debug("match_length(~p,~p) -> ~p~n", [Dests, Svc, R]), + end, 0, Invoke), + ?debug("match_length(~p,~p) -> ~p~n", [Invoke, Svc, R]), R. match_length_(D, Svc) -> @@ -277,11 +282,11 @@ match_length_([_|_], [], _) -> match_length_([], _, L) -> L. -match_dest(D, S) -> +match_svc(D, S) -> A = split_path(strip_prot(D)), B = split_path(strip_prot(S)), - ?debug("match_dest_(~p, ~p)~n", [A, B]), - match_dest_(A, B). + ?debug("match_svc_(~p, ~p)~n", [A, B]), + match_svc_(A, B). strip_prot(P) -> case re:split(P, ":", [{return,list}]) of @@ -292,13 +297,13 @@ strip_prot(P) -> split_path(P) -> re:split(P, "/", [{return, list}]). -match_dest_([H|T], [H|T1]) -> - match_dest_(T, T1); -match_dest_(["+"|T], [_|T1]) -> - match_dest_(T, T1); -match_dest_([], _) -> +match_svc_([H|T], [H|T1]) -> + match_svc_(T, T1); +match_svc_(["+"|T], [_|T1]) -> + match_svc_(T, T1); +match_svc_([], _) -> true; -match_dest_(_, _) -> +match_svc_(_, _) -> false. get_env(K) -> @@ -412,11 +417,20 @@ process_cert_struct(Cert, Bin) -> process_cert_struct(Cert, Bin, rvi_common:utc_timestamp()). process_cert_struct(Cert, Bin, UTC) -> + try process_cert_struct_(Cert, Bin, UTC) + catch + error:Err -> + ?warning("Failure processing Cert ~p~n~p", + [Cert, {Err, erlang:get_stacktrace()}]), + invalid + end. + +process_cert_struct_(Cert, Bin, UTC) -> ID = cert_id(Cert), - {ok, Sources} = rvi_common:get_json_element( - ["sources"], Cert), - {ok, Dests} = rvi_common:get_json_element( - ["destinations"], Cert), + {ok, Register} = rvi_common:get_json_element( + [{'OR', ["sources", "register"]}], Cert), + {ok, Invoke} = rvi_common:get_json_element( + [{'OR', ["destinations", "invoke"]}], Cert), {ok, Start} = rvi_common:get_json_element( ["validity", "start"], Cert), {ok, Stop} = rvi_common:get_json_element( @@ -426,8 +440,8 @@ process_cert_struct(Cert, Bin, UTC) -> case check_validity(Start, Stop, UTC) of true -> #cert{id = ID, - sources = Sources, - destinations = Dests, + register = Register, + invoke = Invoke, validity = Validity, jwt = Bin, cert = Cert}; diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl index 46a180f..155bd4f 100644 --- a/components/authorize/src/authorize_rpc.erl +++ b/components/authorize/src/authorize_rpc.erl @@ -25,7 +25,7 @@ validate_authorization/4, authorize_local_message/3, authorize_remote_message/3]). --export([filter_by_destination/3]). +-export([filter_by_service/3]). %% for testing & development -export([sign/1, sign_default_cert/0]). @@ -69,7 +69,7 @@ get_certificate_body(_Service) -> [ %% Topic tree patterns that this node is authorized to %% process requests for. - { "sources", + { sources, { array, [ "jaguarlandrover.com/cloud/media_server" ] @@ -164,10 +164,10 @@ authorize_remote_message(CompSpec, Service, Params) -> {parameters, Params}], [status], CompSpec). -filter_by_destination(CompSpec, Services, Conn) -> - ?debug("authorize_rpc:filter_by_destination(): services: ~p ~n", [Services]), - ?debug("authorize_rpc:filter_by_destination(): conn: ~p ~n", [Conn]), - rvi_common:request(authorize, ?MODULE, filter_by_destination, +filter_by_service(CompSpec, Services, Conn) -> + ?debug("authorize_rpc:filter_by_service(): services: ~p ~n", [Services]), + ?debug("authorize_rpc:filter_by_service(): conn: ~p ~n", [Conn]), + rvi_common:request(authorize, ?MODULE, filter_by_service, [{ services, Services }, { conn, Conn }], [status, services], CompSpec). @@ -238,12 +238,12 @@ handle_rpc("authorize_remote_message", Args) -> [Service, Params]}), { ok, rvi_common:json_rpc_status(Status)}; -handle_rpc("filter_by_destination", Args) -> - ?debug("authorize_rpc:handle_rpc(\"filter_by_destination\", ~p)~n", [Args]), +handle_rpc("filter_by_service", Args) -> + ?debug("authorize_rpc:handle_rpc(\"filter_by_service\", ~p)~n", [Args]), {ok, Services} = rvi_common:get_json_element(["services"], Args), {ok, Conn} = rvi_common:get_json_element(["conn"], Args), [ Status, FilteredServices ] = - gen_server:call(?SERVER, { rvi, filter_by_destination, + gen_server:call(?SERVER, { rvi, filter_by_service, [Services, Conn] }), {ok, [{status, rvi_common:json_rpc_status(Status)}, {services, {array, FilteredServices}}]}; @@ -295,7 +295,7 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn] }, _From, State) -> handle_call({rvi, authorize_local_message, [Service, Params] } = R, _From, #st{private_key = Key} = State) -> ?debug("authorize_rpc:handle_call(~p)~n", [R]), - case authorize_keys:find_cert_by_destination(Service) of + case authorize_keys:find_cert_by_service(Service) of {ok, Cert} -> Msg = Params ++ [{"certificate", Cert}], ?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", [Msg]), @@ -333,8 +333,8 @@ handle_call({rvi, authorize_remote_message, [_Service, Params]}=R, end end; -handle_call({rvi, filter_by_destination, [Services, Conn]}, _From, State) -> - Filtered = authorize_keys:filter_by_destination(Services, Conn), +handle_call({rvi, filter_by_service, [Services, Conn]}, _From, State) -> + Filtered = authorize_keys:filter_by_service(Services, Conn), {reply, [ok, Filtered], State}; handle_call({sign, Term}, _From, #st{private_key = Key} = State) -> diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl index aba5f3b..c36c997 100644 --- a/components/dlink_bt/src/dlink_bt_rpc.erl +++ b/components/dlink_bt/src/dlink_bt_rpc.erl @@ -698,7 +698,7 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec) %% that just authorized to us. [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), - [ ok, FilteredServices ] = authorize_rpc:filter_by_destination( + [ ok, FilteredServices ] = authorize_rpc:filter_by_service( CompSpec, LocalServices, Conn), %% Send an authorize back to the remote node diff --git a/components/dlink_sms/src/dlink_sms_rpc.erl b/components/dlink_sms/src/dlink_sms_rpc.erl index 78630ae..4bf5f14 100644 --- a/components/dlink_sms/src/dlink_sms_rpc.erl +++ b/components/dlink_sms/src/dlink_sms_rpc.erl @@ -661,7 +661,7 @@ connection_authorized(FromPid, {RemoteAddr, 0} = Conn, CompSpec) -> %% that just authorized to us. [ok, LocalServices] = service_discovery_rpc:get_services_by_module(CompSpec, local), - [ok, FilteredServices] = authorize_rpc:filter_by_destination( + [ok, FilteredServices] = authorize_rpc:filter_by_service( CompSpec, LocalServices, Conn), %% Send an authorize back to the remote node diff --git a/components/dlink_tcp/src/connection_manager.erl b/components/dlink_tcp/src/connection_manager.erl index c12a3fd..6a9f1e0 100644 --- a/components/dlink_tcp/src/connection_manager.erl +++ b/components/dlink_tcp/src/connection_manager.erl @@ -31,6 +31,7 @@ -export([delete_connection_by_address/2]). -export([find_connection_by_pid/1]). -export([find_connection_by_address/2]). +-export([connections/0]). -define(SERVER, ?MODULE). @@ -58,6 +59,8 @@ find_connection_by_pid(Pid) -> find_connection_by_address(IP, Port) -> gen_server:call(?SERVER, { find_connection_by_address, IP, Port } ). +connections() -> + gen_server:call(?SERVER, connections). %%-------------------------------------------------------------------- %% @doc @@ -202,6 +205,8 @@ handle_call({find_connection_by_address, IP, Port}, _From, {reply, {ok, Pid}, St} end; +handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) -> + {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St}; handle_call(_Request, _From, State) -> ?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]), diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index a498cbb..f253c9d 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -26,6 +26,7 @@ %% FIXME: Should be rvi_service_discovery behavior -export([service_available/3, service_unavailable/3]). +-export([connections/1]). -export([setup_data_link/3, disconnect_data_link/2, @@ -154,6 +155,8 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) -> { data_link_module, DataLinkModule }], CompSpec). +connections(CompSpec) -> + rvi_common:request(data_link, ?MODULE, connections, []). setup_data_link(CompSpec, Service, Opts) -> rvi_common:request(data_link, ?MODULE, setup_data_link, @@ -176,7 +179,6 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) -> ], [status], CompSpec). - %% End of behavior %% @@ -444,7 +446,10 @@ handle_rpc("send_data", Args) -> { ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args), [ Res ] = gen_server:call(?SERVER, { rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}), {ok, [ {status, rvi_common:json_rpc_status(Res)} ]}; - + +handle_rpc("connections", []) -> + Res = gen_server:call(?SERVER, connections), + {ok, [ {status, ok} | {connections, {array, Res}} ]}; handle_rpc(Other, _Args) -> ?info("dlink_tcp:handle_rpc(~p): unknown", [ Other ]), @@ -708,7 +713,7 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) -> %% that just authorized to us. [ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local), - [ ok, FilteredServices ] = authorize_rpc:filter_by_destination( + [ ok, FilteredServices ] = authorize_rpc:filter_by_service( CompSpec, LocalServices, Conn), %% Send an authorize back to the remote node diff --git a/components/rvi_common/src/exoport_exo_http.erl b/components/rvi_common/src/exoport_exo_http.erl index 193b730..95801c6 100644 --- a/components/rvi_common/src/exoport_exo_http.erl +++ b/components/rvi_common/src/exoport_exo_http.erl @@ -36,22 +36,25 @@ instance(SupMod, AppMod, Opts) -> handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == 'POST' -> try decode_json(Body) of {call, Id, Method, Args} -> - case handle_rpc(AppMod, Method, Args) of + try handle_rpc(AppMod, Method, Args) of {ok, Reply} -> success_response(Socket, Id, Reply); ok -> ok; {error, Error} -> error_response(Socket, Id, Error) + catch + error:Reason -> + ?debug("~p:handle_rpc(~p, ~p, ~p) ERROR: ~p~n~p", + [?MODULE, AppMod, Method, Args, Reason, + erlang:get_stacktrace()]), + error_response(Socket, Id, internal_error) end; - {notification, Method, Args} -> handle_notification(AppMod, Method, Args), exo_http_server:response(Socket, undefined, 200, "OK", ""); - {error, _} -> error_response(Socket, parse_error) - catch error:_ -> exo_http_server:response(Socket, undefined, 501, diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index 8028917..39beabf 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -219,6 +219,13 @@ notification(Component, InArg = [ Val || { _Key, Val } <- InArgPropList ], InArgSpec = [ Key || { Key, _Val } <- InArgPropList ], %% Figure out how we are to invoke this MFA. + ?debug("~p:notification(", [?MODULE]), + ?debug(" Component : ~p", [Component]), + ?debug(" Module : ~p", [Module]), + ?debug(" Function : ~p", [Function]), + ?debug(" InArgPropList : ~p", [InArgPropList]), + ?debug(" CompSpec : ~p", [CompSpec]), + case get_module_type(Component, Module, CompSpec) of %% We have a gen_server { ok, gen_server } -> @@ -233,6 +240,11 @@ notification(Component, JSONArg = json_argument(InArg, InArgSpec), ?debug("Sending ~p:~p(~p) -> ~p.", [Module, Function, InArg, JSONArg]), send_json_notification(URL, atom_to_list(Function), JSONArg), + ok; + { error, _ } = Error -> + ?warning("get_module_type(~p,~p,~p) -> ~p", + [Component, Module, CompSpec, Error]), + %% ignore ok end. @@ -331,20 +343,27 @@ get_json_element_([], JSON) -> get_json_element_([Elem | T], JSON ) when is_atom(Elem) -> get_json_element_([atom_to_list(Elem) | T], JSON); -get_json_element_([Elem | T], {struct, JSON} ) -> - Res = get_json_element_(T, proplists:get_value(Elem, JSON, undefined)), - Res; - -get_json_element_([Elem | T], {array, JSON} ) -> - Res = get_json_element_(T, proplists:get_value(Elem, JSON, undefined)), - Res; +get_json_element_([Elem | T], {Type, JSON} ) when Type==array; Type==struct -> + case Elem of + {'OR', Alts} -> + get_json_element_(T, get_json_element_alt(Alts, JSON)); + _ -> + get_json_element_(T, proplists:get_value(Elem, JSON, undefined)) + end; get_json_element_(Path,JSON) -> ?warning("get_json_element_(): Unhandled: Path: ~p | JSON: ~p", [Path, JSON]), { error, undefined }. - +get_json_element_alt(Alts, [{K, V}|T]) -> + case lists:member(K, Alts) of + true -> V; + false -> get_json_element_alt(Alts, T) + end; +get_json_element_alt(_, []) -> + undefined. + json_reply(ArgList, JSON) -> retrieve_json_reply_elements(ArgList, JSON, []). diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl index b3d2a4d..e4ae226 100644 --- a/components/service_discovery/src/service_discovery_rpc.erl +++ b/components/service_discovery/src/service_discovery_rpc.erl @@ -91,12 +91,20 @@ get_modules_by_service(CompSpec, Service) -> register_services(CompSpec, Services, DataLinkModule) -> + ?debug("~p:register_services()", [?MODULE]), + ?debug(" CompSpec : ", [CompSpec]), + ?debug(" Services : ", [Services]), + ?debug(" DataLinkMod : ", [DataLinkModule]), rvi_common:notification(service_discovery, ?MODULE, register_services, [{ services, Services }, { data_link_module, DataLinkModule }], CompSpec). unregister_services(CompSpec, Services, DataLinkModule) -> + ?debug("~p:unregister_services()", [?MODULE]), + ?debug(" CompSpec : ", [CompSpec]), + ?debug(" Services : ", [Services]), + ?debug(" DataLinkMod : ", [DataLinkModule]), rvi_common:notification(service_discovery, ?MODULE, unregister_services, [{ services, Services }, { data_link_module, DataLinkModule}], @@ -192,23 +200,32 @@ handle_rpc( Other, _Args) -> ?info("svc_disc:handle_rpc(~p): unknown", [ Other ]), {ok, [ { status, invalid_command } ]}. - -handle_call({rvi, get_all_services, _Args}, _From, St) -> +handle_call(Req, From, St) -> + try handle_call_(Req, From, St) + catch + error:Reason -> + ?debug("~p:handle_call_(~p,~p,~p) -> ERROR: ~p~n~p", + [?MODULE, Req, From, St, Reason, + erlang:get_stacktrace()]), + {reply, [internal_error], St} + end. + +handle_call_({rvi, get_all_services, _Args}, _From, St) -> Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) -> [ ServiceName | Acc ] end, [], ?SERVICE_TABLE), {reply, [ok, Svcs], St }; -handle_call({rvi, get_services_by_module, [Module]}, _From, St) -> +handle_call_({rvi, get_services_by_module, [Module]}, _From, St) -> {reply, [ok, get_services_by_module_(Module)], St }; -handle_call({rvi, get_modules_by_service, [Service]}, _From, St) -> +handle_call_({rvi, get_modules_by_service, [Service]}, _From, St) -> {reply, [ok, get_modules_by_service_(Service)], St }; -handle_call(Other, _From, St) -> +handle_call_(Other, _From, St) -> ?warning("svc_disc:handle_call(~p): unknown", [ Other ]), { reply, [unknown_command] , St}. diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index 2f73913..2048358 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -482,7 +482,7 @@ handle_cast({rvi, handle_remote_message, SvcName, [{remote_ip, IP}, {remote_port, Port}, - {service, SvcName}, + {service_name, SvcName}, {timeout, Timeout}, {parameters, Parameters}, {signature, Signature}]) of -- cgit v1.2.1