summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src/amqp_direct_consumer.erl
blob: f2bded5a83c8524efda285a0a159333818fb333e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2011-2021 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% @doc This module is an implementation of the amqp_gen_consumer
%% behaviour and can be used as part of the Consumer parameter when
%% opening AMQP channels.
%% <br/>
%% <br/>
%% The Consumer parameter for this implementation is {{@module},
%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
%% queue subscription-related messages.<br/>
%% <br/>
%% This consumer implementation causes the channel to send to the
%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
%% basic.cancel_ok and basic.deliver messages received from the
%% server.
%% <br/>
%% <br/>
%% In addition, this consumer implementation monitors the ConsumerPid
%% and exits with the same shutdown reason when it dies.  'DOWN'
%% messages from other sources are passed to ConsumerPid.
%% <br/>
%% Warning! It is not recommended to rely on a consumer killing off the
%% channel (through the exit signal), as that may cause messages to get lost.
%% Always use amqp_channel:close/{1,3} for a clean shutdown.<br/>
%% <br/>
%% This module has no public functions.
-module(amqp_direct_consumer).

-include("amqp_gen_consumer_spec.hrl").

-behaviour(amqp_gen_consumer).

-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
         handle_cancel/2, handle_server_cancel/2,
         handle_deliver/3, handle_deliver/4,
         handle_info/2, handle_call/3, terminate/2]).

%%---------------------------------------------------------------------------
%% amqp_gen_consumer callbacks
%%---------------------------------------------------------------------------

%% @private
%% @doc Starts monitoring the consumer process and uses its pid as the
%% callback state. The monitor reference itself is not retained.
init([Consumer]) ->
    _MonitorRef = erlang:monitor(process, Consumer),
    {ok, Consumer}.

%% @private
%% @doc Relays the consume method to the consumer process, paired with
%% the second callback argument, as `{Method, Arg}'. The state is
%% returned unchanged.
handle_consume(Method, Arg, Consumer) ->
    erlang:send(Consumer, {Method, Arg}),
    {ok, Consumer}.

%% @private
%% @doc Forwards the consume_ok method, as is, to the consumer process.
%% The second callback argument is ignored.
handle_consume_ok(Method, _Ignored, Consumer) ->
    erlang:send(Consumer, Method),
    {ok, Consumer}.

%% @private
%% @doc Forwards the cancel method, as is, to the consumer process.
handle_cancel(Method, Consumer) ->
    erlang:send(Consumer, Method),
    {ok, Consumer}.

%% @private
%% @doc Forwards the cancel_ok method, as is, to the consumer process.
%% The second callback argument is ignored.
handle_cancel_ok(Method, _Ignored, Consumer) ->
    erlang:send(Consumer, Method),
    {ok, Consumer}.

%% @private
%% @doc Notifies the consumer process of a server-initiated cancel by
%% sending it `{server_cancel, Method}', so it can be told apart from
%% the plain method forwarded by handle_cancel/2.
handle_server_cancel(Method, Consumer) ->
    Notification = {server_cancel, Method},
    erlang:send(Consumer, Notification),
    {ok, Consumer}.

%% @private
%% @doc Relays a delivery to the consumer process as the pair
%% `{Method, Arg}'. The state is returned unchanged.
handle_deliver(Method, Arg, Consumer) ->
    erlang:send(Consumer, {Method, Arg}),
    {ok, Consumer}.
%% @private
%% @doc Same as handle_deliver/3, but the message sent to the consumer
%% process also carries the delivery context:
%% `{Method, Arg, DeliveryCtx}'.
handle_deliver(Method, Arg, DeliveryCtx, Consumer) ->
    erlang:send(Consumer, {Method, Arg, DeliveryCtx}),
    {ok, Consumer}.


%% @private
%% @doc Handles 'DOWN' monitor messages.
%%
%% When the process that went down is the consumer itself, a `normal'
%% exit is accepted silently and any other reason is reported as
%% `{error, {consumer_died, Reason}, State}'. A 'DOWN' message about any
%% other process is passed on, untouched, to the consumer.
%%
%% Messages that are not 'DOWN' process notifications are not handled
%% and fail with function_clause.
handle_info({'DOWN', _Ref, process, Pid, Reason}, Consumer)
  when Pid =:= Consumer ->
    case Reason of
        normal ->
            %% Normal exit of the consumer is not treated as an error
            %% (presumably it follows the channel being closed).
            {ok, Consumer};
        _Abnormal ->
            {error, {consumer_died, Reason}, Consumer}
    end;
handle_info({'DOWN', _Ref, process, _Pid, _Reason} = Down, Consumer) ->
    erlang:send(Consumer, Down),
    {ok, Consumer}.

%% @private
%% @doc Passes a call on to the consumer process as `{Request, Arg}'
%% and answers the caller with `ok' straight away, without waiting for
%% the consumer to act on it.
handle_call(Request, Arg, Consumer) ->
    erlang:send(Consumer, {Request, Arg}),
    {reply, ok, Consumer}.

%% @private
%% @doc Nothing to clean up on termination: the reason is ignored and
%% the state (the consumer pid) is handed back as is.
terminate(_Why, Consumer) ->
    Consumer.