From 844ff645b680ea365b7dd6e491f055b8a27645de Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 4 Jun 2013 15:46:43 +0100 Subject: Allow specifying a different queue name for the upstream, remove use of a fake exchange, start to write tests, remember to sepcify drain. --- .../src/rabbit_federation_upstream.erl | 35 ++++++++++++---------- 1 file changed, 20 insertions(+), 15 deletions(-) (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl') diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index c3bf9e05ee..4dd3a63655 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -25,7 +25,7 @@ -export([from_set/2, remove_credentials/1]). -import(rabbit_misc, [pget/2, pget/3]). --import(rabbit_federation_util, [name/1, vhost/1]). +-import(rabbit_federation_util, [name/1, vhost/1, r/1]). %%---------------------------------------------------------------------------- @@ -43,16 +43,17 @@ for(XorQ, UpstreamName) -> {error, not_found} -> [] end. -params_to_table(#upstream_params{uri = URI, - params = Params, - exchange = X}) -> +params_to_table(#upstream_params{uri = URI, + params = Params, + x_or_q = XorQ}) -> {table, [{<<"uri">>, longstr, remove_credentials(URI)}, {<<"virtual_host">>, longstr, vhost(Params)}, - {<<"exchange">>, longstr, name(X)}]}. + %% TODO derp + {<<"exchange">>, longstr, name(XorQ)}]}. -params_to_string(#upstream_params{uri = URI, - exchange = #exchange{name = XName}}) -> - print("~s on ~s", [rabbit_misc:rs(XName), remove_credentials(URI)]). +params_to_string(#upstream_params{uri = URI, + x_or_q = XorQ}) -> + print("~s on ~s", [rabbit_misc:rs(r(XorQ)), remove_credentials(URI)]). remove_credentials(URI) -> Props = uri_parser:parse(binary_to_list(URI), @@ -68,13 +69,13 @@ remove_credentials(URI) -> "~s://~s~s~s", [pget(scheme, Props), PGet(host, Props), PortPart, PGet(path, Props)])). -to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) -> +to_params(Upstream = #upstream{uris = URIs}, XorQ) -> random:seed(now()), URI = lists:nth(random:uniform(length(URIs)), URIs), - {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), - #upstream_params{params = Params, - uri = URI, - exchange = with_name(XNameBin, vhost(Params), X)}. + {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)), + #upstream_params{params = Params, + uri = URI, + x_or_q = with_name(Upstream, vhost(Params), XorQ)}. print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). @@ -115,6 +116,7 @@ from_props_connection(U, Name, C, XorQ) -> end, #upstream{uris = URIs, exchange_name = bget(exchange, U, C, name(XorQ)), + queue_name = bget(queue, U, C, name(XorQ)), prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH), reconnect_delay = bget('reconnect-delay', U, C, 1), max_hops = bget('max-hops', U, C, 1), @@ -140,5 +142,8 @@ bget(K0, L1, L2, D) -> a2b(A) -> list_to_binary(atom_to_list(A)). -with_name(XNameBin, VHostBin, X) -> - X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}. +with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) -> + X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}; + +with_name(#upstream{queue_name = QNameBin}, VHostBin, Q = #amqqueue{}) -> + Q#amqqueue{name = rabbit_misc:r(VHostBin, queue, QNameBin)}. -- cgit v1.2.1