summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src/amqp_rpc_server.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/amqp_client/src/amqp_rpc_server.erl')
-rw-r--r--deps/amqp_client/src/amqp_rpc_server.erl138
1 files changed, 138 insertions, 0 deletions
diff --git a/deps/amqp_client/src/amqp_rpc_server.erl b/deps/amqp_client/src/amqp_rpc_server.erl
new file mode 100644
index 0000000000..44e5113a94
--- /dev/null
+++ b/deps/amqp_client/src/amqp_rpc_server.erl
@@ -0,0 +1,138 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc This is a utility module that is used to expose an arbitrary function
+%% via an asynchronous RPC over AMQP mechanism. It frees the implementor of
+%% a simple function from having to plumb this into AMQP. Note that the
+%% RPC server does not handle any data encoding, so it is up to the callback
+%% function to marshall and unmarshall message payloads accordingly.
+-module(amqp_rpc_server).
+
+-behaviour(gen_server).
+
+-include("amqp_client.hrl").
+
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+ handle_cast/2, handle_info/2]).
+-export([start/3, start_link/3]).
+-export([stop/1]).
+
+-record(state, {channel,
+ handler}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue, RpcHandler) -> RpcServer
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcHandler = function()
+%% RpcServer = pid()
+%% @doc Starts a new RPC server instance that receives requests via a
+%% specified queue and dispatches them to a specified handler function. This
+%% function returns the pid of the RPC server that can be used to stop the
+%% server.
+start(Connection, Queue, Fun) ->
+ {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue, Fun], []),
+ Pid.
+
+%% @spec (Connection, Queue, RpcHandler) -> RpcServer
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcHandler = function()
+%% RpcServer = pid()
+%% @doc Starts, and links to, a new RPC server instance that receives
+%% requests via a specified queue and dispatches them to a specified
+%% handler function. This function returns the pid of the RPC server that
+%% can be used to stop the server.
+start_link(Connection, Queue, Fun) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [Connection, Queue, Fun], []),
+ Pid.
+
+%% @spec (RpcServer) -> ok
+%% where
+%% RpcServer = pid()
+%% @doc Stops an existing RPC server.
+stop(Pid) ->
+ gen_server:call(Pid, stop, amqp_util:call_timeout()).
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% @private
+init([Connection, Q, Fun]) ->
+ {ok, Channel} = amqp_connection:open_channel(
+ Connection, {amqp_direct_consumer, [self()]}),
+ amqp_channel:call(Channel, #'queue.declare'{queue = Q}),
+ amqp_channel:call(Channel, #'basic.consume'{queue = Q}),
+ {ok, #state{channel = Channel, handler = Fun} }.
+
+%% @private
+handle_info(shutdown, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info({#'basic.consume'{}, _}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = Props, payload = Payload}},
+ State = #state{handler = Fun, channel = Channel}) ->
+ #'P_basic'{correlation_id = CorrelationId,
+ reply_to = Q} = Props,
+ Response = Fun(Payload),
+ Properties = #'P_basic'{correlation_id = CorrelationId},
+ Publish = #'basic.publish'{exchange = <<>>,
+ routing_key = Q,
+ mandatory = true},
+ amqp_channel:call(Channel, Publish, #amqp_msg{props = Properties,
+ payload = Response}),
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+ {noreply, State};
+
+%% @private
+handle_info({'DOWN', _MRef, process, _Pid, _Info}, State) ->
+ {noreply, State}.
+
+%% @private
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+%%--------------------------------------------------------------------------
+%% Rest of the gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% @private
+handle_cast(_Message, State) ->
+ {noreply, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #state{channel = Channel}) ->
+ amqp_channel:close(Channel),
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.