summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-12 12:46:08 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-12 12:46:08 +0000
commit174b3de97ccb32b27ca3dc14a699a20e2294a6d8 (patch)
tree7c43c60c8eaaa6148c2c12985db5be794e2999cf /deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
parent92ac3905c35b507aa78a41f0e92ef6d061a33d25 (diff)
parenta49f1d93b0f877e56c63073d8657029149eb932e (diff)
downloadrabbitmq-server-git-174b3de97ccb32b27ca3dc14a699a20e2294a6d8.tar.gz
Stable to default
Diffstat (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl')
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream.erl37
1 files changed, 25 insertions, 12 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
index f79cb1dda1..d03606e63f 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
@@ -19,7 +19,8 @@
-include("rabbit_federation.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
--export([set_for/1, for/1, for/2, to_table/1, to_string/1]).
+-export([set_for/1, for/1, for/2, params_to_table/1, params_to_string/1,
+ to_params/2]).
%% For testing
-export([from_set/2, remove_credentials/1]).
@@ -42,15 +43,15 @@ for(X, UpstreamName) ->
{error, not_found} -> []
end.
-to_table(#upstream{original_uri = URI,
- params = Params,
- exchange = X}) ->
+params_to_table(#upstream_params{uri = URI,
+ params = Params,
+ exchange = X}) ->
{table, [{<<"uri">>, longstr, remove_credentials(URI)},
{<<"virtual_host">>, longstr, vhost(Params)},
{<<"exchange">>, longstr, name(X)}]}.
-to_string(#upstream{original_uri = URI,
- exchange = #exchange{name = XName}}) ->
+params_to_string(#upstream_params{uri = URI,
+ exchange = #exchange{name = XName}}) ->
print("~s on ~s", [rabbit_misc:rs(XName), remove_credentials(URI)]).
remove_credentials(URI) ->
@@ -67,6 +68,14 @@ remove_credentials(URI) ->
"~s://~s~s~s", [pget(scheme, Props), PGet(host, Props),
PortPart, PGet(path, Props)])).
+to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) ->
+ random:seed(now()),
+ URI = lists:nth(random:uniform(length(URIs)), URIs),
+ {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)),
+ #upstream_params{params = Params,
+ uri = URI,
+ exchange = with_name(XNameBin, vhost(Params), X)}.
+
print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
from_set(SetName, X, UpstName) ->
@@ -98,18 +107,22 @@ from_set_element(UpstreamSetElem, X) ->
end.
from_props_connection(U, Name, C, X) ->
- URI = bget(uri, U, C),
- {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)),
- XNameBin = bget(exchange, U, C, name(X)),
- #upstream{params = Params,
- original_uri = URI,
- exchange = with_name(XNameBin, vhost(Params), X),
+ URIParam = bget(uri, U, C),
+ URIs = case URIParam of
+ B when is_binary(B) -> [B];
+ L when is_list(L) -> L
+ end,
+ #upstream{uris = URIs,
+ exchange_name = bget(exchange, U, C, name(X)),
prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH),
reconnect_delay = bget('reconnect-delay', U, C, 1),
max_hops = bget('max-hops', U, C, 1),
expires = bget(expires, U, C, none),
message_ttl = bget('message-ttl', U, C, none),
trust_user_id = bget('trust-user-id', U, C, false),
+ ack_mode = list_to_atom(
+ binary_to_list(
+ bget('ack-mode', U, C, <<"on-confirm">>))),
ha_policy = bget('ha-policy', U, C, none),
name = Name}.