summaryrefslogtreecommitdiff
path: root/test/quorum_queue_utils.erl
blob: 224abeeeebfa5b16b19ab9706faa44b2d0fe04b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
-module(quorum_queue_utils).

-include_lib("eunit/include/eunit.hrl").

-export([
         wait_for_messages_ready/3,
         wait_for_messages_pending_ack/3,
         wait_for_messages_total/3,
         wait_for_messages/2,
         dirty_query/3,
         ra_name/1,
         fifo_machines_use_same_version/1,
         fifo_machines_use_same_version/2,
         is_mixed_versions/0
        ]).

wait_for_messages_ready(Servers, QName, Ready) ->
    wait_for_messages(Servers, QName, Ready,
                      fun rabbit_fifo:query_messages_ready/1, 60).

wait_for_messages_pending_ack(Servers, QName, Ready) ->
    wait_for_messages(Servers, QName, Ready,
                      fun rabbit_fifo:query_messages_checked_out/1, 60).

wait_for_messages_total(Servers, QName, Total) ->
    wait_for_messages(Servers, QName, Total,
                      fun rabbit_fifo:query_messages_total/1, 60).

wait_for_messages(Servers, QName, Number, Fun, 0) ->
    Msgs = dirty_query(Servers, QName, Fun),
    ?assertEqual([Number || _ <- lists:seq(1, length(Servers))], Msgs);
wait_for_messages(Servers, QName, Number, Fun, N) ->
    Msgs = dirty_query(Servers, QName, Fun),
    ct:pal("Got messages ~p ~p", [QName, Msgs]),
    %% hack to allow the check to succeed in mixed versions clusters if at
    %% least one node matches the criteria rather than all nodes for
    F = case is_mixed_versions() of
            true ->
                any;
            false ->
                all
        end,
    case lists:F(fun(C) when is_integer(C) ->
                         C == Number;
                    (_) ->
                         false
                 end, Msgs) of
        true ->
            ok;
        _ ->
            timer:sleep(500),
            wait_for_messages(Servers, QName, Number, Fun, N - 1)
    end.

wait_for_messages(Config, Stats) ->
    wait_for_messages(Config, lists:sort(Stats), 60).

wait_for_messages(Config, Stats, 0) ->
    ?assertEqual(Stats,
                 lists:sort(
                   filter_queues(Stats,
                                 rabbit_ct_broker_helpers:rabbitmqctl_list(
                                   Config, 0, ["list_queues", "name", "messages", "messages_ready",
                                               "messages_unacknowledged"]))));
wait_for_messages(Config, Stats, N) ->
    case lists:sort(
           filter_queues(Stats,
                         rabbit_ct_broker_helpers:rabbitmqctl_list(
                           Config, 0, ["list_queues", "name", "messages", "messages_ready",
                                       "messages_unacknowledged"]))) of
        Stats0 when Stats0 == Stats ->
            ok;
        _ ->
            timer:sleep(500),
            wait_for_messages(Config, Stats, N - 1)
    end.

dirty_query(Servers, QName, Fun) ->
    lists:map(
      fun(N) ->
              case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
                  {ok, {_, Msgs}, _} ->
                      Msgs;
                  _E ->
                      undefined
              end
      end, Servers).

ra_name(Q) ->
    binary_to_atom(<<"%2F_", Q/binary>>, utf8).

filter_queues(Expected, Got) ->
    Keys = [K || [K, _, _, _] <- Expected],
    lists:filter(fun([K, _, _, _]) ->
                         lists:member(K, Keys)
                 end, Got).

fifo_machines_use_same_version(Config) ->
    Nodenames = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    fifo_machines_use_same_version(Config, Nodenames).

fifo_machines_use_same_version(Config, Nodenames)
  when length(Nodenames) >= 1 ->
    [MachineAVersion | OtherMachinesVersions] =
    [(catch rabbit_ct_broker_helpers:rpc(
              Config, Nodename,
              rabbit_fifo, version, []))
     || Nodename <- Nodenames],
    lists:all(fun(V) -> V =:= MachineAVersion end, OtherMachinesVersions).

is_mixed_versions() ->
    not (false == os:getenv("SECONDARY_UMBRELLA")).