summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-31 12:43:08 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-03-31 12:43:08 +0100
commit992e24b44109679d60ec7dc808548b9d57efffd4 (patch)
treed08869a5fc15b64784b60fd5f18ec8c4b9217082
parent6019f2da620f17174e677b175ca938f335d8390e (diff)
downloadrabbitmq-server-992e24b44109679d60ec7dc808548b9d57efffd4.tar.gz
Change exchange type API to not distinguish between creating and recovering, and to allow recovering bindings. Recover bindings when needed.
-rw-r--r--include/rabbit_exchange_type_spec.hrl9
-rw-r--r--src/rabbit_binding.erl3
-rw-r--r--src/rabbit_exchange.erl42
-rw-r--r--src/rabbit_exchange_type.erl13
-rw-r--r--src/rabbit_exchange_type_direct.erl9
-rw-r--r--src/rabbit_exchange_type_fanout.erl7
-rw-r--r--src/rabbit_exchange_type_headers.erl7
-rw-r--r--src/rabbit_exchange_type_topic.erl18
-rw-r--r--src/rabbit_tests.erl2
9 files changed, 62 insertions, 48 deletions
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 45c475d8..8163b6f2 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -20,13 +20,12 @@
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
--spec(recover/2 :: (rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
+-spec(start/3 :: (boolean(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
- rabbit_types:binding()) -> 'ok').
+-spec(add_bindings/3 :: (boolean(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok').
-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(assert_args_equivalence/2 ::
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 359d4287..84ae789c 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -129,7 +129,8 @@ add(Binding, InnerFun) ->
fun mnesia:write/3),
fun (Tx) ->
ok = rabbit_exchange:callback(
- Src, add_binding, [Tx, Src, B]),
+ Src, add_bindings,
+ [Tx, Src, [B]]),
rabbit_event:notify_if(
not Tx, binding_created, info(B))
end;
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 0d13a684..f6ab9d74 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -92,18 +92,34 @@ recover() ->
end
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
- recover_with_bindings(
- lists:keysort(#binding.source, Bs),
- lists:keysort(#exchange.name, Xs), []).
-
-recover_with_bindings([B = #binding{source = XName} | Rest],
- Xs = [#exchange{name = XName} | _],
- Bindings) ->
- recover_with_bindings(Rest, Xs, [B | Bindings]);
-recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
- (type_to_module(Type)):recover(X, Bindings),
- recover_with_bindings(Bs, Xs, []);
-recover_with_bindings([], [], []) ->
+ {RecXBs, NoRecXBs} = filter_recovered_exchanges(Xs, Bs),
+ ok = recovery_callbacks(RecXBs, NoRecXBs).
+
+%% TODO strip out bindings that are to queues not on this node
+filter_recovered_exchanges(Xs, Bs) ->
+ RecXs = dict:from_list([{XName, X} || X = #exchange{name = XName} <- Xs]),
+ lists:foldl(
+ fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) ->
+ case dict:find(Src, RecXs) of
+ {ok, X} -> {dict:append(X, B, RecXBs), NoRecXBs};
+ error -> {ok, X} = lookup(Src),
+ {RecXBs, dict:append(X, B, NoRecXBs)}
+ end
+ end, {dict:new(), dict:new()}, Bs).
+
+recovery_callbacks(RecXBs, NoRecXBs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ dict:map(fun (X = #exchange{type = Type}, Bs) ->
+ io:format("Recover X ~p~n", [X]),
+ (type_to_module(Type)):start(Tx, X, Bs)
+ end, RecXBs),
+ dict:map(fun (X = #exchange{type = Type}, Bs) ->
+ io:format("Recover Bs ~p~n", [Bs]),
+ (type_to_module(Type)):add_bindings(Tx, X, Bs)
+ end, NoRecXBs)
+ end),
ok.
callback(#exchange{type = XType}, Fun, Args) ->
@@ -134,7 +150,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = (type_to_module(Type)):create(Tx, Exchange),
+ ok = (type_to_module(Type)):start(Tx, Exchange, []),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 547583e9..ad08eb86 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -26,17 +26,14 @@ behaviour_info(callbacks) ->
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
{validate, 1},
- %% called after declaration when previously absent
- {create, 2},
+ %% called after declaration and recovery
+ {start, 3},
- %% called when recovering
- {recover, 2},
-
- %% called after exchange deletion.
+ %% called after exchange (auto)deletion.
{delete, 3},
- %% called after a binding has been added
- {add_binding, 3},
+ %% called after a binding has been added or bindings have been recovered
+ {add_bindings, 3},
%% called after bindings have been deleted.
{remove_bindings, 3},
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 349c2f6e..1658c9f8 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,8 +20,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, start/3, delete/3,
+ add_bindings/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -40,10 +40,9 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
-create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
+start(_Tx, _X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
+add_bindings(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index bc5293c8..83afdd71 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, start/3, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -39,10 +39,9 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
-create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
+start(_Tx, _X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
+add_bindings(_Tx, _X, _Bs) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index d3529b06..0fe8404f 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, start/3, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -113,10 +113,9 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
+start(_Tx, _X, _Bs) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
+add_bindings(_Tx, _X, _Bs) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index ffd1e583..52f468ee 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+-export([validate/1, start/3, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -47,13 +47,14 @@ route(#exchange{name = X},
end || RKey <- Routes]).
validate(_X) -> ok.
-create(_Tx, _X) -> ok.
-recover(_Exchange, Bs) ->
+start(true, _X, Bs) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end).
+ end);
+start(false, _X, _Bs) ->
+ ok.
delete(true, #exchange{name = X}, _Bs) ->
trie_remove_all_edges(X),
@@ -62,9 +63,12 @@ delete(true, #exchange{name = X}, _Bs) ->
delete(false, _Exchange, _Bs) ->
ok.
-add_binding(true, _Exchange, Binding) ->
- internal_add_binding(Binding);
-add_binding(false, _Exchange, _Binding) ->
+add_bindings(true, _X, Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
+ end);
+add_bindings(false, _X, _Bs) ->
ok.
remove_bindings(true, #exchange{name = X}, Bs) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fb1c9a34..075258e5 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -629,7 +629,7 @@ test_topic_matching() ->
{"#.#.#", "t24"},
{"*", "t25"},
{"#.b.#", "t26"}]],
- lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
+ lists:foreach(fun (B) -> exchange_op_callback(X, add_bindings, [[B]]) end,
Bindings),
%% test some matches