summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_queue_master_location_misc.erl
blob: 37698e184fdbd0f323025c4816938f2a9a5d37c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

-module(rabbit_queue_master_location_misc).

-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").

-export([lookup_master/2,
         lookup_queue/2,
         get_location/1,
         get_location_mod_by_config/1,
         get_location_mod_by_args/1,
         get_location_mod_by_policy/1,
         all_nodes/1]).

-spec lookup_master(binary(), binary()) -> {ok, node()} | {error, not_found}.
lookup_master(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
                                            is_binary(VHostPath) ->
    QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),
    case rabbit_amqqueue:lookup(QueueR) of
        {ok, Queue} when ?amqqueue_has_valid_pid(Queue) ->
            Pid = amqqueue:get_pid(Queue),
            {ok, node(Pid)};
        Error -> Error
    end.

lookup_queue(QueueNameBin, VHostPath) when is_binary(QueueNameBin),
                                           is_binary(VHostPath) ->
    QueueR = rabbit_misc:r(VHostPath, queue, QueueNameBin),
    case rabbit_amqqueue:lookup(QueueR) of
        Reply = {ok, Queue} when ?is_amqqueue(Queue) ->
            Reply;
        Error ->
            Error
    end.

get_location(Queue) when ?is_amqqueue(Queue) ->
    Reply1 = case get_location_mod_by_args(Queue) of
                 _Err1 = {error, _} ->
                     case get_location_mod_by_policy(Queue) of
                         _Err2 = {error, _} ->
                             case get_location_mod_by_config(Queue) of
                                 Err3 = {error, _}      -> Err3;
                                 Reply0 = {ok, _Module} -> Reply0
                             end;
                         Reply0 = {ok, _Module} -> Reply0
                     end;
                 Reply0 = {ok, _Module} -> Reply0
             end,

    case Reply1 of
        {ok, CB} -> CB:queue_master_location(Queue);
        Error    -> Error
    end.

get_location_mod_by_args(Queue) when ?is_amqqueue(Queue) ->
    Args = amqqueue:get_arguments(Queue),
    case rabbit_misc:table_lookup(Args, <<"x-queue-master-locator">>) of
        {_Type, Strategy}  ->
            case rabbit_queue_location_validator:validate_strategy(Strategy) of
                Reply = {ok, _CB} -> Reply;
                Error             -> Error
            end;
        _ -> {error, "x-queue-master-locator undefined"}
    end.

get_location_mod_by_policy(Queue) when ?is_amqqueue(Queue) ->
    case rabbit_policy:get(<<"queue-master-locator">> , Queue) of
        undefined ->  {error, "queue-master-locator policy undefined"};
        Strategy  ->
            case rabbit_queue_location_validator:validate_strategy(Strategy) of
                Reply = {ok, _CB} -> Reply;
                Error             -> Error
            end
    end.

get_location_mod_by_config(Queue) when ?is_amqqueue(Queue) ->
    case application:get_env(rabbit, queue_master_locator) of
        {ok, Strategy} ->
            case rabbit_queue_location_validator:validate_strategy(Strategy) of
                Reply = {ok, _CB} -> Reply;
                Error             -> Error
            end;
        _ -> {error, "queue_master_locator undefined"}
    end.

all_nodes(Queue) when ?is_amqqueue(Queue) ->
    handle_is_mirrored_ha_nodes(rabbit_mirror_queue_misc:is_mirrored_ha_nodes(Queue), Queue).

handle_is_mirrored_ha_nodes(false, _Queue) ->
    % Note: ha-mode is NOT 'nodes' - it is either exactly or all, which means
    % that any node in the cluster is eligible to be the new queue master node
    rabbit_nodes:all_running();
handle_is_mirrored_ha_nodes(true, Queue) ->
    % Note: ha-mode is 'nodes', which explicitly specifies allowed nodes.
    % We must use suggested_queue_nodes to get that list of nodes as the
    % starting point for finding the queue master location
    handle_suggested_queue_nodes(rabbit_mirror_queue_misc:suggested_queue_nodes(Queue)).

handle_suggested_queue_nodes({_MNode, []}) ->
    rabbit_nodes:all_running();
handle_suggested_queue_nodes({MNode, SNodes}) ->
    [MNode | SNodes].