From 2fd6bfb1fa6f1f72c3de7a758f4c9a2b955b4b39 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 2 Dec 2009 06:09:11 +0000 Subject: Use mnesia table events instead of in-transaction callbacks. --- include/rabbit_exchange_behaviour_spec.hrl | 1 - src/rabbit.erl | 3 +- src/rabbit_exchange.erl | 83 +++++++++--------------- src/rabbit_exchange_behaviour.erl | 11 ++-- src/rabbit_exchange_events.erl | 100 +++++++++++++++++++++++++++++ src/rabbit_exchange_type_direct.erl | 3 +- src/rabbit_exchange_type_fanout.erl | 3 +- src/rabbit_exchange_type_headers.erl | 3 +- src/rabbit_exchange_type_topic.erl | 3 +- 9 files changed, 140 insertions(+), 70 deletions(-) create mode 100644 src/rabbit_exchange_events.erl diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl index 30662af8..e4e0b7ba 100644 --- a/include/rabbit_exchange_behaviour_spec.hrl +++ b/include/rabbit_exchange_behaviour_spec.hrl @@ -32,7 +32,6 @@ -spec(description/0 :: () -> [{atom(), any()}]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(recover/1 :: (exchange()) -> 'ok'). -spec(init/1 :: (exchange()) -> 'ok'). -spec(delete/1 :: (exchange()) -> 'ok'). -spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). diff --git a/src/rabbit.erl b/src/rabbit.erl index c6dde385..4cba1fa9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -154,7 +154,8 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), - ok = start_child(rabbit_node_monitor) + ok = start_child(rabbit_node_monitor), + ok = start_child(rabbit_exchange_events) end}, {"recovery", fun () -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 09ea1e96..e796b16f 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -105,13 +105,7 @@ recover() -> Route, write), ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write) - end, rabbit_durable_route), - %% Tell exchanges to recover themselves only *after* we've - %% recovered their bindings. - ok = rabbit_misc:table_foreach( - fun(Exchange = #exchange{type = Type}) -> - ok = Type:recover(Exchange) - end, rabbit_durable_exchange). + end, rabbit_durable_route). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -128,7 +122,6 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange, write); true -> ok end, - ok = Type:init(Exchange), Exchange; [ExistingX] -> ExistingX end @@ -263,43 +256,21 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - DeletedBindings = - [begin - FwdRoute = reverse_route(Route), - ok = FwdDeleteFun(FwdRoute), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write), - FwdRoute#route.binding - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - %% We need the keysort to group the bindings by exchange name, so - %% that cleanup_deleted_queue_bindings can inform the exchange of - %% its vanished bindings before maybe_auto_delete'ing the - %% exchange. - ok = cleanup_deleted_queue_bindings(lists:keysort(#binding.exchange_name, DeletedBindings), - none, []). - -%% Requires that its input binding list is sorted in exchange-name -%% order, so that the grouping of bindings (for passing to -%% cleanup_deleted_queue_bindings1) works properly. -cleanup_deleted_queue_bindings([], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) - when N =:= ExchangeName -> - cleanup_deleted_queue_bindings(Rest, ExchangeName, [B | Bindings]); -cleanup_deleted_queue_bindings([B = #binding{exchange_name = N} | Rest], ExchangeName, Bindings) -> - cleanup_deleted_queue_bindings1(ExchangeName, Bindings), - cleanup_deleted_queue_bindings(Rest, N, [B]). - -cleanup_deleted_queue_bindings1(none, []) -> - ok; -cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> - [X = #exchange{type = Type}] = mnesia:read({rabbit_exchange, ExchangeName}), - [ok = Type:delete_binding(X, B) || B <- Bindings], - ok = maybe_auto_delete(X). + Exchanges = exchanges_for_queue(QueueName), + [begin + ok = FwdDeleteFun(reverse_route(Route)), + ok = mnesia:delete_object(rabbit_reverse_route, Route, write) + end || Route <- mnesia:match_object( + rabbit_reverse_route, + reverse_route( + #route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + write)], + [begin + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -308,6 +279,15 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). + contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -346,25 +326,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - ok = Type:add_binding(X, B) + fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> binding_action( ExchangeName, QueueName, RoutingKey, Arguments, - fun (X = #exchange{type = Type}, Q, B) -> + fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; _ -> ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), - ok = Type:delete_binding(X, B), maybe_auto_delete(X) end end). @@ -455,11 +433,10 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> true -> {error, in_use} end. -unconditional_delete(X = #exchange{name = ExchangeName, type = Type}) -> +unconditional_delete(#exchange{name = ExchangeName}) -> ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - ok = Type:delete(X). + ok = mnesia:delete({rabbit_exchange, ExchangeName}). %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl index 7935df6b..d518891e 100644 --- a/src/rabbit_exchange_behaviour.erl +++ b/src/rabbit_exchange_behaviour.erl @@ -35,16 +35,13 @@ behaviour_info(callbacks) -> [ - %% Called *outside* mnesia transactions. {description, 0}, {publish, 2}, - %% Called *inside* mnesia transactions, must be idempotent. - {recover, 1}, %% like init, but called on server startup for durable exchanges - {init, 1}, %% like recover, but called on declaration when previously absent - {delete, 1}, %% called on deletion - {add_binding, 2}, - {delete_binding, 2} + {init, 1}, %% called after declaration when previously absent, or during recovery + {delete, 1}, %% called after deletion + {add_binding, 2}, %% called after a new binding has appeared + {delete_binding, 2} %% called after a binding has been removed ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_events.erl b/src/rabbit_exchange_events.erl new file mode 100644 index 00000000..f5372732 --- /dev/null +++ b/src/rabbit_exchange_events.erl @@ -0,0 +1,100 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_events). +-include("rabbit.hrl"). + +-behaviour(gen_server2). + +-export([start_link/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-ifdef(use_specs). +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +with_exchange(#binding{exchange_name = ExchangeName}, Fun) -> + case rabbit_exchange:lookup(ExchangeName) of + {ok, X} -> + Fun(X); + not_found -> + ok + end. + +handle_table_event({write, rabbit_exchange, X = #exchange{type = Type}, [], _ActivityId}) -> + %% Exchange created/recovered. + ok = Type:init(X); +handle_table_event({delete, rabbit_exchange, {rabbit_exchange, _ExchangeName}, + [X = #exchange{type = Type}], _ActivityId}) -> + %% Exchange deleted. + ok = Type:delete(X); +handle_table_event({write, rabbit_route, #route{binding = B}, [], _ActivityId}) -> + %% New binding. + ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:add_binding(X, B) end); +handle_table_event({delete, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) -> + %% Deleted binding. + ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:delete_binding(X, B) end); +handle_table_event(Event) -> + exit({unhandled_table_event, Event}). + +%%--------------------------------------------------------------------------- + +init([]) -> + mnesia:subscribe({table, rabbit_exchange, detailed}), + mnesia:subscribe({table, rabbit_route, detailed}), + {ok, no_state}. + +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info({mnesia_table_event, Event}, State) -> + ok = handle_table_event(Event), + {noreply, State}; +handle_info(Info, State) -> + {stop, {unhandled_info, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index e6e6ae99..da19d2c2 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -46,7 +46,6 @@ publish(#exchange{name = Name}, Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). -recover(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 2194abd4..df3e31af 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -45,7 +45,6 @@ description() -> publish(#exchange{name = Name}, Delivery) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). -recover(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 72c85b06..dbd6c988 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,7 +36,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -ifdef(use_specs). @@ -120,7 +120,6 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], end, headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). -recover(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 738ff595..90257573 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([recover/1, init/1, delete/1, add_binding/2, delete_binding/2]). +-export([init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -export([topic_matches/2]). @@ -83,7 +83,6 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -recover(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. -- cgit v1.2.1 From fe2ba279e62c2f928f4585a7b7be7d4144a8c482 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 2 Dec 2009 06:47:57 +0000 Subject: Add declare/1 callback. --- include/rabbit_exchange_behaviour_spec.hrl | 1 + src/rabbit_exchange.erl | 1 + src/rabbit_exchange_behaviour.erl | 7 ++++--- src/rabbit_exchange_type_direct.erl | 3 ++- src/rabbit_exchange_type_fanout.erl | 3 ++- src/rabbit_exchange_type_headers.erl | 3 ++- src/rabbit_exchange_type_topic.erl | 3 ++- 7 files changed, 14 insertions(+), 7 deletions(-) diff --git a/include/rabbit_exchange_behaviour_spec.hrl b/include/rabbit_exchange_behaviour_spec.hrl index e4e0b7ba..7e965fc7 100644 --- a/include/rabbit_exchange_behaviour_spec.hrl +++ b/include/rabbit_exchange_behaviour_spec.hrl @@ -32,6 +32,7 @@ -spec(description/0 :: () -> [{atom(), any()}]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). +-spec(declare/1 :: (exchange()) -> 'ok'). -spec(init/1 :: (exchange()) -> 'ok'). -spec(delete/1 :: (exchange()) -> 'ok'). -spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e796b16f..2c98deee 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -113,6 +113,7 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, + ok = Type:declare(Exchange), rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, ExchangeName}) of diff --git a/src/rabbit_exchange_behaviour.erl b/src/rabbit_exchange_behaviour.erl index d518891e..4b275c00 100644 --- a/src/rabbit_exchange_behaviour.erl +++ b/src/rabbit_exchange_behaviour.erl @@ -38,10 +38,11 @@ behaviour_info(callbacks) -> {description, 0}, {publish, 2}, + {declare, 1}, %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} {init, 1}, %% called after declaration when previously absent, or during recovery - {delete, 1}, %% called after deletion - {add_binding, 2}, %% called after a new binding has appeared - {delete_binding, 2} %% called after a binding has been removed + {delete, 1}, %% called after exchange deletion + {add_binding, 2}, %% called after a binding has been added + {delete_binding, 2} %% called after a binding has been deleted ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index da19d2c2..dff06b25 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -46,6 +46,7 @@ publish(#exchange{name = Name}, Delivery = #delivery{message = #basic_message{routing_key = RoutingKey}}) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), Delivery). +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index df3e31af..b4654b0c 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). description() -> @@ -45,6 +45,7 @@ description() -> publish(#exchange{name = Name}, Delivery) -> rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index dbd6c988..f28bfdc7 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -36,7 +36,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -ifdef(use_specs). @@ -120,6 +120,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], end, headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 90257573..ecb65807 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -35,7 +35,7 @@ -behaviour(rabbit_exchange_behaviour). -export([description/0, publish/2]). --export([init/1, delete/1, add_binding/2, delete_binding/2]). +-export([declare/1, init/1, delete/1, add_binding/2, delete_binding/2]). -include("rabbit_exchange_behaviour_spec.hrl"). -export([topic_matches/2]). @@ -83,6 +83,7 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). +declare(_X) -> ok. init(_X) -> ok. delete(_X) -> ok. add_binding(_X, _B) -> ok. -- cgit v1.2.1 From 2dd656cde0df68f6272be111346754b9354172c4 Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 2 Dec 2009 06:55:17 +0000 Subject: Cope more gracefully with failed callbacks. --- src/rabbit_exchange_events.erl | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/rabbit_exchange_events.erl b/src/rabbit_exchange_events.erl index f5372732..b9dfef86 100644 --- a/src/rabbit_exchange_events.erl +++ b/src/rabbit_exchange_events.erl @@ -71,8 +71,8 @@ handle_table_event({write, rabbit_route, #route{binding = B}, [], _ActivityId}) handle_table_event({delete, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) -> %% Deleted binding. ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:delete_binding(X, B) end); -handle_table_event(Event) -> - exit({unhandled_table_event, Event}). +handle_table_event(_Event) -> + {error, unhandled_table_event}. %%--------------------------------------------------------------------------- @@ -88,7 +88,15 @@ handle_cast(Request, State) -> {stop, {unhandled_cast, Request}, State}. handle_info({mnesia_table_event, Event}, State) -> - ok = handle_table_event(Event), + case catch handle_table_event(Event) of + {'EXIT', Reason} -> + rabbit_log:error("Exchange event callback failed~n~p~n", [[{event, Event}, + {reason, Reason}]]); + ok -> + ok; + {error, unhandled_table_event} -> + rabbit_log:error("Unexpected mnesia_table_event~n~p~n", [Event]) + end, {noreply, State}; handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. -- cgit v1.2.1 From 5dff43f73928720d7d4dd592f8db983976549b8c Mon Sep 17 00:00:00 2001 From: Tony Garnock-Jones Date: Wed, 2 Dec 2009 08:01:03 +0000 Subject: Ignore any old records --- src/rabbit_exchange_events.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_exchange_events.erl b/src/rabbit_exchange_events.erl index b9dfef86..77647135 100644 --- a/src/rabbit_exchange_events.erl +++ b/src/rabbit_exchange_events.erl @@ -58,14 +58,14 @@ with_exchange(#binding{exchange_name = ExchangeName}, Fun) -> ok end. -handle_table_event({write, rabbit_exchange, X = #exchange{type = Type}, [], _ActivityId}) -> +handle_table_event({write, rabbit_exchange, X = #exchange{type = Type}, _OldRecs, _ActivityId}) -> %% Exchange created/recovered. ok = Type:init(X); handle_table_event({delete, rabbit_exchange, {rabbit_exchange, _ExchangeName}, [X = #exchange{type = Type}], _ActivityId}) -> %% Exchange deleted. ok = Type:delete(X); -handle_table_event({write, rabbit_route, #route{binding = B}, [], _ActivityId}) -> +handle_table_event({write, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) -> %% New binding. ok = with_exchange(B, fun (X = #exchange{type = Type}) -> Type:add_binding(X, B) end); handle_table_event({delete, rabbit_route, #route{binding = B}, _OldRecs, _ActivityId}) -> -- cgit v1.2.1