summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
blob: 1f37a74c706e732c1d9a102f2470e869ae434dec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ Federation.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2012 VMware, Inc.  All rights reserved.
%%

-module(rabbit_federation_upstream).

-include("rabbit_federation.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-export([set_for/1, for/1, for/2, to_table/1, to_string/1]).
%% For testing
-export([from_set/2]).

-import(rabbit_misc, [pget/2, pget/3]).
-import(rabbit_federation_util, [name/1, vhost/1]).

%%----------------------------------------------------------------------------

set_for(X) -> rabbit_policy:get(<<"federation-upstream-set">>, X).

for(X) ->
    case set_for(X) of
        {ok, UpstreamSet}  -> from_set(UpstreamSet, X);
        {error, not_found} -> []
    end.

for(X, UpstreamName) ->
    case set_for(X) of
        {ok, UpstreamSet}  -> from_set(UpstreamSet, X, UpstreamName);
        {error, not_found} -> []
    end.

to_table(#upstream{original_uri = URI,
                   params       = Params,
                   exchange = X}) ->
    {table, [{<<"uri">>,          longstr, URI},
             {<<"virtual_host">>, longstr, vhost(Params)},
             {<<"exchange">>,     longstr, name(X)}]}.

to_string(#upstream{original_uri = URI,
                    exchange     = #exchange{name = XName}}) ->
    print("~s on ~s", [rabbit_misc:rs(XName), URI]).

print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).

from_set(SetName, X, UpstName) ->
    rabbit_federation_util:find_upstreams(UpstName, from_set(SetName, X)).

from_set(<<"all">>, X) ->
    Connections = rabbit_runtime_parameters:list(<<"federation-upstream">>),
    Set = [[{<<"upstream">>, pget(key, C)}] || C <- Connections],
    from_set_contents(Set, X);

from_set(SetName, X) ->
    case rabbit_runtime_parameters:value(
           <<"federation-upstream-set">>, SetName) of
        not_found -> [];
        Set       -> from_set_contents(Set, X)
    end.

from_set_contents(Set, X) ->
    Results = [from_set_element(P, X) || P <- Set],
    [R || R <- Results, R =/= not_found].

from_set_element(UpstreamSetElem, X) ->
    Name = bget(upstream, UpstreamSetElem, []),
    case rabbit_runtime_parameters:value(<<"federation-upstream">>, Name) of
        not_found  -> not_found;
        Upstream   -> from_props_connection(UpstreamSetElem, Name, Upstream, X)
    end.

from_props_connection(U, Name, C, X) ->
    URI = bget(uri, U, C),
    {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)),
    XNameBin = bget(exchange, U, C, name(X)),
    #upstream{params          = Params,
              original_uri    = URI,
              exchange        = with_name(XNameBin, vhost(Params), X),
              prefetch_count  = bget('prefetch-count',  U, C, ?DEFAULT_PREFETCH),
              reconnect_delay = bget('reconnect-delay', U, C, 1),
              max_hops        = bget('max-hops',        U, C, 1),
              expires         = bget(expires,           U, C, none),
              message_ttl     = bget('message-ttl',     U, C, none),
              ha_policy       = bget('ha-policy',       U, C, none),
              name            = Name}.

%%----------------------------------------------------------------------------

bget(K, L1, L2) -> bget(K, L1, L2, undefined).

bget(K0, L1, L2, D) ->
    K = a2b(K0),
    case pget(K, L1, undefined) of
        undefined -> pget(K, L2, D);
        Result    -> Result
    end.

a2b(A) -> list_to_binary(atom_to_list(A)).

with_name(XNameBin, VHostBin, X) ->
    X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}.