diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-12-17 20:07:48 +0000 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-12-17 20:07:48 +0000 |
commit | dae4d5cbef1262b291af716eb989233202ccc8bb (patch) | |
tree | 60ab0a9286f9feb95560e31d56182405bd66f0df | |
parent | bb173a12eb269eeca191369b8ac71dffce8292a1 (diff) | |
parent | 5dff43f73928720d7d4dd592f8db983976549b8c (diff) | |
download | rabbitmq-server-bug22068.tar.gz |
Merge default into bug22068bug22068
-rw-r--r-- | include/rabbit_exchange_behaviour_spec.hrl | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 84 | ||||
-rw-r--r-- | src/rabbit_exchange_behaviour.erl | 12 | ||||
-rw-r--r-- | src/rabbit_exchange_events.erl | 108 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 4 |
9 files changed, 155 insertions, 70 deletions
diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl index 30662af8..7e965fc7 100644 --- a/include/rabbit_exchange_behaviour_spec.hrl +++ b/include/rabbit_exchange_behaviour_spec.hrl @@ -32,7 +32,7 @@ -spec(description/0 :: () -> [{atom(), any()}]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(recover/1 :: (exchange()) -> 'ok'). +-spec(declare/1 :: (exchange()) -> 'ok'). -spec(init/1 :: (exchange()) -> 'ok'). -spec(delete/1 :: (exchange()) -> 'ok'). -spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). diff --git a/src/rabbit.erl b/src/rabbit.erl index 3293927a..a90e682d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -155,7 +155,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor) + ok = start_child(rabbit_node_monitor), + ok = start_child(rabbit_exchange_events) end}, {"recovery", fun () -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index be73e818..495fc4b3 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -105,13 +105,7 @@ recover() -> Route, write), ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write) - end, rabbit_durable_route), - %% Tell exchanges to recover themselves only *after* we've - %% recovered their bindings. - ok = rabbit_misc:table_foreach( - fun(Exchange = #exchange{type = Type}) -> - ok = Type:recover(Exchange) - end, rabbit_durable_exchange). + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -119,6 +113,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, + ok = Type:declare(Exchange), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, ExchangeName}) of @@ -128,7 +123,6 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange, write); true -> ok end, - ok = Type:init(Exchange), Exchange; [ExistingX] -> ExistingX end @@ -263,43 +257,21 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - FwdRoute = reverse_route(Route), - ok = FwdDeleteFun(FwdRoute), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write), - FwdRoute#route.binding - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - %% We need the keysort to group the bindings by exchange name, so - %% that cleanup_deleted_queue_bindings can inform the exchange of - %% its vanished bindings before maybe_auto_delete'ing the - %% exchange. - ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings), - none, []). - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) - when N =:= ExchangeName -> - cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings), - cleanup_deleted_queue_bindings(Rest, N, [B]). - -cleanup_deleted_queue_bindings1(none, []) -> - ok; -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}), - [ok = Type:delete_binding(X, B) || B <- Bindings], - ok = maybe_auto_delete(X). + Exchanges = exchanges_for_queue(QueueName), + [begin + ok = FwdDeleteFun(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -308,6 +280,15 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -346,25 +327,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - ok = Type:add_binding(X, B) + fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; _ -> ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), - ok = Type:delete_binding(X, B), maybe_auto_delete(X) end end). @@ -455,11 +434,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) -> +unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - ok = Type:delete(X). + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl index 7935df6b..4b275c00 100644 --- a/src/rabbit_exchange_behaviour.erl +++ b/src/rabbit_exchange_behaviour.erl @@ -35,16 +35,14 @@ behaviour_info(callbacks) -> [ - %% Called *outside* mnesia transactions. {description, 0}, {publish, 2}, - %% Called *inside* mnesia transactions, must be idempotent. - {recover, 1}, %% like init, but called on server startup for durable exchanges - {init, 1}, %% like recover, but called on declaration when previously absent - {delete, 1}, %% called on deletion - {add_binding, 2}, - {delete_binding, 2} + {declare, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} + {init, 1}, %% called after declaration when previously absent, or during recovery + {delete, 1}, %% called after exchange deletion + {add_binding, 2}, %% called after a binding has been added + {delete_binding, 2} %% called after a binding has been deleted ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_events.erl b/src/rabbit_exchange_events.erl new file mode 100644 index 00000000..77647135 --- /dev/null +++ b/src/rabbit_exchange_events.erl @@ -0,0 +1,108 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_events). +-include("rabbit.hrl"). + +-behaviour(gen_server2). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-ifdef(use_specs). +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +with_exchange(#binding{exchange_name = ExchangeName}, Fun) -> + case rabbit_exchange:lookup(ExchangeName) of + {ok, X} -> + Fun(X); + not_found -> + ok + end. + +handle_table_event({write, rabbit_exchange, X = #exchange{type = Type}, _OldRecs, _ActivityId}) -> + %% Exchange created/recovered. + ok = Type:init(X); +handle_table_event({delete, rabbit_exchange, {rabbit_exchange, _ExchangeName}, + [X = #exchange{type = Type}], _ActivityId}) -> + %% Exchange deleted. + ok = Type:delete(X); +handle_table_event({write, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) -> + %% New binding. + ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:add_binding(X, B) end); +handle_table_event({delete, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) -> + %% Deleted binding. + ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:delete_binding(X, B) end); +handle_table_event(_Event) -> + {error, unhandled_table_event}. + +%%--------------------------------------------------------------------------- + +init([]) -> + mnesia:subscribe({table, rabbit_exchange, detailed}), + mnesia:subscribe({table, rabbit_route, detailed}), + {ok, no_state}. + +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info({mnesia_table_event, Event}, State) -> + case catch handle_table_event(Event) of + {'EXIT', Reason} -> + rabbit_log:error("Exchange event callback failed~n~p~n", [[{event, Event}, + {reason, Reason}]]); + ok -> + ok; + {error, unhandled_table_event} -> + rabbit_log:error("Unexpected mnesia_table_event~n~p~n", [Event]) + end, + {noreply, State}; +handle_info(Info, State) -> + {stop, {unhandled_info, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index e6e6ae99..dff06b25 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -46,7 +46,7 @@ publish(#exchange{name = Name}, Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). -recover(_X) -> ok. +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 2194abd4..b4654b0c 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -45,7 +45,7 @@ description() -> publish(#exchange{name = Name}, Delivery) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). -recover(_X) -> ok. +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 72c85b06..f28bfdc7 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,7 +36,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -ifdef(use_specs). @@ -120,7 +120,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], end, headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). -recover(_X) -> ok. +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 738ff595..ecb65807 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -export([topic_matches/2]). @@ -83,7 +83,7 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -recover(_X) -> ok. +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. |