summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-04 15:46:43 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-04 15:46:43 +0100
commit844ff645b680ea365b7dd6e491f055b8a27645de (patch)
tree2da578775757d6fe8d899ff9d46acb64e99b7038 /deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
parent68a0b5f5bad49c6106cd289e6bae2335b29b1a3d (diff)
downloadrabbitmq-server-git-844ff645b680ea365b7dd6e491f055b8a27645de.tar.gz
Allow specifying a different queue name for the upstream, remove use of a fake exchange, start to write tests, remember to sepcify drain.
Diffstat (limited to 'deps/rabbitmq_federation/src/rabbit_federation_upstream.erl')
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream.erl35
1 files changed, 20 insertions, 15 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
index c3bf9e05ee..4dd3a63655 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
@@ -25,7 +25,7 @@
-export([from_set/2, remove_credentials/1]).
-import(rabbit_misc, [pget/2, pget/3]).
--import(rabbit_federation_util, [name/1, vhost/1]).
+-import(rabbit_federation_util, [name/1, vhost/1, r/1]).
%%----------------------------------------------------------------------------
@@ -43,16 +43,17 @@ for(XorQ, UpstreamName) ->
{error, not_found} -> []
end.
-params_to_table(#upstream_params{uri = URI,
- params = Params,
- exchange = X}) ->
+params_to_table(#upstream_params{uri = URI,
+ params = Params,
+ x_or_q = XorQ}) ->
{table, [{<<"uri">>, longstr, remove_credentials(URI)},
{<<"virtual_host">>, longstr, vhost(Params)},
- {<<"exchange">>, longstr, name(X)}]}.
+ %% TODO derp
+ {<<"exchange">>, longstr, name(XorQ)}]}.
-params_to_string(#upstream_params{uri = URI,
- exchange = #exchange{name = XName}}) ->
- print("~s on ~s", [rabbit_misc:rs(XName), remove_credentials(URI)]).
+params_to_string(#upstream_params{uri = URI,
+ x_or_q = XorQ}) ->
+ print("~s on ~s", [rabbit_misc:rs(r(XorQ)), remove_credentials(URI)]).
remove_credentials(URI) ->
Props = uri_parser:parse(binary_to_list(URI),
@@ -68,13 +69,13 @@ remove_credentials(URI) ->
"~s://~s~s~s", [pget(scheme, Props), PGet(host, Props),
PortPart, PGet(path, Props)])).
-to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) ->
+to_params(Upstream = #upstream{uris = URIs}, XorQ) ->
random:seed(now()),
URI = lists:nth(random:uniform(length(URIs)), URIs),
- {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)),
- #upstream_params{params = Params,
- uri = URI,
- exchange = with_name(XNameBin, vhost(Params), X)}.
+ {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)),
+ #upstream_params{params = Params,
+ uri = URI,
+ x_or_q = with_name(Upstream, vhost(Params), XorQ)}.
print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
@@ -115,6 +116,7 @@ from_props_connection(U, Name, C, XorQ) ->
end,
#upstream{uris = URIs,
exchange_name = bget(exchange, U, C, name(XorQ)),
+ queue_name = bget(queue, U, C, name(XorQ)),
prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH),
reconnect_delay = bget('reconnect-delay', U, C, 1),
max_hops = bget('max-hops', U, C, 1),
@@ -140,5 +142,8 @@ bget(K0, L1, L2, D) ->
a2b(A) -> list_to_binary(atom_to_list(A)).
-with_name(XNameBin, VHostBin, X) ->
- X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}.
+with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) ->
+ X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)};
+
+with_name(#upstream{queue_name = QNameBin}, VHostBin, Q = #amqqueue{}) ->
+ Q#amqqueue{name = rabbit_misc:r(VHostBin, queue, QNameBin)}.