diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-04-01 17:40:32 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-04-01 17:40:32 +0100 |
commit | d76f7ffdf5aa02bb1031c9cf1891791607db25a0 (patch) | |
tree | 9f5c2a6f6521d08c53a2cc897a74580ad21468f2 /src | |
parent | b2da2bb6b0f82eeb3d090c39fed2894f165d51da (diff) | |
parent | 34ecbeb4c9b16cb44dd0bdbc757a96e7f77af944 (diff) | |
download | rabbitmq-server-d76f7ffdf5aa02bb1031c9cf1891791607db25a0.tar.gz |
Merge bug 24009 into bug 23939.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit.erl | 37 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_binding.erl | 51 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 30 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 9 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 |
10 files changed, 71 insertions, 85 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7942962c..2840a5b7 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -124,7 +124,7 @@ {enables, routing_ready}]}). -rabbit_boot_step({recovery, - [{description, "exchange / queue recovery"}, + [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, {requires, empty_db_check}, {enables, routing_ready}]}). @@ -461,39 +461,8 @@ boot_delegate() -> recover() -> XNames = rabbit_exchange:recover(), - QNames = rabbit_amqqueue:start(), - Bs = rabbit_binding:recover(XNames, QNames), - {RecXBs, NoRecXBs} = filter_recovered_exchanges(XNames, Bs), - ok = recovery_callbacks(RecXBs, NoRecXBs). - -filter_recovered_exchanges(Xs, Bs) -> - RecXs = sets:from_list(Xs), - lists:foldl( - fun (B = #binding{source = Src}, {RecXBs, NoRecXBs}) -> - case sets:is_element(Src, RecXs) of - true -> {dict:append(Src, B, RecXBs), NoRecXBs}; - false -> {RecXBs, dict:append(Src, B, NoRecXBs)} - end - end, {dict:new(), dict:new()}, Bs). - -recovery_callbacks(RecXBs, NoRecXBs) -> - CB = fun (Tx, F, XBs) -> - dict:map(fun (XName, Bs) -> - {ok, X} = rabbit_exchange:lookup(XName), - rabbit_exchange:callback(X, F, [Tx, X, Bs]) - end, XBs) - end, - rabbit_misc:execute_mnesia_transaction( - fun () -> ok end, - fun (ok, Tx0) -> - Tx = case Tx0 of - true -> transaction; - false -> none - end, - CB(Tx, start, RecXBs), - CB(Tx, add_bindings, NoRecXBs) - end), - ok. + QNames = rabbit_amqqueue:recover(), + rabbit_binding:recover(XNames, QNames). maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 100c8b4d..b3eca747 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -16,7 +16,8 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). +-export([recover/0, stop/0, declare/5, delete_immediately/1, delete/3, + purge/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -57,7 +58,7 @@ -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). --spec(start/0 :: () -> [rabbit_types:amqqueue()]). +-spec(recover/0 :: () -> [rabbit_types:amqqueue()]). -spec(stop/0 :: () -> 'ok'). -spec(declare/5 :: (name(), boolean(), boolean(), @@ -157,7 +158,7 @@ %%---------------------------------------------------------------------------- -start() -> +recover() -> DurableQueues = find_durable_queues(), {ok, BQ} = application:get_env(rabbit, backing_queue_module), ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]), @@ -186,8 +187,8 @@ find_durable_queues() -> recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], - [Q#amqqueue.name || Q <- Qs, - gen_server2:call(Q#amqqueue.pid, {init, true}, infinity) == {new, Q}]. + [QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs, + gen_server2:call(Pid, {init, true}, infinity) == {new, Q}]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e474d6ab..84379e13 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -50,8 +50,8 @@ -opaque(deletions() :: dict()). --spec(recover/2 :: ([rabbit_types:exchange()], [rabbit_types:amqqueue()]) -> - [rabbit_types:binding()]). +-spec(recover/2 :: ([rabbit_types:resource()], [rabbit_types:resource()]) -> + 'ok'). -spec(exists/1 :: (rabbit_types:binding()) -> boolean() | bind_errors()). -spec(add/1 :: (rabbit_types:binding()) -> add_res()). -spec(remove/1 :: (rabbit_types:binding()) -> remove_res()). @@ -94,30 +94,45 @@ destination_name, destination_kind, routing_key, arguments]). -recover(XsL, QsL) -> - Xs = sets:from_list(XsL), - Qs = sets:from_list(QsL), +recover(XNames, QNames) -> + XNameSet = sets:from_list(XNames), + QNameSet = sets:from_list(QNames), rabbit_misc:table_fold( fun (Route, ok) -> ok = mnesia:write(rabbit_semi_durable_route, Route, write) end, ok, rabbit_durable_route), - rabbit_misc:table_fold( - fun (Route = #route{binding = B}, Acc) -> - case should_recover(B, Xs, Qs) of - true -> {_, Rev} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, Route, write), - ok = mnesia:write(rabbit_reverse_route, Rev, write), - [B | Acc]; - false -> Acc - end - end, [], rabbit_semi_durable_route). + XBs = rabbit_misc:table_fold( + fun (Route = #route{binding = B = #binding{source = Src}}, Acc) -> + case should_recover(B, XNameSet, QNameSet) of + true -> {_, Rev} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, Route, write), + ok = mnesia:write(rabbit_reverse_route, Rev, + write), + rabbit_misc:dict_cons(Src, B, Acc); + false -> Acc + end + end, dict:new(), rabbit_semi_durable_route), + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + dict:map(fun (XName, Bindings) -> + {ok, X} = rabbit_exchange:lookup(XName), + rabbit_exchange:callback( + X, add_bindings, + [case Tx of + true -> transaction; + false -> rabbit_exchange:serial(X) + end, X, Bindings]) + end, XBs) + end), + ok. should_recover(B = #binding{destination = Dst = #resource{ kind = Kind }}, - XNames, QNames) -> + XNameSet, QNameSet) -> case mnesia:read({rabbit_route, B}) of [] -> sets:is_element(Dst, case Kind of - exchange -> XNames; - queue -> QNames + exchange -> XNameSet; + queue -> QNameSet end); _ -> false end. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e0dd3d9b..bcbfdfd0 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -type(type() :: atom()). -type(fun_name() :: atom()). --spec(recover/0 :: () -> 'ok'). +-spec(recover/0 :: () -> [rabbit_types:resource()]). -spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), @@ -84,14 +84,20 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> - rabbit_misc:table_fold( - fun (X = #exchange{name = XName}, Acc) -> - case mnesia:read({rabbit_exchange, XName}) of + Xs = rabbit_misc:table_fold( + fun (X = #exchange{name = XName}, Acc) -> + case mnesia:read({rabbit_exchange, XName}) of [] -> store(X), - [XName | Acc]; - [_] -> Acc - end - end, [], rabbit_durable_exchange). + [X | Acc]; + [_] -> Acc + end + end, [], rabbit_durable_exchange), + rabbit_misc:execute_mnesia_transaction( + fun () -> ok end, + fun (ok, Tx) -> + [rabbit_exchange:callback(X, create, [Tx, X]) || X <- Xs] + end), + [XName || #exchange{name = XName} <- Xs]. callback(#exchange{type = Type}, Fun, Args) -> apply(type_to_module(Type), Fun, Args). @@ -122,10 +128,10 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - ok = XT:start(case Tx of - true -> transaction; - false -> none - end, Exchange, []), + ok = XT:create(case Tx of + true -> transaction; + false -> none + end, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 0468683e..39b90497 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -34,7 +34,7 @@ behaviour_info(callbacks) -> {validate, 1}, %% called after declaration and recovery - {start, 3}, + {create, 2}, %% called after exchange (auto)deletion. {delete, 3}, diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 372541d3..68107ca8 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, start/3, delete/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -42,7 +42,7 @@ route(#exchange{name = Name}, rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. -start(_Tx, _X, _Bs) -> ok. +create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. add_bindings(_Tx, _X, _B) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 3d3bb919..40aab06b 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -20,7 +20,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, start/3, delete/3, add_bindings/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -41,7 +41,7 @@ route(#exchange{name = Name}, _Delivery) -> rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. -start(_Tx, _X, _Bs) -> ok. +create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. add_bindings(_Tx, _X, _Bs) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 5326ebd0..949eb964 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, start/3, delete/3, add_bindings/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -115,7 +115,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). validate(_X) -> ok. -start(_Tx, _X, _Bs) -> ok. +create(_Tx, _X) -> ok. delete(_Tx, _X, _Bs) -> ok. add_bindings(_Tx, _X, _Bs) -> ok. remove_bindings(_Tx, _X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index c50072ca..352a2a12 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -21,7 +21,7 @@ -behaviour(rabbit_exchange_type). -export([description/0, serialise_events/0, route/2]). --export([validate/1, start/3, delete/3, add_bindings/3, +-export([validate/1, create/2, delete/3, add_bindings/3, remove_bindings/3, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). @@ -50,12 +50,7 @@ route(#exchange{name = X}, validate(_X) -> ok. -start(transaction, _X, Bs) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) - end); -start(none, _X, _Bs) -> +create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9cfe8392..4f8138c7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -595,7 +595,7 @@ test_topic_matching() -> auto_delete = false, arguments = []}, %% create rabbit_exchange_type_topic:validate(X), - exchange_op_callback(X, start, [[]]), + exchange_op_callback(X, create, []), %% add some bindings Bindings = [#binding{source = XName, @@ -2322,7 +2322,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), - rabbit_amqqueue:start(), + rabbit_amqqueue:recover(), rabbit_amqqueue:with_or_die( QName, fun (Q1 = #amqqueue { pid = QPid1 }) -> |