summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2015-08-14 19:32:22 +0200
committerUlf Wiger <ulf@feuerlabs.com>2015-08-14 19:32:22 +0200
commit2dd27e3c33de6c2b31e4cdc30a345b3c64e3040a (patch)
tree5620cfbed7605baa2c1325fbbc7ca56157164616
parent54974ae9ead2200104cb35b5e34fff8030a06cc9 (diff)
downloadrvi_core-2dd27e3c33de6c2b31e4cdc30a345b3c64e3040a.tar.gz
added svc reg+call tests + bug fixes
-rw-r--r--components/authorize/src/authorize_keys.erl100
-rw-r--r--components/authorize/src/authorize_rpc.erl24
-rw-r--r--components/dlink_bt/src/dlink_bt_rpc.erl2
-rw-r--r--components/dlink_sms/src/dlink_sms_rpc.erl2
-rw-r--r--components/dlink_tcp/src/connection_manager.erl5
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl11
-rw-r--r--components/rvi_common/src/exoport_exo_http.erl11
-rw-r--r--components/rvi_common/src/rvi_common.erl35
-rw-r--r--components/service_discovery/src/service_discovery_rpc.erl27
-rw-r--r--components/service_edge/src/service_edge_rpc.erl2
-rwxr-xr-xpython/rvi_call.py2
-rwxr-xr-xpython/rvi_service.py2
-rwxr-xr-xscripts/rvi_create_certificate.py21
-rwxr-xr-xscripts/setup_genbin68196 -> 68196 bytes
-rw-r--r--test/config/backend.config3
-rw-r--r--test/config/sample.config7
-rw-r--r--test/rvi_core_SUITE.erl181
17 files changed, 342 insertions, 93 deletions
diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl
index f68776a..5edb3cb 100644
--- a/components/authorize/src/authorize_keys.erl
+++ b/components/authorize/src/authorize_keys.erl
@@ -12,8 +12,8 @@
-export([get_certificates/0,
get_certificates/1]).
-export([validate_message/2]).
--export([filter_by_destination/2,
- find_cert_by_destination/1]).
+-export([filter_by_service/2,
+ find_cert_by_service/1]).
-export([public_key_to_json/1,
json_to_public_key/1]).
@@ -35,8 +35,8 @@
authorize_jwt}).
-record(cert, {id,
- sources = [],
- destinations = [],
+ register = [],
+ invoke = [],
validity = [],
jwt,
cert}).
@@ -107,11 +107,11 @@ get_certificates() ->
get_certificates(Conn) ->
gen_server:call(?MODULE, {get_certificates, Conn}).
-filter_by_destination(Services, Conn) ->
- gen_server:call(?MODULE, {filter_by_destination, Services, Conn}).
+filter_by_service(Services, Conn) ->
+ gen_server:call(?MODULE, {filter_by_service, Services, Conn}).
-find_cert_by_destination(Service) ->
- gen_server:call(?MODULE, {find_cert_by_destination, Service}).
+find_cert_by_service(Service) ->
+ gen_server:call(?MODULE, {find_cert_by_service, Service}).
provisioning_key() ->
gen_server:call(?MODULE, provisioning_key).
@@ -178,14 +178,14 @@ handle_call_({save_cert, Cert, JWT, Conn}, _, S) ->
ets:insert(?CERTS, {{Conn, C#cert.id}, C}),
{reply, ok, S}
end;
-handle_call_({filter_by_destination, Services, Conn} =R, _From, State) ->
+handle_call_({filter_by_service, Services, Conn} =R, _From, State) ->
?debug("authorize_keys:handle_call(~p,...)~n", [R]),
- Filtered = filter_by_destination_(Services, Conn),
+ Filtered = filter_by_service_(Services, Conn),
?debug("Filtered = ~p~n", [Filtered]),
{reply, Filtered, State};
-handle_call_({find_cert_by_destination, Service} = R, _From, State) ->
+handle_call_({find_cert_by_service, Service} = R, _From, State) ->
?debug("authorize_keys:handle_call(~p,...)~n", [R]),
- Res = find_cert_by_destination_(Service),
+ Res = find_cert_by_service_(Service),
?debug("Res = ~p~n", [Res]),
{reply, Res, State};
handle_call_(_, _, S) ->
@@ -215,34 +215,39 @@ certs_by_conn(Conn) ->
?debug("rough selection: ~p~n", [Certs]),
[C || {C,V} <- Certs, check_validity(V, UTC)].
-filter_by_destination_(Services, Conn) ->
- Dests = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{destinations = '$1',
+filter_by_service_(Services, Conn) ->
+ ?debug("Filter: certs = ~p", [ets:tab2list(?CERTS)]),
+ Invoke = ets:select(?CERTS, [{ {{Conn,'_'}, #cert{invoke = '$1',
_ = '_'}},
[], ['$1'] }]),
- ?debug("Dests by conn (~p) -> ~p~n", [Conn, Dests]),
- filter_svcs_by_dest_(Services, Dests).
+ ?debug("Services by conn (~p) -> ~p~n", [Conn, Invoke]),
+ filter_svcs_(Services, Invoke).
-filter_svcs_by_dest_([S|Svcs], Dests) ->
+filter_svcs_([S|Svcs], Invoke) ->
case lists:any(fun(Ds) ->
lists:any(
fun(D) ->
- match_dest(D, S)
+ match_svc(D, S)
end, Ds)
- end, Dests) of
+ end, Invoke) of
true ->
- [S|filter_svcs_by_dest_(Svcs, Dests)];
+ [S|filter_svcs_(Svcs, Invoke)];
false ->
- filter_svcs_by_dest_(Svcs, Dests)
+ filter_svcs_(Svcs, Invoke)
end;
-filter_svcs_by_dest_([], _) ->
+filter_svcs_([], _) ->
[].
-find_cert_by_destination_(Service) ->
+find_cert_by_service_(Service) ->
SvcParts = split_path(strip_prot(Service)),
LocalCerts = ets:select(?CERTS, [{ {{local,'_'}, '$1'}, [], ['$1'] }]),
+ ?debug("find_cert_by_service(~p~nLocalCerts = ~p~n",
+ [Service, [{Id,Reg,Inv} || #cert{id = Id,
+ invoke = Inv,
+ register = Reg} <- LocalCerts]]),
case lists:foldl(
- fun(#cert{destinations = Dests} = C, {Max, _} = Acc) ->
- case match_length(Dests, SvcParts) of
+ fun(#cert{register = Register} = C, {Max, _} = Acc) ->
+ case match_length(Register, SvcParts) of
L when L > Max ->
{L, C};
_ ->
@@ -255,12 +260,12 @@ find_cert_by_destination_(Service) ->
{ok, Found#cert.jwt}
end.
-match_length(Dests, Svc) ->
+match_length(Invoke, Svc) ->
R = lists:foldl(fun(D, Max) ->
DParts = split_path(strip_prot(D)),
erlang:max(match_length_(DParts, Svc), Max)
- end, 0, Dests),
- ?debug("match_length(~p,~p) -> ~p~n", [Dests, Svc, R]),
+ end, 0, Invoke),
+ ?debug("match_length(~p,~p) -> ~p~n", [Invoke, Svc, R]),
R.
match_length_(D, Svc) ->
@@ -277,11 +282,11 @@ match_length_([_|_], [], _) ->
match_length_([], _, L) ->
L.
-match_dest(D, S) ->
+match_svc(D, S) ->
A = split_path(strip_prot(D)),
B = split_path(strip_prot(S)),
- ?debug("match_dest_(~p, ~p)~n", [A, B]),
- match_dest_(A, B).
+ ?debug("match_svc_(~p, ~p)~n", [A, B]),
+ match_svc_(A, B).
strip_prot(P) ->
case re:split(P, ":", [{return,list}]) of
@@ -292,13 +297,13 @@ strip_prot(P) ->
split_path(P) ->
re:split(P, "/", [{return, list}]).
-match_dest_([H|T], [H|T1]) ->
- match_dest_(T, T1);
-match_dest_(["+"|T], [_|T1]) ->
- match_dest_(T, T1);
-match_dest_([], _) ->
+match_svc_([H|T], [H|T1]) ->
+ match_svc_(T, T1);
+match_svc_(["+"|T], [_|T1]) ->
+ match_svc_(T, T1);
+match_svc_([], _) ->
true;
-match_dest_(_, _) ->
+match_svc_(_, _) ->
false.
get_env(K) ->
@@ -412,11 +417,20 @@ process_cert_struct(Cert, Bin) ->
process_cert_struct(Cert, Bin, rvi_common:utc_timestamp()).
process_cert_struct(Cert, Bin, UTC) ->
+ try process_cert_struct_(Cert, Bin, UTC)
+ catch
+ error:Err ->
+ ?warning("Failure processing Cert ~p~n~p",
+ [Cert, {Err, erlang:get_stacktrace()}]),
+ invalid
+ end.
+
+process_cert_struct_(Cert, Bin, UTC) ->
ID = cert_id(Cert),
- {ok, Sources} = rvi_common:get_json_element(
- ["sources"], Cert),
- {ok, Dests} = rvi_common:get_json_element(
- ["destinations"], Cert),
+ {ok, Register} = rvi_common:get_json_element(
+ [{'OR', ["sources", "register"]}], Cert),
+ {ok, Invoke} = rvi_common:get_json_element(
+ [{'OR', ["destinations", "invoke"]}], Cert),
{ok, Start} = rvi_common:get_json_element(
["validity", "start"], Cert),
{ok, Stop} = rvi_common:get_json_element(
@@ -426,8 +440,8 @@ process_cert_struct(Cert, Bin, UTC) ->
case check_validity(Start, Stop, UTC) of
true ->
#cert{id = ID,
- sources = Sources,
- destinations = Dests,
+ register = Register,
+ invoke = Invoke,
validity = Validity,
jwt = Bin,
cert = Cert};
diff --git a/components/authorize/src/authorize_rpc.erl b/components/authorize/src/authorize_rpc.erl
index 46a180f..155bd4f 100644
--- a/components/authorize/src/authorize_rpc.erl
+++ b/components/authorize/src/authorize_rpc.erl
@@ -25,7 +25,7 @@
validate_authorization/4,
authorize_local_message/3,
authorize_remote_message/3]).
--export([filter_by_destination/3]).
+-export([filter_by_service/3]).
%% for testing & development
-export([sign/1, sign_default_cert/0]).
@@ -69,7 +69,7 @@ get_certificate_body(_Service) ->
[
%% Topic tree patterns that this node is authorized to
%% process requests for.
- { "sources",
+ { sources,
{ array, [
"jaguarlandrover.com/cloud/media_server"
]
@@ -164,10 +164,10 @@ authorize_remote_message(CompSpec, Service, Params) ->
{parameters, Params}],
[status], CompSpec).
-filter_by_destination(CompSpec, Services, Conn) ->
- ?debug("authorize_rpc:filter_by_destination(): services: ~p ~n", [Services]),
- ?debug("authorize_rpc:filter_by_destination(): conn: ~p ~n", [Conn]),
- rvi_common:request(authorize, ?MODULE, filter_by_destination,
+filter_by_service(CompSpec, Services, Conn) ->
+ ?debug("authorize_rpc:filter_by_service(): services: ~p ~n", [Services]),
+ ?debug("authorize_rpc:filter_by_service(): conn: ~p ~n", [Conn]),
+ rvi_common:request(authorize, ?MODULE, filter_by_service,
[{ services, Services },
{ conn, Conn }],
[status, services], CompSpec).
@@ -238,12 +238,12 @@ handle_rpc("authorize_remote_message", Args) ->
[Service, Params]}),
{ ok, rvi_common:json_rpc_status(Status)};
-handle_rpc("filter_by_destination", Args) ->
- ?debug("authorize_rpc:handle_rpc(\"filter_by_destination\", ~p)~n", [Args]),
+handle_rpc("filter_by_service", Args) ->
+ ?debug("authorize_rpc:handle_rpc(\"filter_by_service\", ~p)~n", [Args]),
{ok, Services} = rvi_common:get_json_element(["services"], Args),
{ok, Conn} = rvi_common:get_json_element(["conn"], Args),
[ Status, FilteredServices ] =
- gen_server:call(?SERVER, { rvi, filter_by_destination,
+ gen_server:call(?SERVER, { rvi, filter_by_service,
[Services, Conn] }),
{ok, [{status, rvi_common:json_rpc_status(Status)},
{services, {array, FilteredServices}}]};
@@ -295,7 +295,7 @@ handle_call({rvi, validate_authorization, [JWT, Certs, Conn] }, _From, State) ->
handle_call({rvi, authorize_local_message, [Service, Params] } = R, _From,
#st{private_key = Key} = State) ->
?debug("authorize_rpc:handle_call(~p)~n", [R]),
- case authorize_keys:find_cert_by_destination(Service) of
+ case authorize_keys:find_cert_by_service(Service) of
{ok, Cert} ->
Msg = Params ++ [{"certificate", Cert}],
?debug("authorize_rpc:authorize_local_message~nMsg = ~p~n", [Msg]),
@@ -333,8 +333,8 @@ handle_call({rvi, authorize_remote_message, [_Service, Params]}=R,
end
end;
-handle_call({rvi, filter_by_destination, [Services, Conn]}, _From, State) ->
- Filtered = authorize_keys:filter_by_destination(Services, Conn),
+handle_call({rvi, filter_by_service, [Services, Conn]}, _From, State) ->
+ Filtered = authorize_keys:filter_by_service(Services, Conn),
{reply, [ok, Filtered], State};
handle_call({sign, Term}, _From, #st{private_key = Key} = State) ->
diff --git a/components/dlink_bt/src/dlink_bt_rpc.erl b/components/dlink_bt/src/dlink_bt_rpc.erl
index aba5f3b..c36c997 100644
--- a/components/dlink_bt/src/dlink_bt_rpc.erl
+++ b/components/dlink_bt/src/dlink_bt_rpc.erl
@@ -698,7 +698,7 @@ connection_authorized(FromPid, {RemoteAddress, RemoteChannel} = Conn, CompSpec)
%% that just authorized to us.
[ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local),
- [ ok, FilteredServices ] = authorize_rpc:filter_by_destination(
+ [ ok, FilteredServices ] = authorize_rpc:filter_by_service(
CompSpec, LocalServices, Conn),
%% Send an authorize back to the remote node
diff --git a/components/dlink_sms/src/dlink_sms_rpc.erl b/components/dlink_sms/src/dlink_sms_rpc.erl
index 78630ae..4bf5f14 100644
--- a/components/dlink_sms/src/dlink_sms_rpc.erl
+++ b/components/dlink_sms/src/dlink_sms_rpc.erl
@@ -661,7 +661,7 @@ connection_authorized(FromPid, {RemoteAddr, 0} = Conn, CompSpec) ->
%% that just authorized to us.
[ok, LocalServices] = service_discovery_rpc:get_services_by_module(CompSpec, local),
- [ok, FilteredServices] = authorize_rpc:filter_by_destination(
+ [ok, FilteredServices] = authorize_rpc:filter_by_service(
CompSpec, LocalServices, Conn),
%% Send an authorize back to the remote node
diff --git a/components/dlink_tcp/src/connection_manager.erl b/components/dlink_tcp/src/connection_manager.erl
index c12a3fd..6a9f1e0 100644
--- a/components/dlink_tcp/src/connection_manager.erl
+++ b/components/dlink_tcp/src/connection_manager.erl
@@ -31,6 +31,7 @@
-export([delete_connection_by_address/2]).
-export([find_connection_by_pid/1]).
-export([find_connection_by_address/2]).
+-export([connections/0]).
-define(SERVER, ?MODULE).
@@ -58,6 +59,8 @@ find_connection_by_pid(Pid) ->
find_connection_by_address(IP, Port) ->
gen_server:call(?SERVER, { find_connection_by_address, IP, Port } ).
+connections() ->
+ gen_server:call(?SERVER, connections).
%%--------------------------------------------------------------------
%% @doc
@@ -202,6 +205,8 @@ handle_call({find_connection_by_address, IP, Port}, _From,
{reply, {ok, Pid}, St}
end;
+handle_call(connections, _From, #st{conn_by_addr = ConAddr} = St) ->
+ {reply, [Addr || {Addr, _} <- dict:to_list(ConAddr)], St};
handle_call(_Request, _From, State) ->
?warning("~p:handle_call(): Unknown call: ~p", [ ?MODULE, _Request]),
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index a498cbb..f253c9d 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -26,6 +26,7 @@
%% FIXME: Should be rvi_service_discovery behavior
-export([service_available/3,
service_unavailable/3]).
+-export([connections/1]).
-export([setup_data_link/3,
disconnect_data_link/2,
@@ -154,6 +155,8 @@ service_unavailable(CompSpec, SvcName, DataLinkModule) ->
{ data_link_module, DataLinkModule }],
CompSpec).
+connections(CompSpec) ->
+ rvi_common:request(data_link, ?MODULE, connections, []).
setup_data_link(CompSpec, Service, Opts) ->
rvi_common:request(data_link, ?MODULE, setup_data_link,
@@ -176,7 +179,6 @@ send_data(CompSpec, ProtoMod, Service, DataLinkOpts, Data) ->
],
[status], CompSpec).
-
%% End of behavior
%%
@@ -444,7 +446,10 @@ handle_rpc("send_data", Args) ->
{ ok, DataLinkOpts } = rvi_common:get_json_element(["opts"], Args),
[ Res ] = gen_server:call(?SERVER, { rvi, send_data, [ProtoMod, Service, Data, DataLinkOpts]}),
{ok, [ {status, rvi_common:json_rpc_status(Res)} ]};
-
+
+handle_rpc("connections", []) ->
+ Res = gen_server:call(?SERVER, connections),
+ {ok, [ {status, ok} | {connections, {array, Res}} ]};
handle_rpc(Other, _Args) ->
?info("dlink_tcp:handle_rpc(~p): unknown", [ Other ]),
@@ -708,7 +713,7 @@ connection_authorized(FromPid, {RemoteIP, RemotePort} = Conn, CompSpec) ->
%% that just authorized to us.
[ ok, LocalServices ] = service_discovery_rpc:get_services_by_module(CompSpec, local),
- [ ok, FilteredServices ] = authorize_rpc:filter_by_destination(
+ [ ok, FilteredServices ] = authorize_rpc:filter_by_service(
CompSpec, LocalServices, Conn),
%% Send an authorize back to the remote node
diff --git a/components/rvi_common/src/exoport_exo_http.erl b/components/rvi_common/src/exoport_exo_http.erl
index 193b730..95801c6 100644
--- a/components/rvi_common/src/exoport_exo_http.erl
+++ b/components/rvi_common/src/exoport_exo_http.erl
@@ -36,22 +36,25 @@ instance(SupMod, AppMod, Opts) ->
handle_body(Socket, Request, Body, AppMod) when Request#http_request.method == 'POST' ->
try decode_json(Body) of
{call, Id, Method, Args} ->
- case handle_rpc(AppMod, Method, Args) of
+ try handle_rpc(AppMod, Method, Args) of
{ok, Reply} ->
success_response(Socket, Id, Reply);
ok ->
ok;
{error, Error} ->
error_response(Socket, Id, Error)
+ catch
+ error:Reason ->
+ ?debug("~p:handle_rpc(~p, ~p, ~p) ERROR: ~p~n~p",
+ [?MODULE, AppMod, Method, Args, Reason,
+ erlang:get_stacktrace()]),
+ error_response(Socket, Id, internal_error)
end;
-
{notification, Method, Args} ->
handle_notification(AppMod, Method, Args),
exo_http_server:response(Socket, undefined, 200, "OK", "");
-
{error, _} ->
error_response(Socket, parse_error)
-
catch
error:_ ->
exo_http_server:response(Socket, undefined, 501,
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index 8028917..39beabf 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -219,6 +219,13 @@ notification(Component,
InArg = [ Val || { _Key, Val } <- InArgPropList ],
InArgSpec = [ Key || { Key, _Val } <- InArgPropList ],
%% Figure out how we are to invoke this MFA.
+ ?debug("~p:notification(", [?MODULE]),
+ ?debug(" Component : ~p", [Component]),
+ ?debug(" Module : ~p", [Module]),
+ ?debug(" Function : ~p", [Function]),
+ ?debug(" InArgPropList : ~p", [InArgPropList]),
+ ?debug(" CompSpec : ~p", [CompSpec]),
+
case get_module_type(Component, Module, CompSpec) of
%% We have a gen_server
{ ok, gen_server } ->
@@ -233,6 +240,11 @@ notification(Component,
JSONArg = json_argument(InArg, InArgSpec),
?debug("Sending ~p:~p(~p) -> ~p.", [Module, Function, InArg, JSONArg]),
send_json_notification(URL, atom_to_list(Function), JSONArg),
+ ok;
+ { error, _ } = Error ->
+ ?warning("get_module_type(~p,~p,~p) -> ~p",
+ [Component, Module, CompSpec, Error]),
+ %% ignore
ok
end.
@@ -331,20 +343,27 @@ get_json_element_([], JSON) ->
get_json_element_([Elem | T], JSON ) when is_atom(Elem) ->
get_json_element_([atom_to_list(Elem) | T], JSON);
-get_json_element_([Elem | T], {struct, JSON} ) ->
- Res = get_json_element_(T, proplists:get_value(Elem, JSON, undefined)),
- Res;
-
-get_json_element_([Elem | T], {array, JSON} ) ->
- Res = get_json_element_(T, proplists:get_value(Elem, JSON, undefined)),
- Res;
+get_json_element_([Elem | T], {Type, JSON} ) when Type==array; Type==struct ->
+ case Elem of
+ {'OR', Alts} ->
+ get_json_element_(T, get_json_element_alt(Alts, JSON));
+ _ ->
+ get_json_element_(T, proplists:get_value(Elem, JSON, undefined))
+ end;
get_json_element_(Path,JSON) ->
?warning("get_json_element_(): Unhandled: Path: ~p | JSON: ~p",
[Path, JSON]),
{ error, undefined }.
-
+get_json_element_alt(Alts, [{K, V}|T]) ->
+ case lists:member(K, Alts) of
+ true -> V;
+ false -> get_json_element_alt(Alts, T)
+ end;
+get_json_element_alt(_, []) ->
+ undefined.
+
json_reply(ArgList, JSON) ->
retrieve_json_reply_elements(ArgList, JSON, []).
diff --git a/components/service_discovery/src/service_discovery_rpc.erl b/components/service_discovery/src/service_discovery_rpc.erl
index b3d2a4d..e4ae226 100644
--- a/components/service_discovery/src/service_discovery_rpc.erl
+++ b/components/service_discovery/src/service_discovery_rpc.erl
@@ -91,12 +91,20 @@ get_modules_by_service(CompSpec, Service) ->
register_services(CompSpec, Services, DataLinkModule) ->
+ ?debug("~p:register_services()", [?MODULE]),
+ ?debug(" CompSpec : ", [CompSpec]),
+ ?debug(" Services : ", [Services]),
+ ?debug(" DataLinkMod : ", [DataLinkModule]),
rvi_common:notification(service_discovery, ?MODULE, register_services,
[{ services, Services },
{ data_link_module, DataLinkModule }],
CompSpec).
unregister_services(CompSpec, Services, DataLinkModule) ->
+ ?debug("~p:unregister_services()", [?MODULE]),
+ ?debug(" CompSpec : ", [CompSpec]),
+ ?debug(" Services : ", [Services]),
+ ?debug(" DataLinkMod : ", [DataLinkModule]),
rvi_common:notification(service_discovery, ?MODULE, unregister_services,
[{ services, Services },
{ data_link_module, DataLinkModule}],
@@ -192,23 +200,32 @@ handle_rpc( Other, _Args) ->
?info("svc_disc:handle_rpc(~p): unknown", [ Other ]),
{ok, [ { status, invalid_command } ]}.
-
-handle_call({rvi, get_all_services, _Args}, _From, St) ->
+handle_call(Req, From, St) ->
+ try handle_call_(Req, From, St)
+ catch
+ error:Reason ->
+ ?debug("~p:handle_call_(~p,~p,~p) -> ERROR: ~p~n~p",
+ [?MODULE, Req, From, St, Reason,
+ erlang:get_stacktrace()]),
+ {reply, [internal_error], St}
+ end.
+
+handle_call_({rvi, get_all_services, _Args}, _From, St) ->
Svcs = ets:foldl(fun(#service_entry {service = ServiceName}, Acc) ->
[ ServiceName | Acc ] end,
[], ?SERVICE_TABLE),
{reply, [ok, Svcs], St };
-handle_call({rvi, get_services_by_module, [Module]}, _From, St) ->
+handle_call_({rvi, get_services_by_module, [Module]}, _From, St) ->
{reply, [ok, get_services_by_module_(Module)], St };
-handle_call({rvi, get_modules_by_service, [Service]}, _From, St) ->
+handle_call_({rvi, get_modules_by_service, [Service]}, _From, St) ->
{reply, [ok, get_modules_by_service_(Service)], St };
-handle_call(Other, _From, St) ->
+handle_call_(Other, _From, St) ->
?warning("svc_disc:handle_call(~p): unknown", [ Other ]),
{ reply, [unknown_command] , St}.
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index 2f73913..2048358 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -482,7 +482,7 @@ handle_cast({rvi, handle_remote_message,
SvcName,
[{remote_ip, IP},
{remote_port, Port},
- {service, SvcName},
+ {service_name, SvcName},
{timeout, Timeout},
{parameters, Parameters},
{signature, Signature}]) of
diff --git a/python/rvi_call.py b/python/rvi_call.py
index efe54e6..18bce73 100755
--- a/python/rvi_call.py
+++ b/python/rvi_call.py
@@ -23,7 +23,7 @@ def usage():
print " service Service to invoke in RVI."
print " key=val Named arguments to provide to service."
print
- print "Example: ./callrvi.py http://rvi1.nginfotpdx.net:8801 \\"
+ print "Example: ./callrvi.py -n http://rvi1.nginfotpdx.net:8801 \\"
print " jlr.com/vin/aaron/4711/test/ping \\"
print " arg1=val1 arg2=val2"
diff --git a/python/rvi_service.py b/python/rvi_service.py
index 2c3271f..0d6ec6d 100755
--- a/python/rvi_service.py
+++ b/python/rvi_service.py
@@ -28,7 +28,7 @@ def usage():
print "The Service Edge URL is also logged as a notice when the"
print "RVI node is started."
print
- print "Example: ./rvi_service.py /test/some_service http://rvi1.nginfotpdx.net:8801"
+ print "Example: ./rvi_service.py -n http://rvi1.nginfotpdx.net:8801 /test/some_service"
sys.exit(255)
diff --git a/scripts/rvi_create_certificate.py b/scripts/rvi_create_certificate.py
index f59208f..e4d5b0b 100755
--- a/scripts/rvi_create_certificate.py
+++ b/scripts/rvi_create_certificate.py
@@ -23,6 +23,23 @@ import jwt
import time
import json
import base64
+import struct
+
+def long2intarr(long_int):
+ _bytes = []
+ while long_int:
+ long_int, r = divmod(long_int, 256)
+ _bytes.insert(0, r)
+ return _bytes
+
+# copied from https://github.com/rohe/pyjwkest
+def long_to_base64(n):
+ bys = long2intarr(n)
+ data = struct.pack('%sB' % len(bys), *bys)
+ if not len(data):
+ data = '\x00'
+ s = base64.urlsafe_b64encode(data).rstrip(b'=')
+ return s
def usage():
print "Usage:", sys.argv[0], "--id=<id> --invoke='<services>' -register='<services>' \\"
@@ -208,8 +225,8 @@ cert = {
"kty": "RSA",
"alg": "RS256",
"use": "sig",
- "e": base64.urlsafe_b64encode(str(device_key.e)),
- "n": base64.urlsafe_b64encode(str(device_key.n))
+ "e": long_to_base64(device_key.e),
+ "n": long_to_base64(device_key.n)
}],
'validity': {
'start': start,
diff --git a/scripts/setup_gen b/scripts/setup_gen
index 1eb3326..8ec0865 100755
--- a/scripts/setup_gen
+++ b/scripts/setup_gen
Binary files differ
diff --git a/test/config/backend.config b/test/config/backend.config
index 7839d38..3a7be0b 100644
--- a/test/config/backend.config
+++ b/test/config/backend.config
@@ -10,7 +10,8 @@
{provisioning_key,
{openssl_pem, filename:join(CurDir, "root_keys/root_pub.pem")}},
{authorize_jwt, filename:join(
- CurDir, "basic_backend_keys/dev_pub_sign.jwt")}
+ CurDir, "basic_backend_keys/dev_pub_sign.jwt")},
+ {cert_dir, filename:join(CurDir, "basic_backend_certs")}
]}
]}
].
diff --git a/test/config/sample.config b/test/config/sample.config
index 1c0db37..97588f7 100644
--- a/test/config/sample.config
+++ b/test/config/sample.config
@@ -5,12 +5,15 @@
{set_env,
[
{rvi_core,
- [{key_pair, {openssl_pem, filename:join(
+ [
+ {node_service_prefix, "jlr.com/vin/abc"},
+ {key_pair, {openssl_pem, filename:join(
CurDir, "basic_sample_keys/dev_priv.pem")}},
{provisioning_key,
{openssl_pem, filename:join(CurDir, "root_keys/root_pub.pem")}},
{authorize_jwt, filename:join(
- CurDir, "basic_sample_keys/dev_pub_sign.jwt")}
+ CurDir, "basic_sample_keys/dev_pub_sign.jwt")},
+ {cert_dir, filename:join(CurDir, "basic_sample_certs")}
]}
]}
].
diff --git a/test/rvi_core_SUITE.erl b/test/rvi_core_SUITE.erl
index ea1b447..efdefb4 100644
--- a/test/rvi_core_SUITE.erl
+++ b/test/rvi_core_SUITE.erl
@@ -11,14 +11,16 @@
%% test case exports
-export(
[
- t_backend_keys/1,
+ t_backend_keys_and_cert/1,
t_sample_keys_and_cert/1,
t_install_backend_node/1,
t_install_sample_node/1,
t_install_sms_backend_node/1,
t_install_sms_sample_node/1,
t_start_basic_backend/1,
- t_start_basic_sample/1
+ t_start_basic_sample/1,
+ t_register_lock_service/1,
+ t_call_lock_service/1
]).
-include_lib("common_test/include/ct.hrl").
@@ -37,7 +39,7 @@ groups() ->
%% Note that order is significant in this test group.
%% The test cases produce files on disk that are used in later tests
[
- t_backend_keys,
+ t_backend_keys_and_cert,
t_sample_keys_and_cert,
t_install_backend_node,
t_install_sample_node,
@@ -47,7 +49,9 @@ groups() ->
{test_run, [],
[
t_start_basic_backend,
- t_start_basic_sample
+ t_start_basic_sample,
+ t_register_lock_service,
+ t_call_lock_service
]}
].
@@ -83,16 +87,18 @@ end_per_testcase(Case, _Config) ->
%% Test cases
%% ======================================================================
-t_backend_keys(Config) ->
+t_backend_keys_and_cert(Config) ->
RootKeyDir = ensure_dir(root_keys()),
cmd([scripts(),"/rvi_create_root_key.sh -o ",
RootKeyDir, "/root -b 2048"]),
- generate_device_keys("basic_backend_keys", Config).
+ Dir = ensure_dir("basic_backend_keys"),
+ generate_device_keys(Dir, Config),
+ generate_cert(backend, Dir, ensure_dir("basic_backend_certs"), Config).
t_sample_keys_and_cert(Config) ->
Dir = ensure_dir("basic_sample_keys"),
generate_device_keys(Dir, Config),
- generate_cert(Dir, ensure_dir("basic_sample_certs"), Config).
+ generate_cert(sample, Dir, ensure_dir("basic_sample_certs"), Config).
t_install_backend_node(Config) ->
install_rvi_node("basic_backend", env(),
@@ -124,14 +130,133 @@ t_start_basic_sample(Config) ->
await_started("basic_sample"),
ok.
+t_register_lock_service(Config) ->
+ Pid =
+ spawn_cmd(
+ [python(),
+ "/rvi_service.py -n ", service_edge("sample"), " lock"]),
+ save({service, lock}, Pid),
+ timer:sleep(2000).
+
+t_call_lock_service(Config) ->
+ CallPid = spawn_cmd(
+ [python(),
+ "/rvi_call.py -n ", service_edge("sample"),
+ " jlr.com/vin/abc/lock arg1='val1'"]),
+ timer:sleep(2000),
+ [{_, Svc}] = lookup({service, lock}),
+ SvcRes = fetch(Svc),
+ verify_service_res(join_stdout_msgs(SvcRes)),
+ ct:log("SvcRes = ~p~n", [SvcRes]),
+ CallRes = fetch(CallPid),
+ verify_call_res(join_stdout_msgs(CallRes)),
+ ct:log("CallRes = ~p~n", [CallRes]).
+
+verify_service_res(Bin) ->
+ {match,_} =
+ re:run(Bin, <<"Service:[\\h]*jlr.com/vin/abc/lock">>, []),
+ {match,_} =
+ re:run(Bin, <<"Service invoked![\\s]*args: {u'arg1': u'val1'}">>, []),
+ nomatch = re:run(Bin, <<"Traceback">>, []),
+ ok.
+
+verify_call_res(Bin) ->
+ nomatch = re:run(Bin, <<"Traceback">>, []),
+ ok.
+
+join_stdout_msgs(L) ->
+ lists:foldl(
+ fun({stdout,_,Bin}, Acc) ->
+ <<Acc/binary, Bin/binary>>;
+ (_, Acc) ->
+ Acc
+ end, <<>>, L).
+
+spawn_cmd(Cmd0) ->
+ Cmd = binary_to_list(iolist_to_binary(Cmd0)),
+ Me = self(),
+ Pid = spawn(fun() ->
+ Res = exec:run(Cmd, [stdin, stdout, stderr]),
+ ct:log("~s ->~n~p~n", [Cmd, Res]),
+ Me ! {self(), ok},
+ cmd_loop()
+ end),
+ receive
+ {Pid, ok} ->
+ Pid
+ end.
+
+fetch(Pid) ->
+ ct:log("fetch(~p)", [Pid]),
+ Pid ! {self(), fetch},
+ receive
+ {Pid, Res} ->
+ Res
+ after 3000 ->
+ error(timeout)
+ end.
+
+cmd_loop() -> cmd_loop([]).
+
+cmd_loop(Acc) ->
+ receive
+ {From, fetch} ->
+ From ! {self(), lists:reverse(Acc)},
+ cmd_loop([]);
+ Msg ->
+ io:fwrite(user, "~p <- ~p", [self(), Msg]),
+ cmd_loop([Msg|Acc])
+ end.
+
generate_device_keys(Dir, Config) ->
ensure_dir(Dir),
cmd([scripts(),"/rvi_create_device_key.py ",
"-p ", root_keys(), "/root_priv.pem -o ", Dir, "/dev -b 2048"]).
-generate_cert(KeyDir, SampleDir, Config) ->
+generate_cert(sample, KeyDir, CertDir, Config) ->
+ %% Don't put lock_cert.json in the certs directory, since rvi_core
+ %% will report a parse failure for it.
+ UUID = uuid(),
+ {Start, Stop} = start_stop(),
+ cmd([scripts(), "/rvi_create_certificate.py"
+ " --id=", UUID,
+ " --device_key=", KeyDir, "/dev_pub.pem",
+ " --start='", Start, "'"
+ " --stop='", Stop, "'"
+ " --root_key=", root_keys(), "/root_priv.pem"
+ " --register='jlr.com/vin/abc/unlock jlr.com/vin/abc/lock'"
+ " --invoke='jlr.com/backend/set_state'"
+ " --jwt_out=", CertDir, "/lock_cert.jwt"
+ " --cert_out=", KeyDir, "/lock_cert.json"]),
+ ok;
+generate_cert(backend, KeyDir, CertDir, Config) ->
+ UUID = uuid(),
+ {Start, Stop} = start_stop(),
+ cmd([scripts(), "/rvi_create_certificate.py"
+ " --id=", UUID,
+ " --device_key=", KeyDir, "/dev_pub.pem",
+ " --start='", Start, "'"
+ " --stop='", Stop, "'"
+ " --root_key=", root_keys(), "/root_priv.pem"
+ " --register='jlr.com'"
+ " --invoke='jlr.com'"
+ " --jwt_out=", CertDir, "/backend_cert.jwt"
+ " --cert_out=", KeyDir, "/backend_cert.json"]),
ok.
+start_stop() ->
+ DT = erlang:localtime(),
+ GS = calendar:datetime_to_gregorian_seconds(DT),
+ Start_GS = GS - 3600, % valid since one hour ago
+ Stop_GS = GS + 24*3600, % valid for the next 24 hrs
+ Start = calendar:gregorian_seconds_to_datetime(Start_GS),
+ Stop = calendar:gregorian_seconds_to_datetime(Stop_GS),
+ {fmt_date(Start), fmt_date(Stop)}.
+
+fmt_date({{Y,Mo,D}, {H,Mi,S}}) ->
+ io_lib:fwrite("~4..0w-~2..0w-~2..0w "
+ "~2..0w:~2..0w:~2..0w", [Y,Mo,D,H,Mi,S]).
+
ensure_dir(Dir) ->
ok = filelib:ensure_dir(filename:join(Dir, "foo")),
Dir.
@@ -145,9 +270,15 @@ root() ->
scripts() ->
[root(), "/scripts"].
+python() ->
+ [root(), "/python"].
+
root_keys() ->
"root_keys".
+service_edge("backend") -> "http://localhost:8801";
+service_edge("sample" ) -> "http://localhost:8901".
+
install_rvi_node(Name, Env, ConfigF) ->
Root = code:lib_dir(rvi_core),
Scripts = filename:join(Root, "scripts"),
@@ -227,3 +358,37 @@ save(Key, Data) ->
node_name(Name) ->
[_, Host] = re:split(atom_to_list(node()), "@", [{return, list}]),
list_to_atom(Name ++ "@" ++ Host).
+
+%% Copied from gsms_plivo.erl
+uuid() ->
+ %% For now, convert to list (TODO: shouldn't be necessary)
+ binary_to_list(uuid_()).
+
+uuid_() ->
+ %% https://en.wikipedia.org/wiki/Universally_unique_identifier
+ N = 4, M = 2, % version 4 - random bytes
+ <<A:48, _:4, B:12, _:2, C:62>> = crypto:rand_bytes(16),
+ UBin = <<A:48, N:4, B:12, M:2, C:62>>,
+ <<A1:8/binary, B1:4/binary, C1:4/binary, D1:4/binary, E1:12/binary>> =
+ << <<(hex(X)):8>> || <<X:4>> <= UBin >>,
+ <<A1:8/binary, "-",
+ B1:4/binary, "-",
+ C1:4/binary, "-",
+ D1:4/binary, "-",
+ E1:12/binary>>.
+
+hex(X) when X >= 0, X =< 9 ->
+ $0 + X;
+hex(X) when X >= 10, X =< 15 ->
+ $a + X - 10.
+
+
+json_rpc(URL, Method, Args) ->
+ Req = binary_to_list(
+ iolist_to_binary(
+ exo_json:encode({struct, [{"jsonrpc", "2.0"},
+ {"id", 1},
+ {"method", Method},
+ {"params", Args}]}))),
+ Hdrs = [{'Content-Type', "application/json"}],
+ exo_http:wpost(URL, {1,1}, Hdrs, Req, 1000).