summaryrefslogtreecommitdiff
path: root/components
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-15 12:12:29 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-15 12:12:29 -0800
commita3c2003b644cb321e389def8034995fd2073a226 (patch)
treeab408d4a7c2b3736db2e98aecf88f33d681fc865 /components
parente9b83c71386b9d4d6aa535a43d81bc48457015d7 (diff)
downloadrvi_core-a3c2003b644cb321e389def8034995fd2073a226.tar.gz
rewrite json decoder transition code in rvi_common
Diffstat (limited to 'components')
-rw-r--r--components/dlink_tcp/src/connection.erl9
-rw-r--r--components/dlink_tcp/src/dlink_tcp_rpc.erl2
-rw-r--r--components/rvi_common/src/rvi_common.erl41
-rw-r--r--components/service_edge/src/service_edge_rpc.erl43
4 files changed, 60 insertions, 35 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl
index c6a9531..5444d11 100644
--- a/components/dlink_tcp/src/connection.erl
+++ b/components/dlink_tcp/src/connection.erl
@@ -46,7 +46,6 @@
sock = undefined,
mod = undefined,
func = undefined,
- args = undefined,
packet_mod = ?PACKET_MOD,
packet_st = [],
decode_st = <<>>,
@@ -289,9 +288,9 @@ handle_info({tcp_closed, Sock},
port = Port,
mod = Mod,
func = Fun,
- args = Arg } = State) ->
+ cs = CS } = State) ->
?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]),
- Mod:Fun(self(), IP, Port, closed, Arg),
+ Mod:Fun(self(), IP, Port, closed, CS),
gen_tcp:close(Sock),
connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
@@ -302,10 +301,10 @@ handle_info({tcp_error, _Sock},
port = Port,
mod = Mod,
func = Fun,
- args = Arg} = State) ->
+ cs = CS} = State) ->
?debug("handle_info(tcp_error): Address: ~p:~p ", [IP, Port]),
- Mod:Fun(self(), IP, Port, error, Arg),
+ Mod:Fun(self(), IP, Port, error, CS),
connection_manager:delete_connection_by_pid(self()),
{stop, normal, State};
diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl
index afabafe..31184bd 100644
--- a/components/dlink_tcp/src/dlink_tcp_rpc.erl
+++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl
@@ -375,7 +375,6 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) ->
ProtoMod, Data, CompSpec);
?DLINK_CMD_PING ->
- ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort ]),
ok;
undefined ->
@@ -574,7 +573,6 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) ->
%% Check that connection is up
case connection:is_connection_up(Pid) of
true ->
- ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]),
connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]),
erlang:send_after(Timeout, self(),
{ rvi_ping, Pid, Address, Port, Timeout });
diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl
index ca81b40..574aa51 100644
--- a/components/rvi_common/src/rvi_common.erl
+++ b/components/rvi_common/src/rvi_common.erl
@@ -102,7 +102,11 @@ status_values() ->
{6, no_route},
{7, unauthorized}].
-get_request_result({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}) ->
+get_request_result(R) ->
+ ?debug("get_request_result(~p)", [R]),
+ get_request_result_(R).
+
+get_request_result_({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}) ->
?debug("JSONBody = ~p", [JSONBody]),
case get_json_element(["result", "status"], JSONBody) of
{ok, Value} ->
@@ -112,16 +116,16 @@ get_request_result({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}
{ok, undefined }
end;
-get_request_result({ok, {http_response, {_V1, _V2}, Status, Reason, _Hdr}, _JSONBody}) ->
+get_request_result_({ok, {http_response, {_V1, _V2}, Status, Reason, _Hdr}, _JSONBody}) ->
{error, {http, Status, Reason}};
-get_request_result({error, Reason})->
+get_request_result_({error, Reason})->
{ error, Reason};
-get_request_result(ok)->
+get_request_result_(ok)->
{ ok, ok, "{}"};
-get_request_result(Other)->
+get_request_result_(Other)->
?error("get_request_result(): Unhandled result: ~p", [Other]),
{ error, format }.
@@ -306,22 +310,31 @@ get_json_element(_, []) ->
get_json_element(ElemPath, JSON) when is_atom(ElemPath) ->
get_json_element([ElemPath], JSON);
-get_json_element(ElemPath, JSON) when is_binary(JSON) ->
- get_json_element(ElemPath, binary_to_list(JSON));
+%% get_json_element(ElemPath, JSON) when is_binary(JSON) ->
+%% get_json_element(ElemPath, binary_to_list(JSON));
+
+get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) ->
+ get_json_element(ElemPath, iolist_to_binary(JSON));
get_json_element(ElemPath, JSON) when is_tuple(JSON) ->
get_json_element_(ElemPath, JSON);
get_json_element(ElemPath, [T|_] = JSON) when is_tuple(T) ->
get_json_element_(ElemPath, JSON);
-get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) ->
- case exo_json:decode_string(JSON) of
- {ok, Data } ->
- get_json_element_(ElemPath, Data);
-
- Err ->
- Err
+get_json_element(ElemPath, JSON) when is_binary(JSON) ->
+ try jsx:decode(JSON) of
+ Decoded ->
+ get_json_element_(ElemPath, Decoded)
+ catch error:Err ->
+ {error, Err}
end;
+ %% case exo_json:decode_string(JSON) of
+ %% {ok, Data } ->
+ %% get_json_element_(ElemPath, Data);
+
+ %% Err ->
+ %% Err
+ %% end;
get_json_element(P, J) ->
?warning("get_json_element(): Unknown call structure; Path: ~p | JSON: ~p",
diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl
index 42e15fb..f27448e 100644
--- a/components/service_edge/src/service_edge_rpc.erl
+++ b/components/service_edge/src/service_edge_rpc.erl
@@ -215,10 +215,20 @@ handle_local_timeout(CompSpec, SvcName, TransID) ->
handle_websocket(WSock, Mesg, Arg) ->
+ Decoded = try jsx:decode(Mesg)
+ catch error:E0 ->
+ ?debug("Failed decode of ~p: ~p", [Mesg, E0]),
+ Mesg
+ end,
+ ?debug("Decoded Mesg = ~p", [Decoded]),
{ ok, Method } = rvi_common:get_json_element(["method"], Mesg),
- { ok, Params } = rvi_common:get_json_element(["params"], Mesg),
+ { ok, Params0 } = rvi_common:get_json_element(["params"], Mesg),
{ ok, ID } = rvi_common:get_json_element(["id"], Mesg),
-
+ Params = try jsx:decode(Params0)
+ catch error:E ->
+ ?debug("Failed decode of ~p:~p", [Params0, E]),
+ Params0
+ end,
?debug("service_edge_rpc:handle_websocket(~p/~p) method: ~p", [ WSock, ID,Method ]),
case handle_ws_json_rpc(WSock, Method, Params, Arg) of
@@ -233,12 +243,13 @@ handle_websocket(WSock, Mesg, Arg) ->
%% Websocket interface
-handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
+handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) ->
{ ok, SvcName0 } = rvi_common:get_json_element(["service_name"], Params),
{ ok, Timeout } = rvi_common:get_json_element(["timeout"], Params),
- { ok, Parameters0 } = rvi_common:get_json_element(["parameters"], Params),
+ { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params),
SvcName = iolist_to_binary(SvcName0),
- Parameters = parse_ws_params(Parameters0),
+ ?debug("WS Parameters: ~p", [Parameters]),
+ %% Parameters = parse_ws_params(Parameters0),
LogId = log_id_json_tail(Params ++ Parameters),
?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]),
?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]),
@@ -257,7 +268,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) ->
{ method, <<"message">>}] }
end;
-handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) ->
+handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]),
[ok, FullSvcName ] = gen_server:call(?SERVER,
@@ -270,24 +281,28 @@ handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) ->
{ service, FullSvcName },
{ method, <<"register_service">>}]};
-handle_ws_json_rpc(WSock, "unregister_service", Params, _Arg ) ->
+handle_ws_json_rpc(WSock, <<"unregister_service">>, Params, _Arg ) ->
{ ok, SvcName } = rvi_common:get_json_element(["service_name"], Params),
?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]),
gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}),
{ ok, [ { status, rvi_common:json_rpc_status(ok)} ]};
-handle_ws_json_rpc(_Ws , "get_available_services", _Params, _Arg ) ->
+handle_ws_json_rpc(_Ws , <<"get_available_services">>, _Params, _Arg ) ->
?debug("service_edge_rpc:websocket_get_available()"),
- [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}),
+ [ ok, Services ] =
+ gen_server:call(?SERVER, { rvi, get_available_services, []}),
{ ok, [ { status, rvi_common:json_rpc_status(ok)},
{ services, Services},
{ method, <<"get_available_services">>}] }.
-parse_ws_params([{K, V}|T]) ->
- [{iolist_to_binary(K), jsx:decode(iolist_to_binary(V))}
- | parse_ws_params(T)];
-parse_ws_params([]) ->
- [].
+%% parse_ws_params([{K, V}|T]) ->
+%% K1 = iolist_to_binary(K),
+%% V1 = iolist_to_binary(V),
+%% ?debug("K1 = ~p, V1 = ~p", [K1, V1]),
+%% [{K1, jsx:decode(iolist_to_binary(V1))}
+%% | parse_ws_params(T)];
+%% parse_ws_params([]) ->
+%% [].
%% Invoked by locally connected services.
%% Will always be routed as JSON-RPC since that, and websocket,