diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-06-04 15:46:43 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-06-04 15:46:43 +0100 |
commit | 844ff645b680ea365b7dd6e491f055b8a27645de (patch) | |
tree | 2da578775757d6fe8d899ff9d46acb64e99b7038 | |
parent | 68a0b5f5bad49c6106cd289e6bae2335b29b1a3d (diff) | |
download | rabbitmq-server-git-844ff645b680ea365b7dd6e491f055b8a27645de.tar.gz |
Allow specifying a different queue name for the upstream, remove use of a fake exchange, start to write tests, remember to sepcify drain.
12 files changed, 184 insertions, 79 deletions
diff --git a/deps/rabbitmq_federation/etc/rabbit-test.sh b/deps/rabbitmq_federation/etc/rabbit-test.sh index e64fe99e9d..11d3c99357 100644 --- a/deps/rabbitmq_federation/etc/rabbit-test.sh +++ b/deps/rabbitmq_federation/etc/rabbit-test.sh @@ -6,7 +6,7 @@ $CTL set_parameter federation-upstream localhost '{"uri": "amqp://"}' # We will test the guest:guest gets stripped out in user_id_test $CTL set_parameter federation-upstream local5673 '{"uri": "amqp://guest:guest@localhost:5673"}' -$CTL set_parameter federation-upstream-set upstream '[{"upstream": "localhost", "exchange": "upstream"}]' +$CTL set_parameter federation-upstream-set upstream '[{"upstream": "localhost", "exchange": "upstream", "queue": "upstream"}]' $CTL set_parameter federation-upstream-set localhost '[{"upstream": "localhost"}]' $CTL set_parameter federation-upstream-set upstream12 '[{"upstream": "localhost", "exchange": "upstream"}, {"upstream": "localhost", "exchange": "upstream2"}]' diff --git a/deps/rabbitmq_federation/include/rabbit_federation.hrl b/deps/rabbitmq_federation/include/rabbit_federation.hrl index 86815a006d..a90bb062c9 100644 --- a/deps/rabbitmq_federation/include/rabbit_federation.hrl +++ b/deps/rabbitmq_federation/include/rabbit_federation.hrl @@ -16,6 +16,7 @@ -record(upstream, {uris, exchange_name, + queue_name, prefetch_count, max_hops, reconnect_delay, @@ -26,7 +27,7 @@ ha_policy, name}). --record(upstream_params, {uri, params, exchange}). +-record(upstream_params, {uri, params, x_or_q}). -define(ROUTING_HEADER, <<"x-received-from">>). -define(BINDING_HEADER, <<"x-bound-from">>). diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_link.erl index 83cabac753..0685359026 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link.erl @@ -358,7 +358,7 @@ binding_op(UpdateFun, Cmd, B = #binding{args = Args}, bind_cmd(Type, #binding{key = Key, args = Args}, State = #state{internal_exchange = IntXNameBin, upstream_params = UpstreamParams}) -> - #upstream_params{exchange = X} = UpstreamParams, + #upstream_params{x_or_q = X} = UpstreamParams, case update_binding(Args, State) of ignore -> ignore; NewArgs -> bind_cmd0(Type, name(X), IntXNameBin, Key, NewArgs) @@ -564,8 +564,8 @@ consume_from_upstream_queue( expires = Expiry, message_ttl = TTL, ha_policy = HA} = Upstream, - #upstream_params{exchange = X, - params = Params} = UParams, + #upstream_params{x_or_q = X, + params = Params} = UParams, Q = upstream_queue_name(name(X), vhost(Params), DownXName), Args = [Arg || {_K, _T, V} = Arg <- [{<<"x-expires">>, long, Expiry}, {<<"x-message-ttl">>, long, TTL}, @@ -588,7 +588,7 @@ ensure_upstream_bindings(State = #state{upstream = Upstream, channel = Ch, downstream_exchange = DownXName, queue = Q}, Bindings) -> - #upstream_params{exchange = X, params = Params} = UParams, + #upstream_params{x_or_q = X, params = Params} = UParams, OldSuffix = rabbit_federation_db:get_active_suffix( DownXName, Upstream, <<"A">>), Suffix = case OldSuffix of @@ -611,7 +611,7 @@ ensure_upstream_bindings(State = #state{upstream = Upstream, ensure_upstream_exchange(#state{upstream_params = UParams, connection = Conn, channel = Ch}) -> - #upstream_params{exchange = X} = UParams, + #upstream_params{x_or_q = X} = UParams, #exchange{type = Type, durable = Durable, auto_delete = AutoDelete, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl index 24501c07d1..72690f698c 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl @@ -104,10 +104,7 @@ spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> [rabbit_federation_link]}; spec(Upstream = #upstream{reconnect_delay = Delay}, Q = #amqqueue{}) -> - %% TODO don't depend on an exchange here! - FakeX = rabbit_exchange:lookup_or_die( - rabbit_misc:r(<<"/">>, exchange, <<"">>)), - Params = rabbit_federation_upstream:to_params(Upstream, FakeX), + Params = rabbit_federation_upstream:to_params(Upstream, Q), {Upstream, {rabbit_federation_queue_link, start_link, [{Params, Q}]}, {permanent, Delay}, ?MAX_WAIT, worker, [rabbit_federation_queue_link]}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl index 43ed020b54..edc79bd7f0 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_parameters.erl @@ -90,6 +90,7 @@ adjust(Thing) -> shared_validation() -> [{<<"exchange">>, fun rabbit_parameter_validation:binary/2, optional}, + {<<"queue">>, fun rabbit_parameter_validation:binary/2, optional}, {<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional}, {<<"reconnect-delay">>,fun rabbit_parameter_validation:number/2, optional}, {<<"max-hops">>, fun rabbit_parameter_validation:number/2, optional}, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl index eec59fb9ed..c18ed1b2c2 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_queue_link.erl @@ -26,6 +26,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-import(rabbit_federation_util, [name/1]). + -record(not_started, {queue, run, params}). -record(state, {queue, run, conn, ch, dconn, dch, credit, ctag}). @@ -65,7 +67,7 @@ federation_up() -> %%---------------------------------------------------------------------------- -init({#upstream_params{params = Params}, Queue = #amqqueue{name = QName}}) -> +init({Params, Queue = #amqqueue{name = QName}}) -> join(rabbit_federation_queues), join({rabbit_federation_queue, QName}), gen_server2:cast(self(), maybe_go), @@ -90,7 +92,8 @@ handle_cast(go, State) -> handle_cast(run, State = #state{ch = Ch, run = false, ctag = CTag}) -> amqp_channel:cast(Ch, #'basic.credit'{consumer_tag = CTag, - credit = ?CREDIT}), + credit = ?CREDIT, + drain = false}), {noreply, State#state{run = true, credit = ?CREDIT}}; handle_cast(run, State = #not_started{}) -> @@ -109,7 +112,8 @@ handle_cast(stop, State = #not_started{}) -> handle_cast(stop, State = #state{ch = Ch, ctag = CTag}) -> amqp_channel:cast(Ch, #'basic.credit'{consumer_tag = CTag, - credit = 0}), + credit = 0, + drain = false}), {noreply, State#state{run = false, credit = 0}}; handle_cast(basic_get, State = #not_started{}) -> @@ -119,7 +123,8 @@ handle_cast(basic_get, State = #state{ch = Ch, credit = Credit, ctag = CTag}) -> Credit1 = Credit + 1, amqp_channel:cast( Ch, #'basic.credit'{consumer_tag = CTag, - credit = Credit1}), + credit = Credit1, + drain = false}), {noreply, State#state{credit = Credit1}}; handle_cast(Msg, State) -> @@ -148,7 +153,8 @@ handle_info({#'basic.deliver'{}, Msg}, More = ?CREDIT - ?CREDIT_MORE_AT, amqp_channel:cast( Ch, #'basic.credit'{consumer_tag = CTag, - credit = More}), + credit = More, + drain = false}), I - 1 + More; I -> I - 1 @@ -167,9 +173,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -go(#not_started{run = Run, - params = Params, - queue = Queue = #amqqueue{name = QName}}) -> +go(#not_started{run = Run, + params = #upstream_params{x_or_q = UQueue, + params = Params}, + queue = Queue}) -> Credit = case Run of true -> ?CREDIT; false -> 0 @@ -181,7 +188,7 @@ go(#not_started{run = Run, {ok, DCh} = amqp_connection:open_channel(DConn), #'basic.consume_ok'{consumer_tag = CTag} = amqp_channel:call(Ch, #'basic.consume'{ - queue = QName#resource.name, + queue = name(UQueue), no_ack = true, arguments = [{<<"x-purpose">>, longstr, <<"federation">>}, {<<"x-credit">>, table, diff --git a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl index c3bf9e05ee..4dd3a63655 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_upstream.erl @@ -25,7 +25,7 @@ -export([from_set/2, remove_credentials/1]). -import(rabbit_misc, [pget/2, pget/3]). --import(rabbit_federation_util, [name/1, vhost/1]). +-import(rabbit_federation_util, [name/1, vhost/1, r/1]). %%---------------------------------------------------------------------------- @@ -43,16 +43,17 @@ for(XorQ, UpstreamName) -> {error, not_found} -> [] end. -params_to_table(#upstream_params{uri = URI, - params = Params, - exchange = X}) -> +params_to_table(#upstream_params{uri = URI, + params = Params, + x_or_q = XorQ}) -> {table, [{<<"uri">>, longstr, remove_credentials(URI)}, {<<"virtual_host">>, longstr, vhost(Params)}, - {<<"exchange">>, longstr, name(X)}]}. + %% TODO derp + {<<"exchange">>, longstr, name(XorQ)}]}. -params_to_string(#upstream_params{uri = URI, - exchange = #exchange{name = XName}}) -> - print("~s on ~s", [rabbit_misc:rs(XName), remove_credentials(URI)]). +params_to_string(#upstream_params{uri = URI, + x_or_q = XorQ}) -> + print("~s on ~s", [rabbit_misc:rs(r(XorQ)), remove_credentials(URI)]). remove_credentials(URI) -> Props = uri_parser:parse(binary_to_list(URI), @@ -68,13 +69,13 @@ remove_credentials(URI) -> "~s://~s~s~s", [pget(scheme, Props), PGet(host, Props), PortPart, PGet(path, Props)])). -to_params(#upstream{uris = URIs, exchange_name = XNameBin}, X) -> +to_params(Upstream = #upstream{uris = URIs}, XorQ) -> random:seed(now()), URI = lists:nth(random:uniform(length(URIs)), URIs), - {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(X)), - #upstream_params{params = Params, - uri = URI, - exchange = with_name(XNameBin, vhost(Params), X)}. + {ok, Params} = amqp_uri:parse(binary_to_list(URI), vhost(XorQ)), + #upstream_params{params = Params, + uri = URI, + x_or_q = with_name(Upstream, vhost(Params), XorQ)}. print(Fmt, Args) -> iolist_to_binary(io_lib:format(Fmt, Args)). @@ -115,6 +116,7 @@ from_props_connection(U, Name, C, XorQ) -> end, #upstream{uris = URIs, exchange_name = bget(exchange, U, C, name(XorQ)), + queue_name = bget(queue, U, C, name(XorQ)), prefetch_count = bget('prefetch-count', U, C, ?DEFAULT_PREFETCH), reconnect_delay = bget('reconnect-delay', U, C, 1), max_hops = bget('max-hops', U, C, 1), @@ -140,5 +142,8 @@ bget(K0, L1, L2, D) -> a2b(A) -> list_to_binary(atom_to_list(A)). -with_name(XNameBin, VHostBin, X) -> - X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}. +with_name(#upstream{exchange_name = XNameBin}, VHostBin, X = #exchange{}) -> + X#exchange{name = rabbit_misc:r(VHostBin, exchange, XNameBin)}; + +with_name(#upstream{queue_name = QNameBin}, VHostBin, Q = #amqqueue{}) -> + Q#amqqueue{name = rabbit_misc:r(VHostBin, queue, QNameBin)}. diff --git a/deps/rabbitmq_federation/src/rabbit_federation_util.erl b/deps/rabbitmq_federation/src/rabbit_federation_util.erl index 1ee2792bb8..1f818acc14 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_util.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_util.erl @@ -21,7 +21,7 @@ -include("rabbit_federation.hrl"). -export([local_params/2, local_nodename/1, should_forward/2, find_upstreams/2]). --export([validate_arg/3, fail/2, name/1, vhost/1]). +-export([validate_arg/3, fail/2, name/1, vhost/1, r/1]). -import(rabbit_misc, [pget_or_die/2, pget/3]). @@ -77,10 +77,13 @@ fail(Fmt, Args) -> rabbit_misc:protocol_error(precondition_failed, Fmt, Args). name( #resource{name = XName}) -> XName; name(#exchange{name = #resource{name = XName}}) -> XName; -name(#amqqueue{name = #resource{name = XName}}) -> XName. +name(#amqqueue{name = #resource{name = QName}}) -> QName. vhost( #resource{virtual_host = VHost}) -> VHost; vhost(#exchange{name = #resource{virtual_host = VHost}}) -> VHost; vhost(#amqqueue{name = #resource{virtual_host = VHost}}) -> VHost; vhost( #amqp_params_direct{virtual_host = VHost}) -> VHost; vhost(#amqp_params_network{virtual_host = VHost}) -> VHost. + +r(#exchange{name = XName}) -> XName; +r(#amqqueue{name = QName}) -> QName. diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl new file mode 100644 index 0000000000..42732a4aa1 --- /dev/null +++ b/deps/rabbitmq_federation/test/src/rabbit_federation_queue_test.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ Federation. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_federation_queue_test). + +-include("rabbit_federation.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-import(rabbit_misc, [pget/2]). +-import(rabbit_federation_util, [name/1]). + +-import(rabbit_federation_test_util, [publish_expect/5]). + +-define(UPSTREAM_DOWNSTREAM, [q(<<"upstream">>), + q(<<"fed.downstream">>)]). + +simple_test() -> + with_ch( + fun (Ch) -> + publish_expect(Ch, <<>>, <<"upstream">>, <<"fed.downstream">>, + <<"HELLO">>) + end, [q(<<"upstream">>), + q(<<"fed.downstream">>)]). + +%%---------------------------------------------------------------------------- + +with_ch(Fun, Qs) -> + {ok, Conn} = amqp_connection:start(#amqp_params_network{}), + {ok, Ch} = amqp_connection:open_channel(Conn), + declare_all(Ch, Qs), + %%assert_status(Qs), + Fun(Ch), + delete_all(Ch, Qs), + amqp_connection:close(Conn), + ok. + +declare_all(Ch, Qs) -> [declare_queue(Ch, Q) || Q <- Qs]. +delete_all(Ch, Qs) -> + [delete_queue(Ch, Q) || #'queue.declare'{queue = Q} <- Qs]. + +declare_queue(Ch, Q) -> + amqp_channel:call(Ch, Q). + +delete_queue(Ch, Q) -> + amqp_channel:call(Ch, #'queue.delete'{queue = Q}). + +q(Name) -> + #'queue.declare'{queue = Name, + durable = true}. diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl index b3d88755d2..02a8890cc1 100644 --- a/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl +++ b/deps/rabbitmq_federation/test/src/rabbit_federation_test.erl @@ -23,6 +23,9 @@ -import(rabbit_misc, [pget/2]). -import(rabbit_federation_util, [name/1]). +-import(rabbit_federation_test_util, + [publish_expect/5, publish/4, expect/3, expect_empty/2]). + -define(UPSTREAM_DOWNSTREAM, [x(<<"upstream">>), x(<<"fed.downstream">>)]). @@ -612,47 +615,6 @@ delete_exchange(Ch, X) -> delete_queue(Ch, Q) -> amqp_channel:call(Ch, #'queue.delete'{queue = Q}). -publish(Ch, X, Key, Payload) when is_binary(Payload) -> - publish(Ch, X, Key, #amqp_msg{payload = Payload}); - -publish(Ch, X, Key, Msg = #amqp_msg{}) -> - %% The trouble is that we transmit bindings upstream asynchronously... - timer:sleep(5000), - amqp_channel:call(Ch, #'basic.publish'{exchange = X, - routing_key = Key}, Msg). - - -expect(Ch, Q, Fun) when is_function(Fun) -> - amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, - no_ack = true}, self()), - receive - #'basic.consume_ok'{consumer_tag = CTag} -> ok - end, - Fun(), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}); - -expect(Ch, Q, Payloads) -> - expect(Ch, Q, fun() -> expect(Payloads) end). - -expect([]) -> - ok; -expect(Payloads) -> - receive - {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> - case lists:member(Payload, Payloads) of - true -> expect(Payloads -- [Payload]); - false -> throw({expected, Payloads, actual, Payload}) - end - end. - -publish_expect(Ch, X, Key, Q, Payload) -> - publish(Ch, X, Key, Payload), - expect(Ch, Q, [Payload]). - -expect_empty(Ch, Q) -> - ?assertMatch(#'basic.get_empty'{}, - amqp_channel:call(Ch, #'basic.get'{ queue = Q })). - assert_bindings(Nodename, X, BindingsExp) -> Bindings0 = rpc:call(n(Nodename), rabbit_binding, list_for_source, [r(X)]), BindingsAct = [Key || #binding{key = Key} <- Bindings0], diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl index 47a3248c89..0bc0682940 100644 --- a/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl +++ b/deps/rabbitmq_federation/test/src/rabbit_federation_test_all.erl @@ -20,7 +20,8 @@ all_tests() -> ok = eunit:test(tests(rabbit_federation_unit_test, 60), [verbose]), - ok = eunit:test(tests(rabbit_federation_test, 60), [verbose]). + ok = eunit:test(tests(rabbit_federation_test, 60), [verbose]), + ok = eunit:test(tests(rabbit_federation_queue_test, 60), [verbose]). tests(Module, Timeout) -> {foreach, fun() -> ok end, diff --git a/deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl b/deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl new file mode 100644 index 0000000000..2db4e305b5 --- /dev/null +++ b/deps/rabbitmq_federation/test/src/rabbit_federation_test_util.erl @@ -0,0 +1,65 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ Federation. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_federation_test_util). + +-include("rabbit_federation.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +publish(Ch, X, Key, Payload) when is_binary(Payload) -> + publish(Ch, X, Key, #amqp_msg{payload = Payload}); + +publish(Ch, X, Key, Msg = #amqp_msg{}) -> + %% The trouble is that we transmit bindings upstream asynchronously... + timer:sleep(5000), + amqp_channel:call(Ch, #'basic.publish'{exchange = X, + routing_key = Key}, Msg). + + +expect(Ch, Q, Fun) when is_function(Fun) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, + no_ack = true}, self()), + receive + #'basic.consume_ok'{consumer_tag = CTag} -> ok + end, + Fun(), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}); + +expect(Ch, Q, Payloads) -> + expect(Ch, Q, fun() -> expect(Payloads) end). + +expect([]) -> + ok; +expect(Payloads) -> + receive + {#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> + case lists:member(Payload, Payloads) of + true -> expect(Payloads -- [Payload]); + false -> throw({expected, Payloads, actual, Payload}) + end + end. + +publish_expect(Ch, X, Key, Q, Payload) -> + publish(Ch, X, Key, Payload), + expect(Ch, Q, [Payload]). + +expect_empty(Ch, Q) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{ queue = Q })). + |