diff options
Diffstat (limited to 'components')
-rw-r--r-- | components/dlink_tcp/src/connection.erl | 9 | ||||
-rw-r--r-- | components/dlink_tcp/src/dlink_tcp_rpc.erl | 2 | ||||
-rw-r--r-- | components/rvi_common/src/rvi_common.erl | 41 | ||||
-rw-r--r-- | components/service_edge/src/service_edge_rpc.erl | 43 |
4 files changed, 60 insertions, 35 deletions
diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index c6a9531..5444d11 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -46,7 +46,6 @@ sock = undefined, mod = undefined, func = undefined, - args = undefined, packet_mod = ?PACKET_MOD, packet_st = [], decode_st = <<>>, @@ -289,9 +288,9 @@ handle_info({tcp_closed, Sock}, port = Port, mod = Mod, func = Fun, - args = Arg } = State) -> + cs = CS } = State) -> ?debug("handle_info(tcp_closed): Address: ~p:~p ", [IP, Port]), - Mod:Fun(self(), IP, Port, closed, Arg), + Mod:Fun(self(), IP, Port, closed, CS), gen_tcp:close(Sock), connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; @@ -302,10 +301,10 @@ handle_info({tcp_error, _Sock}, port = Port, mod = Mod, func = Fun, - args = Arg} = State) -> + cs = CS} = State) -> ?debug("handle_info(tcp_error): Address: ~p:~p ", [IP, Port]), - Mod:Fun(self(), IP, Port, error, Arg), + Mod:Fun(self(), IP, Port, error, CS), connection_manager:delete_connection_by_pid(self()), {stop, normal, State}; diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index afabafe..31184bd 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -375,7 +375,6 @@ handle_socket_(FromPid, PeerIP, PeerPort, data, Elems, CompSpec) -> ProtoMod, Data, CompSpec); ?DLINK_CMD_PING -> - ?info("dlink_tcp:ping(): Pinged from: ~p:~p", [ PeerIP, PeerPort ]), ok; undefined -> @@ -574,7 +573,6 @@ handle_info({ rvi_ping, Pid, Address, Port, Timeout}, St) -> %% Check that connection is up case connection:is_connection_up(Pid) of true -> - ?info("dlink_tcp:ping(): Pinging: ~p:~p", [Address, Port]), connection:send(Pid, [{?DLINK_ARG_CMD, ?DLINK_CMD_PING}]), erlang:send_after(Timeout, self(), { rvi_ping, Pid, Address, Port, Timeout }); diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index ca81b40..574aa51 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -102,7 +102,11 @@ status_values() -> {6, no_route}, {7, unauthorized}]. -get_request_result({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}) -> +get_request_result(R) -> + ?debug("get_request_result(~p)", [R]), + get_request_result_(R). + +get_request_result_({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody}) -> ?debug("JSONBody = ~p", [JSONBody]), case get_json_element(["result", "status"], JSONBody) of {ok, Value} -> @@ -112,16 +116,16 @@ get_request_result({ok, {http_response, {_V1, _V2}, 200, _Text, _Hdr}, JSONBody} {ok, undefined } end; -get_request_result({ok, {http_response, {_V1, _V2}, Status, Reason, _Hdr}, _JSONBody}) -> +get_request_result_({ok, {http_response, {_V1, _V2}, Status, Reason, _Hdr}, _JSONBody}) -> {error, {http, Status, Reason}}; -get_request_result({error, Reason})-> +get_request_result_({error, Reason})-> { error, Reason}; -get_request_result(ok)-> +get_request_result_(ok)-> { ok, ok, "{}"}; -get_request_result(Other)-> +get_request_result_(Other)-> ?error("get_request_result(): Unhandled result: ~p", [Other]), { error, format }. @@ -306,22 +310,31 @@ get_json_element(_, []) -> get_json_element(ElemPath, JSON) when is_atom(ElemPath) -> get_json_element([ElemPath], JSON); -get_json_element(ElemPath, JSON) when is_binary(JSON) -> - get_json_element(ElemPath, binary_to_list(JSON)); +%% get_json_element(ElemPath, JSON) when is_binary(JSON) -> +%% get_json_element(ElemPath, binary_to_list(JSON)); + +get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) -> + get_json_element(ElemPath, iolist_to_binary(JSON)); get_json_element(ElemPath, JSON) when is_tuple(JSON) -> get_json_element_(ElemPath, JSON); get_json_element(ElemPath, [T|_] = JSON) when is_tuple(T) -> get_json_element_(ElemPath, JSON); -get_json_element(ElemPath, [H|_] = JSON) when is_integer(H) -> - case exo_json:decode_string(JSON) of - {ok, Data } -> - get_json_element_(ElemPath, Data); - - Err -> - Err +get_json_element(ElemPath, JSON) when is_binary(JSON) -> + try jsx:decode(JSON) of + Decoded -> + get_json_element_(ElemPath, Decoded) + catch error:Err -> + {error, Err} end; + %% case exo_json:decode_string(JSON) of + %% {ok, Data } -> + %% get_json_element_(ElemPath, Data); + + %% Err -> + %% Err + %% end; get_json_element(P, J) -> ?warning("get_json_element(): Unknown call structure; Path: ~p | JSON: ~p", diff --git a/components/service_edge/src/service_edge_rpc.erl b/components/service_edge/src/service_edge_rpc.erl index 42e15fb..f27448e 100644 --- a/components/service_edge/src/service_edge_rpc.erl +++ b/components/service_edge/src/service_edge_rpc.erl @@ -215,10 +215,20 @@ handle_local_timeout(CompSpec, SvcName, TransID) -> handle_websocket(WSock, Mesg, Arg) -> + Decoded = try jsx:decode(Mesg) + catch error:E0 -> + ?debug("Failed decode of ~p: ~p", [Mesg, E0]), + Mesg + end, + ?debug("Decoded Mesg = ~p", [Decoded]), { ok, Method } = rvi_common:get_json_element(["method"], Mesg), - { ok, Params } = rvi_common:get_json_element(["params"], Mesg), + { ok, Params0 } = rvi_common:get_json_element(["params"], Mesg), { ok, ID } = rvi_common:get_json_element(["id"], Mesg), - + Params = try jsx:decode(Params0) + catch error:E -> + ?debug("Failed decode of ~p:~p", [Params0, E]), + Params0 + end, ?debug("service_edge_rpc:handle_websocket(~p/~p) method: ~p", [ WSock, ID,Method ]), case handle_ws_json_rpc(WSock, Method, Params, Arg) of @@ -233,12 +243,13 @@ handle_websocket(WSock, Mesg, Arg) -> %% Websocket interface -handle_ws_json_rpc(WSock, "message", Params, _Arg ) -> +handle_ws_json_rpc(WSock, <<"message">>, Params, _Arg ) -> { ok, SvcName0 } = rvi_common:get_json_element(["service_name"], Params), { ok, Timeout } = rvi_common:get_json_element(["timeout"], Params), - { ok, Parameters0 } = rvi_common:get_json_element(["parameters"], Params), + { ok, Parameters } = rvi_common:get_json_element(["parameters"], Params), SvcName = iolist_to_binary(SvcName0), - Parameters = parse_ws_params(Parameters0), + ?debug("WS Parameters: ~p", [Parameters]), + %% Parameters = parse_ws_params(Parameters0), LogId = log_id_json_tail(Params ++ Parameters), ?debug("service_edge_rpc:handle_websocket(~p) params!: ~p", [ WSock, Params ]), ?debug("service_edge_rpc:handle_websocket(~p) service: ~p", [ WSock, SvcName ]), @@ -257,7 +268,7 @@ handle_ws_json_rpc(WSock, "message", Params, _Arg ) -> { method, <<"message">>}] } end; -handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) -> +handle_ws_json_rpc(WSock, <<"register_service">>, Params,_Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), ?debug("service_edge_rpc:websocket_register(~p) service: ~p", [ WSock, SvcName ]), [ok, FullSvcName ] = gen_server:call(?SERVER, @@ -270,24 +281,28 @@ handle_ws_json_rpc(WSock, "register_service", Params,_Arg ) -> { service, FullSvcName }, { method, <<"register_service">>}]}; -handle_ws_json_rpc(WSock, "unregister_service", Params, _Arg ) -> +handle_ws_json_rpc(WSock, <<"unregister_service">>, Params, _Arg ) -> { ok, SvcName } = rvi_common:get_json_element(["service_name"], Params), ?debug("service_edge_rpc:websocket_unregister(~p) service: ~p", [ WSock, SvcName ]), gen_server:call(?SERVER, { rvi, unregister_local_service, [ SvcName ]}), { ok, [ { status, rvi_common:json_rpc_status(ok)} ]}; -handle_ws_json_rpc(_Ws , "get_available_services", _Params, _Arg ) -> +handle_ws_json_rpc(_Ws , <<"get_available_services">>, _Params, _Arg ) -> ?debug("service_edge_rpc:websocket_get_available()"), - [ Services ] = gen_server:call(?SERVER, { rvi, get_available_services, []}), + [ ok, Services ] = + gen_server:call(?SERVER, { rvi, get_available_services, []}), { ok, [ { status, rvi_common:json_rpc_status(ok)}, { services, Services}, { method, <<"get_available_services">>}] }. -parse_ws_params([{K, V}|T]) -> - [{iolist_to_binary(K), jsx:decode(iolist_to_binary(V))} - | parse_ws_params(T)]; -parse_ws_params([]) -> - []. +%% parse_ws_params([{K, V}|T]) -> +%% K1 = iolist_to_binary(K), +%% V1 = iolist_to_binary(V), +%% ?debug("K1 = ~p, V1 = ~p", [K1, V1]), +%% [{K1, jsx:decode(iolist_to_binary(V1))} +%% | parse_ws_params(T)]; +%% parse_ws_params([]) -> +%% []. %% Invoked by locally connected services. %% Will always be routed as JSON-RPC since that, and websocket, |