1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at https://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_ha_test_consumer).
-include_lib("amqp_client/include/amqp_client.hrl").
-export([await_response/1, create/5, start/6]).
await_response(ConsumerPid) ->
case receive {ConsumerPid, Response} -> Response end of
{error, Reason} -> erlang:error(Reason);
ok -> ok
end.
create(Channel, Queue, TestPid, CancelOnFailover, ExpectingMsgs) ->
ConsumerPid = spawn_link(?MODULE, start,
[TestPid, Channel, Queue, CancelOnFailover,
ExpectingMsgs + 1, ExpectingMsgs]),
amqp_channel:subscribe(
Channel, consume_method(Queue, CancelOnFailover), ConsumerPid),
ConsumerPid.
start(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
error_logger:info_msg("consumer ~p on ~p awaiting ~w messages "
"(lowest seen = ~w, cancel-on-failover = ~w)~n",
[self(), Channel, MsgsToConsume, LowestSeen,
CancelOnFailover]),
run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume).
run(TestPid, _Channel, _Queue, _CancelOnFailover, _LowestSeen, 0) ->
consumer_reply(TestPid, ok);
run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
receive
#'basic.consume_ok'{} ->
run(TestPid, Channel, Queue,
CancelOnFailover, LowestSeen, MsgsToConsume);
{Delivery = #'basic.deliver'{ redelivered = Redelivered },
#amqp_msg{payload = Payload}} ->
MsgNum = list_to_integer(binary_to_list(Payload)),
ack(Delivery, Channel),
%% we can receive any message we've already seen and,
%% because of the possibility of multiple requeuings, we
%% might see these messages in any order. If we are seeing
%% a message again, we don't decrement the MsgsToConsume
%% counter.
if
MsgNum + 1 == LowestSeen ->
run(TestPid, Channel, Queue,
CancelOnFailover, MsgNum, MsgsToConsume - 1);
MsgNum >= LowestSeen ->
error_logger:info_msg(
"consumer ~p on ~p ignoring redelivered msg ~p~n",
[self(), Channel, MsgNum]),
true = Redelivered, %% ASSERTION
run(TestPid, Channel, Queue,
CancelOnFailover, LowestSeen, MsgsToConsume);
true ->
%% We received a message we haven't seen before,
%% but it is not the next message in the expected
%% sequence.
consumer_reply(TestPid,
{error, {unexpected_message, MsgNum}})
end;
#'basic.cancel'{} when CancelOnFailover ->
error_logger:info_msg("consumer ~p on ~p received basic.cancel: "
"resubscribing to ~p on ~p~n",
[self(), Channel, Queue, Channel]),
resubscribe(TestPid, Channel, Queue, CancelOnFailover,
LowestSeen, MsgsToConsume);
#'basic.cancel'{} ->
exit(cancel_received_without_cancel_on_failover)
end.
%%
%% Private API
%%
resubscribe(TestPid, Channel, Queue, CancelOnFailover, LowestSeen,
MsgsToConsume) ->
amqp_channel:subscribe(
Channel, consume_method(Queue, CancelOnFailover), self()),
ok = receive #'basic.consume_ok'{} -> ok
end,
error_logger:info_msg("re-subscripting consumer ~p on ~p complete "
"(received basic.consume_ok)",
[self(), Channel]),
start(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume).
consume_method(Queue, CancelOnFailover) ->
Args = [{<<"x-cancel-on-ha-failover">>, bool, CancelOnFailover}],
#'basic.consume'{queue = Queue,
arguments = Args}.
ack(#'basic.deliver'{delivery_tag = DeliveryTag}, Channel) ->
amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
ok.
consumer_reply(TestPid, Reply) ->
TestPid ! {self(), Reply}.
|