diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-31 12:43:08 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-31 12:43:08 +0100 |
commit | 992e24b44109679d60ec7dc808548b9d57efffd4 (patch) | |
tree | d08869a5fc15b64784b60fd5f18ec8c4b9217082 | |
parent | 6019f2da620f17174e677b175ca938f335d8390e (diff) | |
download | rabbitmq-server-992e24b44109679d60ec7dc808548b9d57efffd4.tar.gz |
Change exchange type API to not distinguish between creating and recovering, and to allow recovering bindings. Recover bindings when needed.
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 9 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 3 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 42 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 13 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 7 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 7 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
9 files changed, 62 insertions, 48 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 45c475d8..8163b6f2 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -20,13 +20,12 @@ -spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> rabbit_router:match_result()). -spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok'). --spec(recover/2 :: (rabbit_types:exchange(), - [rabbit_types:binding()]) -> 'ok'). +-spec(start/3 :: (boolean(), rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'). -spec(delete/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/3 :: (boolean(), rabbit_types:exchange(), - rabbit_types:binding()) -> 'ok'). +-spec(add_bindings/3 :: (boolean(), rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'). -spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(), [rabbit_types:binding()]) -> 'ok'). -spec(assert_args_equivalence/2 :: diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 359d4287..84ae789c 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -129,7 +129,8 @@ add(Binding, InnerFun) -> fun mnesia:write/3), fun (Tx) -> ok = rabbit_exchange:callback( - Src, add_binding, [Tx, Src, B]), + Src, add_bindings, + [Tx, Src, [B]]), rabbit_event:notify_if( not Tx, binding_created, info(B)) end; diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 0d13a684..f6ab9d74 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -92,18 +92,34 @@ recover() -> end end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), - recover_with_bindings( - lists:keysort(#binding.source, Bs), - lists:keysort(#exchange.name, Xs), []). - -recover_with_bindings([B = #binding{source = XName} | Rest], - Xs = [#exchange{name = XName} | _], - Bindings) -> - recover_with_bindings(Rest, Xs, [B | Bindings]); -recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> - (type_to_module(Type)):recover(X, Bindings), - recover_with_bindings(Bs, Xs, []); -recover_with_bindings([], [], []) -> + {RecXBs, NoRecXBs} = filter_recovered_exchanges(Xs, Bs), + ok = recovery_callbacks(RecXBs, NoRecXBs). + +%% TODO strip out bindings that are to queues not on this node +filter_recovered_exchanges(Xs, Bs) -> + RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]), + lists:foldl( + fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) -> + case dict:find(Src, RecXs) of + {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs}; + error -> {ok, X} = lookup(Src), + {RecXBs, dict:append(X, B, NoRecXBs)} + end + end, {dict:new(), dict:new()}, Bs). + +recovery_callbacks(RecXBs, NoRecXBs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + dict:map(fun (X = #exchange{type = Type}, Bs) -> + io:format("Recover X ~p~n", [X]), + (type_to_module(Type)):start(Tx, X, Bs) + end, RecXBs), + dict:map(fun (X = #exchange{type = Type}, Bs) -> + io:format("Recover Bs ~p~n", [Bs]), + (type_to_module(Type)):add_bindings(Tx, X, Bs) + end, NoRecXBs) + end), ok. callback(#exchange{type = XType}, Fun, Args) -> @@ -134,7 +150,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = (type_to_module(Type)):create(Tx, Exchange), + ok = (type_to_module(Type)):start(Tx, Exchange, []), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 547583e9..ad08eb86 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -26,17 +26,14 @@ behaviour_info(callbacks) -> %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} {validate, 1}, - %% called after declaration when previously absent - {create, 2}, + %% called after declaration and recovery + {start, 3}, - %% called when recovering - {recover, 2}, - - %% called after exchange deletion. + %% called after exchange (auto)deletion. {delete, 3}, - %% called after a binding has been added - {add_binding, 3}, + %% called after a binding has been added or bindings have been recovered + {add_bindings, 3}, %% called after bindings have been deleted. {remove_bindings, 3}, diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 349c2f6e..1658c9f8 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,8 +20,8 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, - add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([validate/1, start/3, delete/3, + add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -40,10 +40,9 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. -create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. +start(_Tx, _X, _Bs) -> ok. delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. +add_bindings(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index bc5293c8..83afdd71 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, start/3, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -39,10 +39,9 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. -create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. +start(_Tx, _X, _Bs) -> ok. delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. +add_bindings(_Tx, _X, _Bs) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index d3529b06..0fe8404f 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, start/3, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -113,10 +113,9 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. +start(_Tx, _X, _Bs) -> ok. delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. +add_bindings(_Tx, _X, _Bs) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index ffd1e583..52f468ee 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, route/2]). --export([validate/1, create/2, recover/2, delete/3, add_binding/3, +-export([validate/1, start/3, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -47,13 +47,14 @@ route(#exchange{name = X}, end || RKey <- Routes]). validate(_X) -> ok. -create(_Tx, _X) -> ok. -recover(_Exchange, Bs) -> +start(true, _X, Bs) -> rabbit_misc:execute_mnesia_transaction( fun () -> lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) - end). + end); +start(false, _X, _Bs) -> + ok. delete(true, #exchange{name = X}, _Bs) -> trie_remove_all_edges(X), @@ -62,9 +63,12 @@ delete(true, #exchange{name = X}, _Bs) -> delete(false, _Exchange, _Bs) -> ok. -add_binding(true, _Exchange, Binding) -> - internal_add_binding(Binding); -add_binding(false, _Exchange, _Binding) -> +add_bindings(true, _X, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) + end); +add_bindings(false, _X, _Bs) -> ok. remove_bindings(true, #exchange{name = X}, Bs) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fb1c9a34..075258e5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -629,7 +629,7 @@ test_topic_matching() -> {"#.#.#", "t24"}, {"*", "t25"}, {"#.b.#", "t26"}]], - lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, + lists:foreach(fun (B) -> exchange_op_callback(X, add_bindings, [[B]]) end, Bindings), %% test some matches |