From 3453e5d9312ab9eb8b68043efe184ef49f3b0313 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 1 Feb 2013 14:30:31 +0000 Subject: ack-mode --- deps/rabbitmq_federation/src/rabbit_federation_upstream.erl | 3 +++ 1 file changed, 3 insertions(+) (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl') diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 5a4a61b165..0d1c3a7565 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -96,6 +96,9 @@ from_props_connection(U, Name, C, X) -> expires = bget(expires, U, C, none), message_ttl = bget('message-ttl', U, C, none), trust_user_id = bget('trust-user-id', U, C, false), + ack_mode = list_to_atom( + binary_to_list( + bget('ack-mode', U, C, <<"on-confirm">>))), ha_policy = bget('ha-policy', U, C, none), name = Name}. -- cgit v1.2.1 From a01375272de6062e84b686f2e62a7ada65391418 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 1 Feb 2013 17:32:36 +0000 Subject: Allow URI to be multiple. --- deps/rabbitmq_federation/src/rabbit_federation_upstream.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl') diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 5a4a61b165..7eb5f39f09 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -84,7 +84,13 @@ from_set_element(UpstreamSetElem, X) -> end. from_props_connection(U, Name, C, X) -> - URI = bget(uri, U, C), + URIParam = bget(uri, U, C), + URIs = case URIParam of + B when is_binary(B) -> [B]; + L when is_list(L) -> L + end, + random:seed(now()), + URI = lists:nth(random:uniform(length(URIs)), URIs), {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), XNameBin = bget(exchange, U, C, name(X)), #upstream{params = Params, -- cgit v1.2.1 From 25dbc11929abc3f8463fa205ae2ba5118dda27b6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 4 Feb 2013 17:11:39 +0000 Subject: Rather fiddly refactoring to split up #upstream into #upstream{} and #upstream_params{} - the latter being instantiated by the link process, and representing a choice having been made amongst the URIs in the #upstream{}. So we now should fail over randomly to a new upstream from the set. --- .../src/rabbit_federation_upstream.erl | 30 ++++++++++++---------- 1 file changed, 17 insertions(+), 13 deletions(-) (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl') diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index 7eb5f39f09..c9950c37f2 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -19,7 +19,8 @@ -include("rabbit_federation.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). --export([set_for/1, for/1, for/2, to_table/1, to_string/1]). +-export([set_for/1, for/1, for/2, params_to_table/1, params_to_string/1, + to_params/2]). %% For testing -export([from_set/2]). @@ -42,17 +43,25 @@ for(X, UpstreamName) -> {error, not_found} -> [] end. -to_table(#upstream{original_uri = URI, - params = Params, - exchange = X}) -> +params_to_table(#upstream_params{uri = URI, + params = Params, + exchange = X}) -> {table, [{<<"uri">>, longstr, URI}, {<<"virtual_host">>, longstr, vhost(Params)}, {<<"exchange">>, longstr, name(X)}]}. -to_string(#upstream{original_uri = URI, - exchange = #exchange{name = XName}}) -> +params_to_string(#upstream_params{uri = URI, + exchange = #exchange{name = XName}}) -> print("~s on ~s", [rabbit_misc:rs(XName), URI]). +to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) -> + random:seed(now()), + URI = lists:nth(random:uniform(length(URIs)), URIs), + {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), + #upstream_params{params = Params, + uri = URI, + exchange = with_name(XNameBin, vhost(Params), X)}. + print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). from_set(SetName, X, UpstName) -> @@ -89,13 +98,8 @@ from_props_connection(U, Name, C, X) -> B when is_binary(B) -> [B]; L when is_list(L) -> L end, - random:seed(now()), - URI = lists:nth(random:uniform(length(URIs)), URIs), - {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), - XNameBin = bget(exchange, U, C, name(X)), - #upstream{params = Params, - original_uri = URI, - exchange = with_name(XNameBin, vhost(Params), X), + #upstream{uris = URIs, + exchange_name = bget(exchange, U, C, name(X)), prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH), reconnect_delay = bget('reconnect-delay', U, C, 1), max_hops = bget('max-hops', U, C, 1), -- cgit v1.2.1