diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-03-12 12:46:08 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-03-12 12:46:08 +0000 |
commit | 174b3de97ccb32b27ca3dc14a699a20e2294a6d8 (patch) | |
tree | 7c43c60c8eaaa6148c2c12985db5be794e2999cf /deps/rabbitmq_federation/src/rabbit_federation_upstream.erl | |
parent | 92ac3905c35b507aa78a41f0e92ef6d061a33d25 (diff) | |
parent | a49f1d93b0f877e56c63073d8657029149eb932e (diff) | |
download | rabbitmq-server-git-174b3de97ccb32b27ca3dc14a699a20e2294a6d8.tar.gz |
Stable to default
Diffstat (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl')
-rw-r--r-- | deps/rabbitmq_federation/src/rabbit_federation_upstream.erl | 37 |
1 files changed, 25 insertions, 12 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index f79cb1dda1..d03606e63f 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -19,7 +19,8 @@ -include("rabbit_federation.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). --export([set_for/1, for/1, for/2, to_table/1, to_string/1]). +-export([set_for/1, for/1, for/2, params_to_table/1, params_to_string/1, + to_params/2]). %% For testing -export([from_set/2, remove_credentials/1]). @@ -42,15 +43,15 @@ for(X, UpstreamName) -> {error, not_found} -> [] end. -to_table(#upstream{original_uri = URI, - params = Params, - exchange = X}) -> +params_to_table(#upstream_params{uri = URI, + params = Params, + exchange = X}) -> {table, [{<<"uri">>, longstr, remove_credentials(URI)}, {<<"virtual_host">>, longstr, vhost(Params)}, {<<"exchange">>, longstr, name(X)}]}. -to_string(#upstream{original_uri = URI, - exchange = #exchange{name = XName}}) -> +params_to_string(#upstream_params{uri = URI, + exchange = #exchange{name = XName}}) -> print("~s on ~s", [rabbit_misc:rs(XName), remove_credentials(URI)]). remove_credentials(URI) -> @@ -67,6 +68,14 @@ remove_credentials(URI) -> "~s://~s~s~s", [pget(scheme, Props), PGet(host, Props), PortPart, PGet(path, Props)])). +to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) -> + random:seed(now()), + URI = lists:nth(random:uniform(length(URIs)), URIs), + {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), + #upstream_params{params = Params, + uri = URI, + exchange = with_name(XNameBin, vhost(Params), X)}. + print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). from_set(SetName, X, UpstName) -> @@ -98,18 +107,22 @@ from_set_element(UpstreamSetElem, X) -> end. from_props_connection(U, Name, C, X) -> - URI = bget(uri, U, C), - {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), - XNameBin = bget(exchange, U, C, name(X)), - #upstream{params = Params, - original_uri = URI, - exchange = with_name(XNameBin, vhost(Params), X), + URIParam = bget(uri, U, C), + URIs = case URIParam of + B when is_binary(B) -> [B]; + L when is_list(L) -> L + end, + #upstream{uris = URIs, + exchange_name = bget(exchange, U, C, name(X)), prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH), reconnect_delay = bget('reconnect-delay', U, C, 1), max_hops = bget('max-hops', U, C, 1), expires = bget(expires, U, C, none), message_ttl = bget('message-ttl', U, C, none), trust_user_id = bget('trust-user-id', U, C, false), + ack_mode = list_to_atom( + binary_to_list( + bget('ack-mode', U, C, <<"on-confirm">>))), ha_policy = bget('ha-policy', U, C, none), name = Name}. |