summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src/amqp_rpc_client.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/amqp_client/src/amqp_rpc_client.erl')
-rw-r--r--deps/amqp_client/src/amqp_rpc_client.erl176
1 files changed, 176 insertions, 0 deletions
diff --git a/deps/amqp_client/src/amqp_rpc_client.erl b/deps/amqp_client/src/amqp_rpc_client.erl
new file mode 100644
index 0000000000..3fd9a34650
--- /dev/null
+++ b/deps/amqp_client/src/amqp_rpc_client.erl
@@ -0,0 +1,176 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @doc This module allows the simple execution of an asynchronous RPC over
+%% AMQP. It frees a client programmer of the necessary having to AMQP
+%% plumbing. Note that the this module does not handle any data encoding,
+%% so it is up to the caller to marshall and unmarshall message payloads
+%% accordingly.
+-module(amqp_rpc_client).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start/2, start_link/2, stop/1]).
+-export([call/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+ handle_cast/2, handle_info/2]).
+
+-record(state, {channel,
+ reply_queue,
+ exchange,
+ routing_key,
+ continuations = #{},
+ correlation_id = 0}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue) -> RpcClient
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcClient = pid()
+%% @doc Starts a new RPC client instance that sends requests to a
+%% specified queue. This function returns the pid of the RPC client process
+%% that can be used to invoke RPCs and stop the client.
+start(Connection, Queue) ->
+ {ok, Pid} = gen_server:start(?MODULE, [Connection, Queue], []),
+ Pid.
+
+%% @spec (Connection, Queue) -> RpcClient
+%% where
+%% Connection = pid()
+%% Queue = binary()
+%% RpcClient = pid()
+%% @doc Starts, and links to, a new RPC client instance that sends requests
+%% to a specified queue. This function returns the pid of the RPC client
+%% process that can be used to invoke RPCs and stop the client.
+start_link(Connection, Queue) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [Connection, Queue], []),
+ Pid.
+
+%% @spec (RpcClient) -> ok
+%% where
+%% RpcClient = pid()
+%% @doc Stops an existing RPC client.
+stop(Pid) ->
+ gen_server:call(Pid, stop, amqp_util:call_timeout()).
+
+%% @spec (RpcClient, Payload) -> ok
+%% where
+%% RpcClient = pid()
+%% Payload = binary()
+%% @doc Invokes an RPC. Note the caller of this function is responsible for
+%% encoding the request and decoding the response.
+call(RpcClient, Payload) ->
+ gen_server:call(RpcClient, {call, Payload}, amqp_util:call_timeout()).
+
+%%--------------------------------------------------------------------------
+%% Plumbing
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue for this client to listen on
+setup_reply_queue(State = #state{channel = Channel}) ->
+ #'queue.declare_ok'{queue = Q} =
+ amqp_channel:call(Channel, #'queue.declare'{exclusive = true,
+ auto_delete = true}),
+ State#state{reply_queue = Q}.
+
+%% Registers this RPC client instance as a consumer to handle rpc responses
+setup_consumer(#state{channel = Channel, reply_queue = Q}) ->
+ amqp_channel:call(Channel, #'basic.consume'{queue = Q}).
+
+%% Publishes to the broker, stores the From address against
+%% the correlation id and increments the correlationid for
+%% the next request
+publish(Payload, From,
+ State = #state{channel = Channel,
+ reply_queue = Q,
+ exchange = X,
+ routing_key = RoutingKey,
+ correlation_id = CorrelationId,
+ continuations = Continuations}) ->
+ EncodedCorrelationId = base64:encode(<<CorrelationId:64>>),
+ Props = #'P_basic'{correlation_id = EncodedCorrelationId,
+ content_type = <<"application/octet-stream">>,
+ reply_to = Q},
+ Publish = #'basic.publish'{exchange = X,
+ routing_key = RoutingKey,
+ mandatory = true},
+ amqp_channel:call(Channel, Publish, #amqp_msg{props = Props,
+ payload = Payload}),
+ State#state{correlation_id = CorrelationId + 1,
+ continuations = maps:put(EncodedCorrelationId, From, Continuations)}.
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% Sets up a reply queue and consumer within an existing channel
+%% @private
+init([Connection, RoutingKey]) ->
+ {ok, Channel} = amqp_connection:open_channel(
+ Connection, {amqp_direct_consumer, [self()]}),
+ InitialState = #state{channel = Channel,
+ exchange = <<>>,
+ routing_key = RoutingKey},
+ State = setup_reply_queue(InitialState),
+ setup_consumer(State),
+ {ok, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #state{channel = Channel}) ->
+ amqp_channel:close(Channel),
+ ok.
+
+%% Handle the application initiated stop by just stopping this gen server
+%% @private
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State};
+
+%% @private
+handle_call({call, Payload}, From, State) ->
+ NewState = publish(Payload, From, State),
+ {noreply, NewState}.
+
+%% @private
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% @private
+handle_info({#'basic.consume'{}, _Pid}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = #'P_basic'{correlation_id = Id},
+ payload = Payload}},
+ State = #state{continuations = Conts, channel = Channel}) ->
+ From = maps:get(Id, Conts),
+ gen_server:reply(From, Payload),
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+ {noreply, State#state{continuations = maps:remove(Id, Conts) }}.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.