summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-01 17:40:32 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-01 17:40:32 +0100
commitd76f7ffdf5aa02bb1031c9cf1891791607db25a0 (patch)
tree9f5c2a6f6521d08c53a2cc897a74580ad21468f2 /src
parentb2da2bb6b0f82eeb3d090c39fed2894f165d51da (diff)
parent34ecbeb4c9b16cb44dd0bdbc757a96e7f77af944 (diff)
downloadrabbitmq-server-d76f7ffdf5aa02bb1031c9cf1891791607db25a0.tar.gz
Merge bug 24009 into bug 23939.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl37
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_binding.erl51
-rw-r--r--src/rabbit_exchange.erl30
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl9
-rw-r--r--src/rabbit_tests.erl4
10 files changed, 71 insertions, 85 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7942962c..2840a5b7 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -124,7 +124,7 @@
{enables, routing_ready}]}).
-rabbit_boot_step({recovery,
- [{description, "exchange / queue recovery"},
+ [{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
{requires, empty_db_check},
{enables, routing_ready}]}).
@@ -461,39 +461,8 @@ boot_delegate() ->
recover() ->
XNames = rabbit_exchange:recover(),
- QNames = rabbit_amqqueue:start(),
- Bs = rabbit_binding:recover(XNames, QNames),
- {RecXBs, NoRecXBs} = filter_recovered_exchanges(XNames, Bs),
- ok = recovery_callbacks(RecXBs, NoRecXBs).
-
-filter_recovered_exchanges(Xs, Bs) ->
- RecXs = sets:from_list(Xs),
- lists:foldl(
- fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) ->
- case sets:is_element(Src, RecXs) of
- true -> {dict:append(Src, B, RecXBs), NoRecXBs};
- false -> {RecXBs, dict:append(Src, B, NoRecXBs)}
- end
- end, {dict:new(), dict:new()}, Bs).
-
-recovery_callbacks(RecXBs, NoRecXBs) ->
- CB = fun (Tx, F, XBs) ->
- dict:map(fun (XName, Bs) ->
- {ok, X} = rabbit_exchange:lookup(XName),
- rabbit_exchange:callback(X, F, [Tx, X, Bs])
- end, XBs)
- end,
- rabbit_misc:execute_mnesia_transaction(
- fun () -> ok end,
- fun (ok, Tx0) ->
- Tx = case Tx0 of
- true -> transaction;
- false -> none
- end,
- CB(Tx, start, RecXBs),
- CB(Tx, add_bindings, NoRecXBs)
- end),
- ok.
+ QNames = rabbit_amqqueue:recover(),
+ rabbit_binding:recover(XNames, QNames).
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 100c8b4d..b3eca747 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -16,7 +16,8 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
+-export([recover/0, stop/0, declare/5, delete_immediately/1, delete/3,
+ purge/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -57,7 +58,7 @@
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
--spec(start/0 :: () -> [rabbit_types:amqqueue()]).
+-spec(recover/0 :: () -> [rabbit_types:amqqueue()]).
-spec(stop/0 :: () -> 'ok').
-spec(declare/5 ::
(name(), boolean(), boolean(),
@@ -157,7 +158,7 @@
%%----------------------------------------------------------------------------
-start() ->
+recover() ->
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -186,8 +187,8 @@ find_durable_queues() ->
recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
- [Q#amqqueue.name || Q <- Qs,
- gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == {new, Q}].
+ [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
+ gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index e474d6ab..84379e13 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -50,8 +50,8 @@
-opaque(deletions() :: dict()).
--spec(recover/2 :: ([rabbit_types:exchange()], [rabbit_types:amqqueue()]) ->
- [rabbit_types:binding()]).
+-spec(recover/2 :: ([rabbit_types:resource()], [rabbit_types:resource()]) ->
+ 'ok').
-spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()).
-spec(add/1 :: (rabbit_types:binding()) -> add_res()).
-spec(remove/1 :: (rabbit_types:binding()) -> remove_res()).
@@ -94,30 +94,45 @@
destination_name, destination_kind,
routing_key, arguments]).
-recover(XsL, QsL) ->
- Xs = sets:from_list(XsL),
- Qs = sets:from_list(QsL),
+recover(XNames, QNames) ->
+ XNameSet = sets:from_list(XNames),
+ QNameSet = sets:from_list(QNames),
rabbit_misc:table_fold(
fun (Route, ok) ->
ok = mnesia:write(rabbit_semi_durable_route, Route, write)
end, ok, rabbit_durable_route),
- rabbit_misc:table_fold(
- fun (Route = #route{binding = B}, Acc) ->
- case should_recover(B, Xs, Qs) of
- true -> {_, Rev} = route_with_reverse(Route),
- ok = mnesia:write(rabbit_route, Route, write),
- ok = mnesia:write(rabbit_reverse_route, Rev, write),
- [B | Acc];
- false -> Acc
- end
- end, [], rabbit_semi_durable_route).
+ XBs = rabbit_misc:table_fold(
+ fun (Route = #route{binding = B = #binding{source = Src}}, Acc) ->
+ case should_recover(B, XNameSet, QNameSet) of
+ true -> {_, Rev} = route_with_reverse(Route),
+ ok = mnesia:write(rabbit_route, Route, write),
+ ok = mnesia:write(rabbit_reverse_route, Rev,
+ write),
+ rabbit_misc:dict_cons(Src, B, Acc);
+ false -> Acc
+ end
+ end, dict:new(), rabbit_semi_durable_route),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ dict:map(fun (XName, Bindings) ->
+ {ok, X} = rabbit_exchange:lookup(XName),
+ rabbit_exchange:callback(
+ X, add_bindings,
+ [case Tx of
+ true -> transaction;
+ false -> rabbit_exchange:serial(X)
+ end, X, Bindings])
+ end, XBs)
+ end),
+ ok.
should_recover(B = #binding{destination = Dst = #resource{ kind = Kind }},
- XNames, QNames) ->
+ XNameSet, QNameSet) ->
case mnesia:read({rabbit_route, B}) of
[] -> sets:is_element(Dst, case Kind of
- exchange -> XNames;
- queue -> QNames
+ exchange -> XNameSet;
+ queue -> QNameSet
end);
_ -> false
end.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e0dd3d9b..bcbfdfd0 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-type(type() :: atom()).
-type(fun_name() :: atom()).
--spec(recover/0 :: () -> 'ok').
+-spec(recover/0 :: () -> [rabbit_types:resource()]).
-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
@@ -84,14 +84,20 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
recover() ->
- rabbit_misc:table_fold(
- fun (X = #exchange{name = XName}, Acc) ->
- case mnesia:read({rabbit_exchange, XName}) of
+ Xs = rabbit_misc:table_fold(
+ fun (X = #exchange{name = XName}, Acc) ->
+ case mnesia:read({rabbit_exchange, XName}) of
[] -> store(X),
- [XName | Acc];
- [_] -> Acc
- end
- end, [], rabbit_durable_exchange).
+ [X | Acc];
+ [_] -> Acc
+ end
+ end, [], rabbit_durable_exchange),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> ok end,
+ fun (ok, Tx) ->
+ [rabbit_exchange:callback(X, create, [Tx, X]) || X <- Xs]
+ end),
+ [XName || #exchange{name = XName} <- Xs].
callback(#exchange{type = Type}, Fun, Args) ->
apply(type_to_module(Type), Fun, Args).
@@ -122,10 +128,10 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = XT:start(case Tx of
- true -> transaction;
- false -> none
- end, Exchange, []),
+ ok = XT:create(case Tx of
+ true -> transaction;
+ false -> none
+ end, Exchange),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 0468683e..39b90497 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -34,7 +34,7 @@ behaviour_info(callbacks) ->
{validate, 1},
%% called after declaration and recovery
- {start, 3},
+ {create, 2},
%% called after exchange (auto)deletion.
{delete, 3},
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 372541d3..68107ca8 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, start/3, delete/3,
+-export([validate/1, create/2, delete/3,
add_bindings/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -42,7 +42,7 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
-start(_Tx, _X, _Bs) -> ok.
+create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_bindings(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 3d3bb919..40aab06b 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, start/3, delete/3, add_bindings/3,
+-export([validate/1, create/2, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -41,7 +41,7 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
-start(_Tx, _X, _Bs) -> ok.
+create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_bindings(_Tx, _X, _Bs) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 5326ebd0..949eb964 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, start/3, delete/3, add_bindings/3,
+-export([validate/1, create/2, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -115,7 +115,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-start(_Tx, _X, _Bs) -> ok.
+create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
add_bindings(_Tx, _X, _Bs) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index c50072ca..352a2a12 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, start/3, delete/3, add_bindings/3,
+-export([validate/1, create/2, delete/3, add_bindings/3,
remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
@@ -50,12 +50,7 @@ route(#exchange{name = X},
validate(_X) -> ok.
-start(transaction, _X, Bs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
- end);
-start(none, _X, _Bs) ->
+create(_Tx, _X) ->
ok.
delete(transaction, #exchange{name = X}, _Bs) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9cfe8392..4f8138c7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -595,7 +595,7 @@ test_topic_matching() ->
auto_delete = false, arguments = []},
%% create
rabbit_exchange_type_topic:validate(X),
- exchange_op_callback(X, start, [[]]),
+ exchange_op_callback(X, create, []),
%% add some bindings
Bindings = [#binding{source = XName,
@@ -2322,7 +2322,7 @@ test_queue_recover() ->
after 10000 -> exit(timeout_waiting_for_queue_death)
end,
rabbit_amqqueue:stop(),
- rabbit_amqqueue:start(),
+ rabbit_amqqueue:recover(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->