summaryrefslogtreecommitdiff
path: root/components/proto_msgpack/src/proto_msgpack_rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'components/proto_msgpack/src/proto_msgpack_rpc.erl')
-rw-r--r--components/proto_msgpack/src/proto_msgpack_rpc.erl180
1 files changed, 180 insertions, 0 deletions
diff --git a/components/proto_msgpack/src/proto_msgpack_rpc.erl b/components/proto_msgpack/src/proto_msgpack_rpc.erl
new file mode 100644
index 0000000..2b1f59c
--- /dev/null
+++ b/components/proto_msgpack/src/proto_msgpack_rpc.erl
@@ -0,0 +1,180 @@
+%% -*- erlang-indent-level:4; indent-tabs-mode: nil -*-
+%% Copyright (C) 2015, Jaguar Land Rover
+%%
+%% This program is licensed under the terms and conditions of the
+%% Mozilla Public License, version 2.0. The full text of the
+%% Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
+%%
+
+
+-module(proto_msgpack_rpc).
+-behaviour(gen_server).
+
+-export([handle_rpc/2,
+ handle_notification/2]).
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+
+-include_lib("lager/include/log.hrl").
+-include_lib("rvi_common/include/rvi_common.hrl").
+
+-define(SERVER, ?MODULE).
+-export([start_json_server/0]).
+-export([send_message/8,
+ receive_message/3]).
+
+-record(st, {
+ %% Component specification
+ queue = [],
+ cs = #component_spec{},
+ pack_opts = [{allow_atom, pack},
+ {enable_str, true},
+ jsx]
+ }).
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ ?debug("init(): called."),
+ {ok, #st{ cs = rvi_common:get_component_specification() }}.
+
+start_json_server() ->
+ rvi_common:start_json_rpc_server(protocol, ?MODULE, proto_msgpack_sup).
+
+
+
+send_message(CompSpec,
+ TID,
+ ServiceName,
+ Timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Parameters) ->
+ rvi_common:request(protocol, ?MODULE, send_message,
+ [{ transaction_id, TID },
+ { service, ServiceName },
+ { timeout, Timeout },
+ { protocol_opts, ProtoOpts },
+ { data_link_mod, DataLinkMod },
+ { data_link_opts, DataLinkOpts },
+ { parameters, Parameters }],
+ [ status ], CompSpec).
+
+receive_message(CompSpec, {IP, Port}, Data) ->
+ rvi_common:notification(protocol, ?MODULE, receive_message,
+ [ {data, Data },
+ {remote_ip, IP},
+ {remote_port, Port} ],
+ CompSpec).
+
+%% JSON-RPC entry point
+
+%% CAlled by local exo http server
+handle_rpc("send_message", Args) ->
+ {ok, TID} = rvi_common:get_json_element(["transaction_id"], Args),
+ {ok, ServiceName} = rvi_common:get_json_element(["service_name"], Args),
+ {ok, Timeout} = rvi_common:get_json_element(["timeout"], Args),
+ {ok, ProtoOpts} = rvi_common:get_json_element(["protocol_opts"], Args),
+ {ok, DataLinkMod} = rvi_common:get_json_element(["data_link_mod"], Args),
+ {ok, DataLinkOpts} = rvi_common:get_json_element(["data_link_opts"], Args),
+ {ok, Parameters} = rvi_common:get_json_element(["parameters"], Args),
+ [ ok ] = gen_server:call(?SERVER, { rvi, send_message,
+ [TID,
+ ServiceName,
+ Timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Parameters]}),
+ {ok, [ {status, rvi_common:json_rpc_status(ok)} ]};
+
+
+handle_rpc(Other, _Args) ->
+ ?warning("proto_msgpack_rpc:handle_rpc(~p): Unknown~n", [ Other ]),
+ { ok, [ { status, rvi_common:json_rpc_status(invalid_command)} ] }.
+
+
+handle_notification("receive_message", Args) ->
+ {ok, Data} = rvi_common:get_json_element(["data"], Args),
+ {ok, RemoteIP} = rvi_common:get_json_element(["remote_ip"], Args),
+ {ok, RemotePort} = rvi_common:get_json_element(["remote_port"], Args),
+ gen_server:cast(?SERVER, { rvi, receive_message, [Data, RemoteIP, RemotePort]}),
+ ok;
+
+handle_notification(Other, _Args) ->
+ ?debug("handle_notification(Other=~p): unknown", [ Other ]),
+ ok.
+
+
+handle_call({rvi, send_message,
+ [TID,
+ ServiceName,
+ Timeout,
+ ProtoOpts,
+ DataLinkMod,
+ DataLinkOpts,
+ Parameters]}, _From, St) ->
+ ?debug(" protocol:send(): transaction id: ~p~n", [TID]),
+ ?debug(" protocol:send(): service name: ~p~n", [ServiceName]),
+ ?debug(" protocol:send(): timeout: ~p~n", [Timeout]),
+ ?debug(" protocol:send(): opts: ~p~n", [ProtoOpts]),
+ ?debug(" protocol:send(): data_link_mod: ~p~n", [DataLinkMod]),
+ ?debug(" protocol:send(): data_link_opts: ~p~n", [DataLinkOpts]),
+ ?debug(" protocol:send(): parameters: ~p~n", [Parameters]),
+ Data = msgpack:pack([ { <<"tid">>, TID },
+ { <<"service">>, ServiceName },
+ { <<"timeout">>, Timeout },
+ { <<"parameters">>, Parameters } ], St#st.pack_opts),
+ Res = DataLinkMod:send_data(
+ St#st.cs, ?MODULE, ServiceName, DataLinkOpts, Data),
+ {reply, Res, St};
+
+handle_call(Other, _From, St) ->
+ ?warning("proto_msgpack_rpc:handle_call(~p): unknown", [ Other ]),
+ { reply, [ invalid_command ], St}.
+
+
+%% Convert list-based data to binary.
+handle_cast({rvi, receive_message, [Payload, IP, Port]} = Msg, St) ->
+ ?debug("~p:handle_cast(~p)", [?MODULE, Msg]),
+ {ok, Elems} = msgpack:unpack(Payload, St#st.pack_opts),
+
+ [ ServiceName, Timeout, Parameters ] =
+ opts([<<"service">>, <<"timeout">>, <<"parameters">>],
+ Elems, undefined),
+
+ ?debug(" protocol:rcv(): service name: ~p~n", [ServiceName]),
+ ?debug(" protocol:rcv(): timeout: ~p~n", [Timeout]),
+ ?debug(" protocol:rcv(): remote IP/Port: ~p~n", [{IP, Port}]),
+
+ service_edge_rpc:handle_remote_message(St#st.cs,
+ {IP, Port},
+ ServiceName,
+ Timeout,
+ Parameters),
+ {noreply, St};
+
+handle_cast(Other, St) ->
+ ?warning("proto_msgpack_rpc:handle_cast(~p): unknown", [ Other ]),
+ {noreply, St}.
+
+handle_info(_Info, St) ->
+ {noreply, St}.
+
+terminate(_Reason, _St) ->
+ ok.
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
+
+opt(K, L, Def) ->
+ case lists:keyfind(K, 1, L) of
+ {_, V} -> V;
+ false -> Def
+ end.
+
+opts(Keys, Elems, Def) ->
+ [ opt(K, Elems, Def) || K <- Keys].