summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-12-17 20:07:48 +0000
committerTony Garnock-Jones <tonyg@lshift.net>2009-12-17 20:07:48 +0000
commitdae4d5cbef1262b291af716eb989233202ccc8bb (patch)
tree60ab0a9286f9feb95560e31d56182405bd66f0df
parentbb173a12eb269eeca191369b8ac71dffce8292a1 (diff)
parent5dff43f73928720d7d4dd592f8db983976549b8c (diff)
downloadrabbitmq-server-bug22068.tar.gz
Merge default into bug22068bug22068
-rw-r--r--include/rabbit_exchange_behaviour_spec.hrl2
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_exchange.erl84
-rw-r--r--src/rabbit_exchange_behaviour.erl12
-rw-r--r--src/rabbit_exchange_events.erl108
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl4
9 files changed, 155 insertions, 70 deletions
diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl
index 30662af8..7e965fc7 100644
--- a/include/rabbit_exchange_behaviour_spec.hrl
+++ b/include/rabbit_exchange_behaviour_spec.hrl
@@ -32,7 +32,7 @@
-spec(description/0 :: () -> [{atom(), any()}]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(recover/1 :: (exchange()) -> 'ok').
+-spec(declare/1 :: (exchange()) -> 'ok').
-spec(init/1 :: (exchange()) -> 'ok').
-spec(delete/1 :: (exchange()) -> 'ok').
-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3293927a..a90e682d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -155,7 +155,8 @@ start(normal, []) ->
ok = rabbit_amqqueue:start(),
ok = start_child(rabbit_router),
- ok = start_child(rabbit_node_monitor)
+ ok = start_child(rabbit_node_monitor),
+ ok = start_child(rabbit_exchange_events)
end},
{"recovery",
fun () ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index be73e818..495fc4b3 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -105,13 +105,7 @@ recover() ->
Route, write),
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
- end, rabbit_durable_route),
- %% Tell exchanges to recover themselves only *after* we've
- %% recovered their bindings.
- ok = rabbit_misc:table_foreach(
- fun(Exchange = #exchange{type = Type}) ->
- ok = Type:recover(Exchange)
- end, rabbit_durable_exchange).
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -119,6 +113,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
+ ok = Type:declare(Exchange),
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_exchange, ExchangeName}) of
@@ -128,7 +123,6 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange, write);
true -> ok
end,
- ok = Type:init(Exchange),
Exchange;
[ExistingX] -> ExistingX
end
@@ -263,43 +257,21 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
delete_queue_bindings(QueueName, FwdDeleteFun) ->
- DeletedBindings =
- [begin
- FwdRoute = reverse_route(Route),
- ok = FwdDeleteFun(FwdRoute),
- ok = mnesia:delete_object(rabbit_reverse_route, Route, write),
- FwdRoute#route.binding
- end || Route <- mnesia:match_object(
- rabbit_reverse_route,
- reverse_route(
- #route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- write)],
- %% We need the keysort to group the bindings by exchange name, so
- %% that cleanup_deleted_queue_bindings can inform the exchange of
- %% its vanished bindings before maybe_auto_delete'ing the
- %% exchange.
- ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings),
- none, []).
-
-%% Requires that its input binding list is sorted in exchange-name
-%% order, so that the grouping of bindings (for passing to
-%% cleanup_deleted_queue_bindings1) works properly.
-cleanup_deleted_queue_bindings([], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings)
- when N =:= ExchangeName ->
- cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]);
-cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) ->
- cleanup_deleted_queue_bindings1(ExchangeName, Bindings),
- cleanup_deleted_queue_bindings(Rest, N, [B]).
-
-cleanup_deleted_queue_bindings1(none, []) ->
- ok;
-cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
- [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}),
- [ok = Type:delete_binding(X, B) || B <- Bindings],
- ok = maybe_auto_delete(X).
+ Exchanges = exchanges_for_queue(QueueName),
+ [begin
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
+ ok = maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ ok.
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
@@ -308,6 +280,15 @@ delete_forward_routes(Route) ->
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
+
contains(Table, MatchHead) ->
try
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
@@ -346,25 +327,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true -> ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- ok = Type:add_binding(X, B)
+ fun mnesia:write/3)
end
end).
delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
- fun (X = #exchange{type = Type}, Q, B) ->
+ fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
_ -> ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
- ok = Type:delete_binding(X, B),
maybe_auto_delete(X)
end
end).
@@ -455,11 +434,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
true -> {error, in_use}
end.
-unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) ->
+unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- ok = Type:delete(X).
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API
diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl
index 7935df6b..4b275c00 100644
--- a/src/rabbit_exchange_behaviour.erl
+++ b/src/rabbit_exchange_behaviour.erl
@@ -35,16 +35,14 @@
behaviour_info(callbacks) ->
[
- %% Called *outside* mnesia transactions.
{description, 0},
{publish, 2},
- %% Called *inside* mnesia transactions, must be idempotent.
- {recover, 1}, %% like init, but called on server startup for durable exchanges
- {init, 1}, %% like recover, but called on declaration when previously absent
- {delete, 1}, %% called on deletion
- {add_binding, 2},
- {delete_binding, 2}
+ {declare, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
+ {init, 1}, %% called after declaration when previously absent, or during recovery
+ {delete, 1}, %% called after exchange deletion
+ {add_binding, 2}, %% called after a binding has been added
+ {delete_binding, 2} %% called after a binding has been deleted
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_events.erl b/src/rabbit_exchange_events.erl
new file mode 100644
index 00000000..77647135
--- /dev/null
+++ b/src/rabbit_exchange_events.erl
@@ -0,0 +1,108 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_exchange_events).
+-include("rabbit.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-ifdef(use_specs).
+-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%%---------------------------------------------------------------------------
+
+with_exchange(#binding{exchange_name = ExchangeName}, Fun) ->
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, X} ->
+ Fun(X);
+ not_found ->
+ ok
+ end.
+
+handle_table_event({write, rabbit_exchange, X = #exchange{type = Type}, _OldRecs, _ActivityId}) ->
+ %% Exchange created/recovered.
+ ok = Type:init(X);
+handle_table_event({delete, rabbit_exchange, {rabbit_exchange, _ExchangeName},
+ [X = #exchange{type = Type}], _ActivityId}) ->
+ %% Exchange deleted.
+ ok = Type:delete(X);
+handle_table_event({write, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) ->
+ %% New binding.
+ ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:add_binding(X, B) end);
+handle_table_event({delete, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) ->
+ %% Deleted binding.
+ ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:delete_binding(X, B) end);
+handle_table_event(_Event) ->
+ {error, unhandled_table_event}.
+
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ mnesia:subscribe({table, rabbit_exchange, detailed}),
+ mnesia:subscribe({table, rabbit_route, detailed}),
+ {ok, no_state}.
+
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info({mnesia_table_event, Event}, State) ->
+ case catch handle_table_event(Event) of
+ {'EXIT', Reason} ->
+ rabbit_log:error("Exchange event callback failed~n~p~n", [[{event, Event},
+ {reason, Reason}]]);
+ ok ->
+ ok;
+ {error, unhandled_table_event} ->
+ rabbit_log:error("Unexpected mnesia_table_event~n~p~n", [Event])
+ end,
+ {noreply, State};
+handle_info(Info, State) ->
+ {stop, {unhandled_info, Info}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index e6e6ae99..dff06b25 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
description() ->
@@ -46,7 +46,7 @@ publish(#exchange{name = Name},
Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery).
-recover(_X) -> ok.
+declare(_X) -> ok.
init(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 2194abd4..b4654b0c 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
description() ->
@@ -45,7 +45,7 @@ description() ->
publish(#exchange{name = Name}, Delivery) ->
rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery).
-recover(_X) -> ok.
+declare(_X) -> ok.
init(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 72c85b06..f28bfdc7 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,7 +36,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
-ifdef(use_specs).
@@ -120,7 +120,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
end,
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
-recover(_X) -> ok.
+declare(_X) -> ok.
init(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 738ff595..ecb65807 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,7 +35,7 @@
-behaviour(rabbit_exchange_behaviour).
-export([description/0, publish/2]).
--export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]).
+-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]).
-include("rabbit_exchange_behaviour_spec.hrl").
-export([topic_matches/2]).
@@ -83,7 +83,7 @@ last_topic_match(P, R, []) ->
last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-recover(_X) -> ok.
+declare(_X) -> ok.
init(_X) -> ok.
delete(_X) -> ok.
add_binding(_X, _B) -> ok.