summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-30 14:29:16 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-03-30 14:29:16 +0100
commit5b32ca8029dcbc08d97851935c61f237cb455365 (patch)
treeeae6ae8ed6acce72a040884c8a78895711e2e077
parent5de9f9dc0af669df764db3a3915fd810918f232c (diff)
parent1312b972cf32478b68223af795a40c979e65b4d3 (diff)
downloadrabbitmq-server-5b32ca8029dcbc08d97851935c61f237cb455365.tar.gz
merge bug23993 into default (OCF script is broken when specifying an alternative config file)
-rw-r--r--src/rabbit_backing_queue.erl38
-rw-r--r--src/rabbit_control.erl37
-rw-r--r--src/rabbit_exchange.erl30
-rw-r--r--src/rabbit_tests.erl183
4 files changed, 142 insertions, 146 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a15ff846..0ca8d260 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -39,13 +39,12 @@ behaviour_info(callbacks) ->
%% 2. a boolean indicating whether the queue is durable
%% 3. a boolean indicating whether the queue is an existing queue
%% that should be recovered
- %% 4. an asynchronous callback which accepts a function from
- %% state to state and invokes it with the current backing
- %% queue state. This is useful for handling events, e.g. when
- %% the backing queue does not have its own process to receive
- %% such events, or when the processing of an event results in
- %% a state transition the queue logic needs to know about
- %% (such as messages getting confirmed).
+ %% 4. an asynchronous callback which accepts a function of type
+ %% backing-queue-state to backing-queue-state. This callback
+ %% function can be safely invoked from any process, which
+ %% makes it useful for passing messages back into the backing
+ %% queue, especially as the backing queue does not have
+ %% control of its own mailbox.
%% 5. a synchronous callback. Same as the asynchronous callback
%% but waits for completion and returns 'error' on error.
{init, 5},
@@ -71,6 +70,31 @@ behaviour_info(callbacks) ->
%% Return ids of messages which have been confirmed since
%% the last invocation of this function (or initialisation).
+ %%
+ %% Message ids should only appear in the result of
+ %% drain_confirmed under the following circumstances:
+ %%
+ %% 1. The message appears in a call to publish_delivered/4 and
+ %% the first argument (ack_required) is false; or
+ %% 2. The message is fetched from the queue with fetch/2 and the
+ %% first argument (ack_required) is false; or
+ %% 3. The message is acked (ack/2 is called for the message); or
+ %% 4. The message is fully fsync'd to disk in such a way that the
+ %% recovery of the message is guaranteed in the event of a
+ %% crash of this rabbit node (excluding hardware failure).
+ %%
+ %% In addition to the above conditions, a message id may only
+ %% appear in the result of drain_confirmed if
+ %% #message_properties.needs_confirming = true when the msg was
+ %% published (through whichever means) to the backing queue.
+ %%
+ %% It is legal for the same message id to appear in the results
+ %% of multiple calls to drain_confirmed, which means that the
+ %% backing queue is not required to keep track of which messages
+ %% it has already confirmed. The confirm will be issued to the
+ %% publisher the first time the message id appears in the result
+ %% of drain_confirmed. All subsequent appearances of that message
+ %% id will be ignored.
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8364ecd8..4a2858f0 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -127,6 +127,8 @@ usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
quit(1).
+%%----------------------------------------------------------------------------
+
action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
call(Node, {rabbit, stop_and_halt, []});
@@ -159,6 +161,10 @@ action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
[Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(wait, Node, [], _Opts, Inform) ->
+ Inform("Waiting for ~p", [Node]),
+ wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS).
+
action(status, Node, [], _Opts, Inform) ->
Inform("Status of node ~p", [Node]),
case call(Node, {rabbit, status, []}) of
@@ -294,9 +300,7 @@ action(list_permissions, Node, [], Opts, Inform) ->
display_list(call(Node, {rabbit_auth_backend_internal,
list_vhost_permissions, [VHost]}));
-action(wait, Node, [], _Opts, Inform) ->
- Inform("Waiting for ~p", [Node]),
- wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS).
+%%----------------------------------------------------------------------------
wait_for_application(Node, Attempts) ->
case rpc_call(Node, application, which_applications, [infinity]) of
@@ -382,12 +386,9 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
-escape(Atom) when is_atom(Atom) ->
- escape(atom_to_list(Atom));
-escape(Bin) when is_binary(Bin) ->
- escape(binary_to_list(Bin));
-escape(L) when is_list(L) ->
- escape_char(lists:reverse(L), []).
+escape(Atom) when is_atom(Atom) -> escape(atom_to_list(Atom));
+escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin));
+escape(L) when is_list(L) -> escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
@@ -402,19 +403,15 @@ escape_char([], Acc) ->
prettify_amqp_table(Table) ->
[{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
-prettify_typed_amqp_value(Type, Value) ->
- case Type of
- longstr -> escape(Value);
- table -> prettify_amqp_table(Value);
- array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
- _ -> Value
- end.
+prettify_typed_amqp_value(longstr, Value) -> escape(Value);
+prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
+prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
+ {T, V} <- Value];
+prettify_typed_amqp_value(_Type, Value) -> Value.
%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
- {unix, _} ->
- halt(Status);
- {win32, _} ->
- init:stop(Status)
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status)
end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a463e570..9d9b07af 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,12 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
- info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
--export([callback/3]).
+-export([recover/0, callback/3, declare/6,
+ assert_equivalence/6, assert_args_equivalence/2, check_type/1,
+ lookup/1, lookup_or_die/1, list/1,
+ info_keys/0, info/1, info/2, info_all/1, info_all/2,
+ publish/2, delete/2]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
--export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
%%----------------------------------------------------------------------------
@@ -33,8 +34,10 @@
-type(name() :: rabbit_types:r('exchange')).
-type(type() :: atom()).
+-type(fun_name() :: atom()).
-spec(recover/0 :: () -> 'ok').
+-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -72,7 +75,6 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
-endif.
@@ -101,6 +103,9 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
+callback(#exchange{type = XType}, Fun, Args) ->
+ apply(type_to_module(XType), Fun, Args).
+
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
X = #exchange{name = XName,
type = Type,
@@ -126,7 +131,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- callback(Exchange, create, [Tx, Exchange]),
+ ok = (type_to_module(Type)):create(Tx, Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -135,11 +140,6 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
Err
end).
-%% Used with atoms from records; e.g., the type is expected to exist.
-type_to_module(T) ->
- {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- Module.
-
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
case rabbit_registry:binary_to_type(TypeBin) of
@@ -294,9 +294,6 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
-callback(#exchange{type = XType}, Fun, Args) ->
- apply(type_to_module(XType), Fun, Args).
-
conditional_delete(X = #exchange{name = XName}) ->
case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
@@ -308,3 +305,8 @@ unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+
+%% Used with atoms from records; e.g., the type is expected to exist.
+type_to_module(T) ->
+ {ok, Module} = rabbit_registry:lookup_module(exchange, T),
+ Module.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ca046c91..fb1c9a34 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -598,39 +598,37 @@ test_topic_matching() ->
exchange_op_callback(X, create, []),
%% add some bindings
- Bindings = lists:map(
- fun ({Key, Q}) ->
- #binding{source = XName,
- key = list_to_binary(Key),
- destination = #resource{virtual_host = <<"/">>,
- kind = queue,
- name = list_to_binary(Q)}}
- end, [{"a.b.c", "t1"},
- {"a.*.c", "t2"},
- {"a.#.b", "t3"},
- {"a.b.b.c", "t4"},
- {"#", "t5"},
- {"#.#", "t6"},
- {"#.b", "t7"},
- {"*.*", "t8"},
- {"a.*", "t9"},
- {"*.b.c", "t10"},
- {"a.#", "t11"},
- {"a.#.#", "t12"},
- {"b.b.c", "t13"},
- {"a.b.b", "t14"},
- {"a.b", "t15"},
- {"b.c", "t16"},
- {"", "t17"},
- {"*.*.*", "t18"},
- {"vodka.martini", "t19"},
- {"a.b.c", "t20"},
- {"*.#", "t21"},
- {"#.*.#", "t22"},
- {"*.#.#", "t23"},
- {"#.#.#", "t24"},
- {"*", "t25"},
- {"#.b.#", "t26"}]),
+ Bindings = [#binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}} ||
+ {Key, Q} <- [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]],
lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
Bindings),
@@ -669,22 +667,23 @@ test_topic_matching() ->
ordsets:from_list(RemovedBindings))),
%% test some matches
- test_topic_expect_match(X,
- [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
- "t23", "t24", "t26"]},
- {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
- "t22", "t23", "t24", "t26"]},
- {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
- "t23", "t24", "t26"]},
- {"", ["t6", "t17", "t24"]},
- {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
- {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
- {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
- {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
- {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
- "t24", "t26"]},
- {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
- {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+ test_topic_expect_match(
+ X,
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
%% remove the entire exchange
exchange_op_callback(X, delete, [RemainingBindings]),
@@ -701,9 +700,14 @@ test_topic_expect_match(X, List) ->
lists:foreach(
fun ({Key, Expected}) ->
BinKey = list_to_binary(Key),
+ Message = rabbit_basic:message(X#exchange.name, BinKey,
+ #'P_basic'{}, <<>>),
Res = rabbit_exchange_type_topic:route(
- X, #delivery{message = #basic_message{routing_keys =
- [BinKey]}}),
+ X, #delivery{mandatory = false,
+ immediate = false,
+ txn = none,
+ sender = self(),
+ message = Message}),
ExpectedRes = lists:map(
fun (Q) -> #resource{virtual_host = <<"/">>,
kind = queue,
@@ -1178,9 +1182,15 @@ test_server_status() ->
passed.
-test_spawn(Receiver) ->
+test_writer(Pid) ->
+ receive
+ shutdown -> ok;
+ {send_command, Method} -> Pid ! Method, test_writer(Pid)
+ end.
+
+test_spawn() ->
Me = self(),
- Writer = spawn(fun () -> Receiver(Me) end),
+ Writer = spawn(fun () -> test_writer(Me) end),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], self(),
@@ -1198,20 +1208,9 @@ user(Username) ->
impl = #internal_user{username = Username,
is_admin = true}}.
-test_statistics_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- Pid ! Method,
- test_statistics_receiver(Pid)
- end.
-
test_statistics_event_receiver(Pid) ->
receive
- Foo ->
- Pid ! Foo,
- test_statistics_event_receiver(Pid)
+ Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
end.
test_statistics_receive_event(Ch, Matcher) ->
@@ -1228,17 +1227,8 @@ test_statistics_receive_event1(Ch, Matcher) ->
after 1000 -> throw(failed_to_receive_event)
end.
-test_confirms_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- Pid ! Method,
- test_confirms_receiver(Pid)
- end.
-
test_confirms() ->
- {_Writer, Ch} = test_spawn(fun test_confirms_receiver/1),
+ {_Writer, Ch} = test_spawn(),
DeclareBindDurableQueue =
fun() ->
rabbit_channel:do(Ch, #'queue.declare'{durable = true}),
@@ -1264,10 +1254,9 @@ test_confirms() ->
QPid1 = Q1#amqqueue.pid,
%% Enable confirms
rabbit_channel:do(Ch, #'confirm.select'{}),
- receive #'confirm.select_ok'{} ->
- ok
- after 1000 ->
- throw(failed_to_enable_confirms)
+ receive
+ #'confirm.select_ok'{} -> ok
+ after 1000 -> throw(failed_to_enable_confirms)
end,
%% Publish a message
rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>,
@@ -1279,25 +1268,19 @@ test_confirms() ->
QPid1 ! boom,
%% Wait for a nack
receive
- #'basic.nack'{} ->
- ok;
- #'basic.ack'{} ->
- throw(received_ack_instead_of_nack)
- after 2000 ->
- throw(did_not_receive_nack)
+ #'basic.nack'{} -> ok;
+ #'basic.ack'{} -> throw(received_ack_instead_of_nack)
+ after 2000 -> throw(did_not_receive_nack)
end,
receive
- #'basic.ack'{} ->
- throw(received_ack_when_none_expected)
- after 1000 ->
- ok
+ #'basic.ack'{} -> throw(received_ack_when_none_expected)
+ after 1000 -> ok
end,
%% Cleanup
rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}),
- receive #'queue.delete_ok'{} ->
- ok
- after 1000 ->
- throw(failed_to_cleanup_queue)
+ receive
+ #'queue.delete_ok'{} -> ok
+ after 1000 -> throw(failed_to_cleanup_queue)
end,
unlink(Ch),
ok = rabbit_channel:shutdown(Ch),
@@ -1311,7 +1294,7 @@ test_statistics() ->
%% by far the most complex code though.
%% Set up a channel and queue
- {_Writer, Ch} = test_spawn(fun test_statistics_receiver/1),
+ {_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{}),
QName = receive #'queue.declare_ok'{queue = Q0} ->
Q0
@@ -1462,18 +1445,8 @@ test_delegates_sync(SecondaryNode) ->
passed.
-test_queue_cleanup_receiver(Pid) ->
- receive
- shutdown ->
- ok;
- {send_command, Method} ->
- Pid ! Method,
- test_queue_cleanup_receiver(Pid)
- end.
-
-
test_queue_cleanup(_SecondaryNode) ->
- {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1),
+ {_Writer, Ch} = test_spawn(),
rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
ok