summaryrefslogtreecommitdiff
path: root/deps
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-06-04 15:46:43 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-06-04 15:46:43 +0100
commit844ff645b680ea365b7dd6e491f055b8a27645de (patch)
tree2da578775757d6fe8d899ff9d46acb64e99b7038 /deps
parent68a0b5f5bad49c6106cd289e6bae2335b29b1a3d (diff)
downloadrabbitmq-server-git-844ff645b680ea365b7dd6e491f055b8a27645de.tar.gz
Allow specifying a different queue name for the upstream, remove use of a fake exchange, start to write tests, remember to sepcify drain.
Diffstat (limited to 'deps')
-rw-r--r--deps/rabbitmq_federation/etc/rabbit-test.sh2
-rw-r--r--deps/rabbitmq_federation/include/rabbit_federation.hrl3
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_link.erl10
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl5
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_parameters.erl1
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl25
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_upstream.erl35
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_util.erl7
-rw-r--r--deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl63
-rw-r--r--deps/rabbitmq_federation/test/src/rabbit_federation_test.erl44
-rw-r--r--deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl3
-rw-r--r--deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl65
12 files changed, 184 insertions, 79 deletions
diff --git a/deps/rabbitmq_federation/etc/rabbit-test.sh b/deps/rabbitmq_federation/etc/rabbit-test.sh
index e64fe99e9d..11d3c99357 100644
--- a/deps/rabbitmq_federation/etc/rabbit-test.sh
+++ b/deps/rabbitmq_federation/etc/rabbit-test.sh
@@ -6,7 +6,7 @@ $CTL set_parameter federation-upstream localhost '{"uri": "amqp://"}'
# We will test the guest:guest gets stripped out in user_id_test
$CTL set_parameter federation-upstream local5673 '{"uri": "amqp://guest:guest@localhost:5673"}'
-$CTL set_parameter federation-upstream-set upstream '[{"upstream": "localhost", "exchange": "upstream"}]'
+$CTL set_parameter federation-upstream-set upstream '[{"upstream": "localhost", "exchange": "upstream", "queue": "upstream"}]'
$CTL set_parameter federation-upstream-set localhost '[{"upstream": "localhost"}]'
$CTL set_parameter federation-upstream-set upstream12 '[{"upstream": "localhost", "exchange": "upstream"},
{"upstream": "localhost", "exchange": "upstream2"}]'
diff --git a/deps/rabbitmq_federation/include/rabbit_federation.hrl b/deps/rabbitmq_federation/include/rabbit_federation.hrl
index 86815a006d..a90bb062c9 100644
--- a/deps/rabbitmq_federation/include/rabbit_federation.hrl
+++ b/deps/rabbitmq_federation/include/rabbit_federation.hrl
@@ -16,6 +16,7 @@
-record(upstream, {uris,
exchange_name,
+ queue_name,
prefetch_count,
max_hops,
reconnect_delay,
@@ -26,7 +27,7 @@
ha_policy,
name}).
--record(upstream_params, {uri, params, exchange}).
+-record(upstream_params, {uri, params, x_or_q}).
-define(ROUTING_HEADER, <<"x-received-from">>).
-define(BINDING_HEADER, <<"x-bound-from">>).
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_link.erl
index 83cabac753..0685359026 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_link.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_link.erl
@@ -358,7 +358,7 @@ binding_op(UpdateFun, Cmd, B = #binding{args = Args},
bind_cmd(Type, #binding{key = Key, args = Args},
State = #state{internal_exchange = IntXNameBin,
upstream_params = UpstreamParams}) ->
- #upstream_params{exchange = X} = UpstreamParams,
+ #upstream_params{x_or_q = X} = UpstreamParams,
case update_binding(Args, State) of
ignore -> ignore;
NewArgs -> bind_cmd0(Type, name(X), IntXNameBin, Key, NewArgs)
@@ -564,8 +564,8 @@ consume_from_upstream_queue(
expires = Expiry,
message_ttl = TTL,
ha_policy = HA} = Upstream,
- #upstream_params{exchange = X,
- params = Params} = UParams,
+ #upstream_params{x_or_q = X,
+ params = Params} = UParams,
Q = upstream_queue_name(name(X), vhost(Params), DownXName),
Args = [Arg || {_K, _T, V} = Arg <- [{<<"x-expires">>, long, Expiry},
{<<"x-message-ttl">>, long, TTL},
@@ -588,7 +588,7 @@ ensure_upstream_bindings(State = #state{upstream = Upstream,
channel = Ch,
downstream_exchange = DownXName,
queue = Q}, Bindings) ->
- #upstream_params{exchange = X, params = Params} = UParams,
+ #upstream_params{x_or_q = X, params = Params} = UParams,
OldSuffix = rabbit_federation_db:get_active_suffix(
DownXName, Upstream, <<"A">>),
Suffix = case OldSuffix of
@@ -611,7 +611,7 @@ ensure_upstream_bindings(State = #state{upstream = Upstream,
ensure_upstream_exchange(#state{upstream_params = UParams,
connection = Conn,
channel = Ch}) ->
- #upstream_params{exchange = X} = UParams,
+ #upstream_params{x_or_q = X} = UParams,
#exchange{type = Type,
durable = Durable,
auto_delete = AutoDelete,
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
index 24501c07d1..72690f698c 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
@@ -104,10 +104,7 @@ spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
[rabbit_federation_link]};
spec(Upstream = #upstream{reconnect_delay = Delay}, Q = #amqqueue{}) ->
- %% TODO don't depend on an exchange here!
- FakeX = rabbit_exchange:lookup_or_die(
- rabbit_misc:r(<<"/">>, exchange, <<"">>)),
- Params = rabbit_federation_upstream:to_params(Upstream, FakeX),
+ Params = rabbit_federation_upstream:to_params(Upstream, Q),
{Upstream, {rabbit_federation_queue_link, start_link, [{Params, Q}]},
{permanent, Delay}, ?MAX_WAIT, worker,
[rabbit_federation_queue_link]}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
index 43ed020b54..edc79bd7f0 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl
@@ -90,6 +90,7 @@ adjust(Thing) ->
shared_validation() ->
[{<<"exchange">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"queue">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"reconnect-delay">>,fun rabbit_parameter_validation:number/2, optional},
{<<"max-hops">>, fun rabbit_parameter_validation:number/2, optional},
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
index eec59fb9ed..c18ed1b2c2 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl
@@ -26,6 +26,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-import(rabbit_federation_util, [name/1]).
+
-record(not_started, {queue, run, params}).
-record(state, {queue, run, conn, ch, dconn, dch, credit, ctag}).
@@ -65,7 +67,7 @@ federation_up() ->
%%----------------------------------------------------------------------------
-init({#upstream_params{params = Params}, Queue = #amqqueue{name = QName}}) ->
+init({Params, Queue = #amqqueue{name = QName}}) ->
join(rabbit_federation_queues),
join({rabbit_federation_queue, QName}),
gen_server2:cast(self(), maybe_go),
@@ -90,7 +92,8 @@ handle_cast(go, State) ->
handle_cast(run, State = #state{ch = Ch, run = false, ctag = CTag}) ->
amqp_channel:cast(Ch, #'basic.credit'{consumer_tag = CTag,
- credit = ?CREDIT}),
+ credit = ?CREDIT,
+ drain = false}),
{noreply, State#state{run = true, credit = ?CREDIT}};
handle_cast(run, State = #not_started{}) ->
@@ -109,7 +112,8 @@ handle_cast(stop, State = #not_started{}) ->
handle_cast(stop, State = #state{ch = Ch, ctag = CTag}) ->
amqp_channel:cast(Ch, #'basic.credit'{consumer_tag = CTag,
- credit = 0}),
+ credit = 0,
+ drain = false}),
{noreply, State#state{run = false, credit = 0}};
handle_cast(basic_get, State = #not_started{}) ->
@@ -119,7 +123,8 @@ handle_cast(basic_get, State = #state{ch = Ch, credit = Credit, ctag = CTag}) ->
Credit1 = Credit + 1,
amqp_channel:cast(
Ch, #'basic.credit'{consumer_tag = CTag,
- credit = Credit1}),
+ credit = Credit1,
+ drain = false}),
{noreply, State#state{credit = Credit1}};
handle_cast(Msg, State) ->
@@ -148,7 +153,8 @@ handle_info({#'basic.deliver'{}, Msg},
More = ?CREDIT - ?CREDIT_MORE_AT,
amqp_channel:cast(
Ch, #'basic.credit'{consumer_tag = CTag,
- credit = More}),
+ credit = More,
+ drain = false}),
I - 1 + More;
I ->
I - 1
@@ -167,9 +173,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-go(#not_started{run = Run,
- params = Params,
- queue = Queue = #amqqueue{name = QName}}) ->
+go(#not_started{run = Run,
+ params = #upstream_params{x_or_q = UQueue,
+ params = Params},
+ queue = Queue}) ->
Credit = case Run of
true -> ?CREDIT;
false -> 0
@@ -181,7 +188,7 @@ go(#not_started{run = Run,
{ok, DCh} = amqp_connection:open_channel(DConn),
#'basic.consume_ok'{consumer_tag = CTag} =
amqp_channel:call(Ch, #'basic.consume'{
- queue = QName#resource.name,
+ queue = name(UQueue),
no_ack = true,
arguments = [{<<"x-purpose">>, longstr, <<"federation">>},
{<<"x-credit">>, table,
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
index c3bf9e05ee..4dd3a63655 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl
@@ -25,7 +25,7 @@
-export([from_set/2, remove_credentials/1]).
-import(rabbit_misc, [pget/2, pget/3]).
--import(rabbit_federation_util, [name/1, vhost/1]).
+-import(rabbit_federation_util, [name/1, vhost/1, r/1]).
%%----------------------------------------------------------------------------
@@ -43,16 +43,17 @@ for(XorQ, UpstreamName) ->
{error, not_found} -> []
end.
-params_to_table(#upstream_params{uri = URI,
- params = Params,
- exchange = X}) ->
+params_to_table(#upstream_params{uri = URI,
+ params = Params,
+ x_or_q = XorQ}) ->
{table, [{<<"uri">>, longstr, remove_credentials(URI)},
{<<"virtual_host">>, longstr, vhost(Params)},
- {<<"exchange">>, longstr, name(X)}]}.
+ %% TODO derp
+ {<<"exchange">>, longstr, name(XorQ)}]}.
-params_to_string(#upstream_params{uri = URI,
- exchange = #exchange{name = XName}}) ->
- print("~s on ~s", [rabbit_misc:rs(XName), remove_credentials(URI)]).
+params_to_string(#upstream_params{uri = URI,
+ x_or_q = XorQ}) ->
+ print("~s on ~s", [rabbit_misc:rs(r(XorQ)), remove_credentials(URI)]).
remove_credentials(URI) ->
Props = uri_parser:parse(binary_to_list(URI),
@@ -68,13 +69,13 @@ remove_credentials(URI) ->
"~s://~s~s~s", [pget(scheme, Props), PGet(host, Props),
PortPart, PGet(path, Props)])).
-to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) ->
+to_params(Upstream = #upstream{uris = URIs}, XorQ) ->
random:seed(now()),
URI = lists:nth(random:uniform(length(URIs)), URIs),
- {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)),
- #upstream_params{params = Params,
- uri = URI,
- exchange = with_name(XNameBin, vhost(Params), X)}.
+ {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)),
+ #upstream_params{params = Params,
+ uri = URI,
+ x_or_q = with_name(Upstream, vhost(Params), XorQ)}.
print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)).
@@ -115,6 +116,7 @@ from_props_connection(U, Name, C, XorQ) ->
end,
#upstream{uris = URIs,
exchange_name = bget(exchange, U, C, name(XorQ)),
+ queue_name = bget(queue, U, C, name(XorQ)),
prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH),
reconnect_delay = bget('reconnect-delay', U, C, 1),
max_hops = bget('max-hops', U, C, 1),
@@ -140,5 +142,8 @@ bget(K0, L1, L2, D) ->
a2b(A) -> list_to_binary(atom_to_list(A)).
-with_name(XNameBin, VHostBin, X) ->
- X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}.
+with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) ->
+ X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)};
+
+with_name(#upstream{queue_name = QNameBin}, VHostBin, Q = #amqqueue{}) ->
+ Q#amqqueue{name = rabbit_misc:r(VHostBin, queue, QNameBin)}.
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_util.erl
index 1ee2792bb8..1f818acc14 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_util.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_util.erl
@@ -21,7 +21,7 @@
-include("rabbit_federation.hrl").
-export([local_params/2, local_nodename/1, should_forward/2, find_upstreams/2]).
--export([validate_arg/3, fail/2, name/1, vhost/1]).
+-export([validate_arg/3, fail/2, name/1, vhost/1, r/1]).
-import(rabbit_misc, [pget_or_die/2, pget/3]).
@@ -77,10 +77,13 @@ fail(Fmt, Args) -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args).
name( #resource{name = XName}) -> XName;
name(#exchange{name = #resource{name = XName}}) -> XName;
-name(#amqqueue{name = #resource{name = XName}}) -> XName.
+name(#amqqueue{name = #resource{name = QName}}) -> QName.
vhost( #resource{virtual_host = VHost}) -> VHost;
vhost(#exchange{name = #resource{virtual_host = VHost}}) -> VHost;
vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost;
vhost( #amqp_params_direct{virtual_host = VHost}) -> VHost;
vhost(#amqp_params_network{virtual_host = VHost}) -> VHost.
+
+r(#exchange{name = XName}) -> XName;
+r(#amqqueue{name = QName}) -> QName.
diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl
new file mode 100644
index 0000000000..42732a4aa1
--- /dev/null
+++ b/deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl
@@ -0,0 +1,63 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ Federation.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_federation_queue_test).
+
+-include("rabbit_federation.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-import(rabbit_misc, [pget/2]).
+-import(rabbit_federation_util, [name/1]).
+
+-import(rabbit_federation_test_util, [publish_expect/5]).
+
+-define(UPSTREAM_DOWNSTREAM, [q(<<"upstream">>),
+ q(<<"fed.downstream">>)]).
+
+simple_test() ->
+ with_ch(
+ fun (Ch) ->
+ publish_expect(Ch, <<>>, <<"upstream">>, <<"fed.downstream">>,
+ <<"HELLO">>)
+ end, [q(<<"upstream">>),
+ q(<<"fed.downstream">>)]).
+
+%%----------------------------------------------------------------------------
+
+with_ch(Fun, Qs) ->
+ {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ declare_all(Ch, Qs),
+ %%assert_status(Qs),
+ Fun(Ch),
+ delete_all(Ch, Qs),
+ amqp_connection:close(Conn),
+ ok.
+
+declare_all(Ch, Qs) -> [declare_queue(Ch, Q) || Q <- Qs].
+delete_all(Ch, Qs) ->
+ [delete_queue(Ch, Q) || #'queue.declare'{queue = Q} <- Qs].
+
+declare_queue(Ch, Q) ->
+ amqp_channel:call(Ch, Q).
+
+delete_queue(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
+q(Name) ->
+ #'queue.declare'{queue = Name,
+ durable = true}.
diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl
index b3d88755d2..02a8890cc1 100644
--- a/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl
+++ b/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl
@@ -23,6 +23,9 @@
-import(rabbit_misc, [pget/2]).
-import(rabbit_federation_util, [name/1]).
+-import(rabbit_federation_test_util,
+ [publish_expect/5, publish/4, expect/3, expect_empty/2]).
+
-define(UPSTREAM_DOWNSTREAM, [x(<<"upstream">>),
x(<<"fed.downstream">>)]).
@@ -612,47 +615,6 @@ delete_exchange(Ch, X) ->
delete_queue(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
-publish(Ch, X, Key, Payload) when is_binary(Payload) ->
- publish(Ch, X, Key, #amqp_msg{payload = Payload});
-
-publish(Ch, X, Key, Msg = #amqp_msg{}) ->
- %% The trouble is that we transmit bindings upstream asynchronously...
- timer:sleep(5000),
- amqp_channel:call(Ch, #'basic.publish'{exchange = X,
- routing_key = Key}, Msg).
-
-
-expect(Ch, Q, Fun) when is_function(Fun) ->
- amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
- no_ack = true}, self()),
- receive
- #'basic.consume_ok'{consumer_tag = CTag} -> ok
- end,
- Fun(),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag});
-
-expect(Ch, Q, Payloads) ->
- expect(Ch, Q, fun() -> expect(Payloads) end).
-
-expect([]) ->
- ok;
-expect(Payloads) ->
- receive
- {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
- case lists:member(Payload, Payloads) of
- true -> expect(Payloads -- [Payload]);
- false -> throw({expected, Payloads, actual, Payload})
- end
- end.
-
-publish_expect(Ch, X, Key, Q, Payload) ->
- publish(Ch, X, Key, Payload),
- expect(Ch, Q, [Payload]).
-
-expect_empty(Ch, Q) ->
- ?assertMatch(#'basic.get_empty'{},
- amqp_channel:call(Ch, #'basic.get'{ queue = Q })).
-
assert_bindings(Nodename, X, BindingsExp) ->
Bindings0 = rpc:call(n(Nodename), rabbit_binding, list_for_source, [r(X)]),
BindingsAct = [Key || #binding{key = Key} <- Bindings0],
diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl
index 47a3248c89..0bc0682940 100644
--- a/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl
+++ b/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl
@@ -20,7 +20,8 @@
all_tests() ->
ok = eunit:test(tests(rabbit_federation_unit_test, 60), [verbose]),
- ok = eunit:test(tests(rabbit_federation_test, 60), [verbose]).
+ ok = eunit:test(tests(rabbit_federation_test, 60), [verbose]),
+ ok = eunit:test(tests(rabbit_federation_queue_test, 60), [verbose]).
tests(Module, Timeout) ->
{foreach, fun() -> ok end,
diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl
new file mode 100644
index 0000000000..2db4e305b5
--- /dev/null
+++ b/deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl
@@ -0,0 +1,65 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ Federation.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_federation_test_util).
+
+-include("rabbit_federation.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+publish(Ch, X, Key, Payload) when is_binary(Payload) ->
+ publish(Ch, X, Key, #amqp_msg{payload = Payload});
+
+publish(Ch, X, Key, Msg = #amqp_msg{}) ->
+ %% The trouble is that we transmit bindings upstream asynchronously...
+ timer:sleep(5000),
+ amqp_channel:call(Ch, #'basic.publish'{exchange = X,
+ routing_key = Key}, Msg).
+
+
+expect(Ch, Q, Fun) when is_function(Fun) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
+ no_ack = true}, self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = CTag} -> ok
+ end,
+ Fun(),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag});
+
+expect(Ch, Q, Payloads) ->
+ expect(Ch, Q, fun() -> expect(Payloads) end).
+
+expect([]) ->
+ ok;
+expect(Payloads) ->
+ receive
+ {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
+ case lists:member(Payload, Payloads) of
+ true -> expect(Payloads -- [Payload]);
+ false -> throw({expected, Payloads, actual, Payload})
+ end
+ end.
+
+publish_expect(Ch, X, Key, Q, Payload) ->
+ publish(Ch, X, Key, Payload),
+ expect(Ch, Q, [Payload]).
+
+expect_empty(Ch, Q) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{ queue = Q })).
+