diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-03-30 14:29:16 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-03-30 14:29:16 +0100 |
commit | 5b32ca8029dcbc08d97851935c61f237cb455365 (patch) | |
tree | eae6ae8ed6acce72a040884c8a78895711e2e077 | |
parent | 5de9f9dc0af669df764db3a3915fd810918f232c (diff) | |
parent | 1312b972cf32478b68223af795a40c979e65b4d3 (diff) | |
download | rabbitmq-server-5b32ca8029dcbc08d97851935c61f237cb455365.tar.gz |
merge bug23993 into default (OCF script is broken when specifying an alternative config file)
-rw-r--r-- | src/rabbit_backing_queue.erl | 38 | ||||
-rw-r--r-- | src/rabbit_control.erl | 37 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 30 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 183 |
4 files changed, 142 insertions, 146 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a15ff846..0ca8d260 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -39,13 +39,12 @@ behaviour_info(callbacks) -> %% 2. a boolean indicating whether the queue is durable %% 3. a boolean indicating whether the queue is an existing queue %% that should be recovered - %% 4. an asynchronous callback which accepts a function from - %% state to state and invokes it with the current backing - %% queue state. This is useful for handling events, e.g. when - %% the backing queue does not have its own process to receive - %% such events, or when the processing of an event results in - %% a state transition the queue logic needs to know about - %% (such as messages getting confirmed). + %% 4. an asynchronous callback which accepts a function of type + %% backing-queue-state to backing-queue-state. This callback + %% function can be safely invoked from any process, which + %% makes it useful for passing messages back into the backing + %% queue, especially as the backing queue does not have + %% control of its own mailbox. %% 5. a synchronous callback. Same as the asynchronous callback %% but waits for completion and returns 'error' on error. {init, 5}, @@ -71,6 +70,31 @@ behaviour_info(callbacks) -> %% Return ids of messages which have been confirmed since %% the last invocation of this function (or initialisation). + %% + %% Message ids should only appear in the result of + %% drain_confirmed under the following circumstances: + %% + %% 1. The message appears in a call to publish_delivered/4 and + %% the first argument (ack_required) is false; or + %% 2. The message is fetched from the queue with fetch/2 and the + %% first argument (ack_required) is false; or + %% 3. The message is acked (ack/2 is called for the message); or + %% 4. The message is fully fsync'd to disk in such a way that the + %% recovery of the message is guaranteed in the event of a + %% crash of this rabbit node (excluding hardware failure). + %% + %% In addition to the above conditions, a message id may only + %% appear in the result of drain_confirmed if + %% #message_properties.needs_confirming = true when the msg was + %% published (through whichever means) to the backing queue. + %% + %% It is legal for the same message id to appear in the results + %% of multiple calls to drain_confirmed, which means that the + %% backing queue is not required to keep track of which messages + %% it has already confirmed. The confirm will be issued to the + %% publisher the first time the message id appears in the result + %% of drain_confirmed. All subsequent appearances of that message + %% id will be ignored. {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8364ecd8..4a2858f0 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -127,6 +127,8 @@ usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), quit(1). +%%---------------------------------------------------------------------------- + action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), call(Node, {rabbit, stop_and_halt, []}); @@ -159,6 +161,10 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); +action(wait, Node, [], _Opts, Inform) -> + Inform("Waiting for ~p", [Node]), + wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS). + action(status, Node, [], _Opts, Inform) -> Inform("Status of node ~p", [Node]), case call(Node, {rabbit, status, []}) of @@ -294,9 +300,7 @@ action(list_permissions, Node, [], Opts, Inform) -> display_list(call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]})); -action(wait, Node, [], _Opts, Inform) -> - Inform("Waiting for ~p", [Node]), - wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS). +%%---------------------------------------------------------------------------- wait_for_application(Node, Attempts) -> case rpc_call(Node, application, which_applications, [infinity]) of @@ -382,12 +386,9 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. -escape(Atom) when is_atom(Atom) -> - escape(atom_to_list(Atom)); -escape(Bin) when is_binary(Bin) -> - escape(binary_to_list(Bin)); -escape(L) when is_list(L) -> - escape_char(lists:reverse(L), []). +escape(Atom) when is_atom(Atom) -> escape(atom_to_list(Atom)); +escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); @@ -402,19 +403,15 @@ escape_char([], Acc) -> prettify_amqp_table(Table) -> [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table]. -prettify_typed_amqp_value(Type, Value) -> - case Type of - longstr -> escape(Value); - table -> prettify_amqp_table(Value); - array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; - _ -> Value - end. +prettify_typed_amqp_value(longstr, Value) -> escape(Value); +prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value); +prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) || + {T, V} <- Value]; +prettify_typed_amqp_value(_Type, Value) -> Value. %% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of - {unix, _} -> - halt(Status); - {win32, _} -> - init:stop(Status) + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status) end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a463e570..9d9b07af 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,12 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, - info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). --export([callback/3]). +-export([recover/0, callback/3, declare/6, + assert_equivalence/6, assert_args_equivalence/2, check_type/1, + lookup/1, lookup_or_die/1, list/1, + info_keys/0, info/1, info/2, info_all/1, info_all/2, + publish/2, delete/2]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). --export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -33,8 +34,10 @@ -type(name() :: rabbit_types:r('exchange')). -type(type() :: atom()). +-type(fun_name() :: atom()). -spec(recover/0 :: () -> 'ok'). +-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -72,7 +75,6 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). -endif. @@ -101,6 +103,9 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. +callback(#exchange{type = XType}, Fun, Args) -> + apply(type_to_module(XType), Fun, Args). + declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, type = Type, @@ -126,7 +131,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - callback(Exchange, create, [Tx, Exchange]), + ok = (type_to_module(Type)):create(Tx, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -135,11 +140,6 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). -%% Used with atoms from records; e.g., the type is expected to exist. -type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. - %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -294,9 +294,6 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. -callback(#exchange{type = XType}, Fun, Args) -> - apply(type_to_module(XType), Fun, Args). - conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); @@ -308,3 +305,8 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + {ok, Module} = rabbit_registry:lookup_module(exchange, T), + Module. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ca046c91..fb1c9a34 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -598,39 +598,37 @@ test_topic_matching() -> exchange_op_callback(X, create, []), %% add some bindings - Bindings = lists:map( - fun ({Key, Q}) -> - #binding{source = XName, - key = list_to_binary(Key), - destination = #resource{virtual_host = <<"/">>, - kind = queue, - name = list_to_binary(Q)}} - end, [{"a.b.c", "t1"}, - {"a.*.c", "t2"}, - {"a.#.b", "t3"}, - {"a.b.b.c", "t4"}, - {"#", "t5"}, - {"#.#", "t6"}, - {"#.b", "t7"}, - {"*.*", "t8"}, - {"a.*", "t9"}, - {"*.b.c", "t10"}, - {"a.#", "t11"}, - {"a.#.#", "t12"}, - {"b.b.c", "t13"}, - {"a.b.b", "t14"}, - {"a.b", "t15"}, - {"b.c", "t16"}, - {"", "t17"}, - {"*.*.*", "t18"}, - {"vodka.martini", "t19"}, - {"a.b.c", "t20"}, - {"*.#", "t21"}, - {"#.*.#", "t22"}, - {"*.#.#", "t23"}, - {"#.#.#", "t24"}, - {"*", "t25"}, - {"#.b.#", "t26"}]), + Bindings = [#binding{source = XName, + key = list_to_binary(Key), + destination = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} || + {Key, Q} <- [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}, + {"*.#", "t21"}, + {"#.*.#", "t22"}, + {"*.#.#", "t23"}, + {"#.#.#", "t24"}, + {"*", "t25"}, + {"#.b.#", "t26"}]], lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, Bindings), @@ -669,22 +667,23 @@ test_topic_matching() -> ordsets:from_list(RemovedBindings))), %% test some matches - test_topic_expect_match(X, - [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", - "t23", "t24", "t26"]}, - {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", - "t22", "t23", "t24", "t26"]}, - {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", - "t23", "t24", "t26"]}, - {"", ["t6", "t17", "t24"]}, - {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, - {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, - {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, - {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, - {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", - "t24", "t26"]}, - {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, - {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + test_topic_expect_match( + X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, + {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), %% remove the entire exchange exchange_op_callback(X, delete, [RemainingBindings]), @@ -701,9 +700,14 @@ test_topic_expect_match(X, List) -> lists:foreach( fun ({Key, Expected}) -> BinKey = list_to_binary(Key), + Message = rabbit_basic:message(X#exchange.name, BinKey, + #'P_basic'{}, <<>>), Res = rabbit_exchange_type_topic:route( - X, #delivery{message = #basic_message{routing_keys = - [BinKey]}}), + X, #delivery{mandatory = false, + immediate = false, + txn = none, + sender = self(), + message = Message}), ExpectedRes = lists:map( fun (Q) -> #resource{virtual_host = <<"/">>, kind = queue, @@ -1178,9 +1182,15 @@ test_server_status() -> passed. -test_spawn(Receiver) -> +test_writer(Pid) -> + receive + shutdown -> ok; + {send_command, Method} -> Pid ! Method, test_writer(Pid) + end. + +test_spawn() -> Me = self(), - Writer = spawn(fun () -> Receiver(Me) end), + Writer = spawn(fun () -> test_writer(Me) end), {ok, Ch} = rabbit_channel:start_link( 1, Me, Writer, Me, rabbit_framing_amqp_0_9_1, user(<<"guest">>), <<"/">>, [], self(), @@ -1198,20 +1208,9 @@ user(Username) -> impl = #internal_user{username = Username, is_admin = true}}. -test_statistics_receiver(Pid) -> - receive - shutdown -> - ok; - {send_command, Method} -> - Pid ! Method, - test_statistics_receiver(Pid) - end. - test_statistics_event_receiver(Pid) -> receive - Foo -> - Pid ! Foo, - test_statistics_event_receiver(Pid) + Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) end. test_statistics_receive_event(Ch, Matcher) -> @@ -1228,17 +1227,8 @@ test_statistics_receive_event1(Ch, Matcher) -> after 1000 -> throw(failed_to_receive_event) end. -test_confirms_receiver(Pid) -> - receive - shutdown -> - ok; - {send_command, Method} -> - Pid ! Method, - test_confirms_receiver(Pid) - end. - test_confirms() -> - {_Writer, Ch} = test_spawn(fun test_confirms_receiver/1), + {_Writer, Ch} = test_spawn(), DeclareBindDurableQueue = fun() -> rabbit_channel:do(Ch, #'queue.declare'{durable = true}), @@ -1264,10 +1254,9 @@ test_confirms() -> QPid1 = Q1#amqqueue.pid, %% Enable confirms rabbit_channel:do(Ch, #'confirm.select'{}), - receive #'confirm.select_ok'{} -> - ok - after 1000 -> - throw(failed_to_enable_confirms) + receive + #'confirm.select_ok'{} -> ok + after 1000 -> throw(failed_to_enable_confirms) end, %% Publish a message rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, @@ -1279,25 +1268,19 @@ test_confirms() -> QPid1 ! boom, %% Wait for a nack receive - #'basic.nack'{} -> - ok; - #'basic.ack'{} -> - throw(received_ack_instead_of_nack) - after 2000 -> - throw(did_not_receive_nack) + #'basic.nack'{} -> ok; + #'basic.ack'{} -> throw(received_ack_instead_of_nack) + after 2000 -> throw(did_not_receive_nack) end, receive - #'basic.ack'{} -> - throw(received_ack_when_none_expected) - after 1000 -> - ok + #'basic.ack'{} -> throw(received_ack_when_none_expected) + after 1000 -> ok end, %% Cleanup rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}), - receive #'queue.delete_ok'{} -> - ok - after 1000 -> - throw(failed_to_cleanup_queue) + receive + #'queue.delete_ok'{} -> ok + after 1000 -> throw(failed_to_cleanup_queue) end, unlink(Ch), ok = rabbit_channel:shutdown(Ch), @@ -1311,7 +1294,7 @@ test_statistics() -> %% by far the most complex code though. %% Set up a channel and queue - {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1), + {_Writer, Ch} = test_spawn(), rabbit_channel:do(Ch, #'queue.declare'{}), QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 @@ -1462,18 +1445,8 @@ test_delegates_sync(SecondaryNode) -> passed. -test_queue_cleanup_receiver(Pid) -> - receive - shutdown -> - ok; - {send_command, Method} -> - Pid ! Method, - test_queue_cleanup_receiver(Pid) - end. - - test_queue_cleanup(_SecondaryNode) -> - {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1), + {_Writer, Ch} = test_spawn(), rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }), receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} -> ok |