diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2008-11-06 17:51:07 +0000 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2008-11-06 17:51:07 +0000 |
commit | f58bbbdcb3d1265d8507fc5b0261fb1c017417f6 (patch) | |
tree | 793438247fcf191e5cf2f9bd13b61d292141001a | |
parent | 04613528d22a5ca561656ceffac16fe6cda056f3 (diff) | |
parent | efda2e06e4f60663c5f2d4f0e3447f39d8be7a80 (diff) | |
download | rabbitmq-server-f58bbbdcb3d1265d8507fc5b0261fb1c017417f6.tar.gz |
merge bug18776 into default
-rw-r--r-- | include/rabbit.hrl | 20 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 120 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 406 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 10 |
7 files changed, 268 insertions, 302 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 180a0dc3..706a92af 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -43,11 +43,14 @@ -record(exchange, {name, type, durable, auto_delete, arguments}). --record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}). --record(binding_spec, {exchange_name, routing_key, arguments}). +-record(amqqueue, {name, durable, auto_delete, arguments, pid}). --record(binding, {key, handlers}). --record(handler, {binding_spec, queue, qpid}). +%% mnesia doesn't like unary records, so we add a dummy 'value' field +-record(route, {binding, value = const}). +-record(reverse_route, {reverse_binding, value = const}). + +-record(binding, {exchange_name, key, queue_name, args = []}). +-record(reverse_binding, {queue_name, key, exchange_name, args = []}). -record(listener, {node, protocol, host, port}). @@ -77,16 +80,11 @@ -type(user() :: #user{username :: username(), password :: password()}). --type(binding_spec() :: - #binding_spec{exchange_name :: exchange_name(), - routing_key :: routing_key(), - arguments :: amqp_table()}). -type(amqqueue() :: #amqqueue{name :: queue_name(), durable :: bool(), auto_delete :: bool(), arguments :: amqp_table(), - binding_specs :: [binding_spec()], pid :: maybe(pid())}). -type(exchange() :: #exchange{name :: exchange_name(), @@ -94,6 +92,10 @@ durable :: bool(), auto_delete :: bool(), arguments :: amqp_table()}). +-type(binding() :: + #binding{exchange_name :: exchange_name(), + queue_name :: queue_name(), + key :: binding_key()}). %% TODO: make this more precise by tying specific class_ids to %% specific properties -type(undecoded_content() :: diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl index e9e65092..13000153 100644 --- a/include/rabbit_framing_spec.hrl +++ b/include/rabbit_framing_spec.hrl @@ -53,3 +53,4 @@ -type(vhost() :: binary()). -type(ctag() :: binary()). -type(exchange_type() :: 'direct' | 'topic' | 'fanout'). +-type(binding_key() :: binary()). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7b2f801a..56d2c35d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,6 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). --export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2]). @@ -53,21 +52,12 @@ -type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). --type(bind_res() :: {'ok', non_neg_integer()} | - {'error', 'queue_not_found' | 'exchange_not_found'}). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). - -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). --spec(add_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (queue_name(), exchange_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'binding_not_found'}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). @@ -89,7 +79,6 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). --spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). @@ -131,7 +120,7 @@ recover_durable_queues() -> Queues = lists:map(fun start_queue_process/1, R), rabbit_misc:execute_mnesia_transaction( fun () -> - lists:foreach(fun recover_queue/1, Queues), + lists:foreach(fun store_queue/1, Queues), ok end). @@ -140,12 +129,12 @@ declare(QueueName, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args, - binding_specs = [], pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({amqqueue, QueueName}) of - [] -> ok = recover_queue(Q), + [] -> ok = store_queue(Q), + ok = add_default_binding(Q), Q; [ExistingQ] -> ExistingQ end @@ -167,83 +156,12 @@ start_queue_process(Q) -> {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), Q#amqqueue{pid = Pid}. -recover_queue(Q) -> - ok = store_queue(Q), - ok = recover_bindings(Q), - ok. - -default_binding_spec(#resource{virtual_host = VHost, name = Name}) -> - #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>), - routing_key = Name, - arguments = []}. - -recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> - ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), - lists:foreach(fun (B) -> - ok = rabbit_exchange:add_binding(B, Q) - end, Specs), +add_default_binding(#amqqueue{name = QueueName}) -> + Exchange = rabbit_misc:r(QueueName, exchange, <<>>), + RoutingKey = QueueName#resource.name, + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), ok. -modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments, - SpecPresentFun, SpecAbsentFun) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [Q = #amqqueue{binding_specs = Specs0}] -> - Spec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey, - arguments = Arguments}, - case (case lists:member(Spec, Specs0) of - true -> SpecPresentFun; - false -> SpecAbsentFun - end)(Q, Spec) of - {ok, #amqqueue{binding_specs = Specs}} -> - {ok, length(Specs)}; - {error, not_found} -> - {error, exchange_not_found}; - Other -> Other - end; - [] -> {error, queue_not_found} - end - end). - -update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec, - UpdateSpecFun, UpdateExchangeFun) -> - Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, - case UpdateExchangeFun(Spec, Q1) of - ok -> store_queue(Q1), - {ok, Q1}; - Other -> Other - end. - -add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, _Spec) -> {ok, Q} end, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun (S, Specs) -> [S | Specs] end, - fun rabbit_exchange:add_binding/2) - end). - -delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> - modify_bindings( - QueueName, ExchangeName, RoutingKey, Arguments, - fun (Q, Spec) -> update_bindings( - Q, Spec, - fun lists:delete/2, - fun rabbit_exchange:delete_binding/2) - end, - fun (Q, Spec) -> - %% the following is essentially a no-op, though crucially - %% it produces {error, not_found} when the exchange does - %% not exist. - case rabbit_exchange:delete_binding(Spec, Q) of - ok -> {error, binding_not_found}; - Other -> Other - end - end). - lookup(Name) -> rabbit_misc:dirty_read({amqqueue, Name}). @@ -314,17 +232,6 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). -binding_forcibly_removed(BindingSpec, QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({amqqueue, QueueName}) of - [] -> ok; - [Q = #amqqueue{binding_specs = Specs}] -> - store_queue(Q#amqqueue{binding_specs = - lists:delete(BindingSpec, Specs)}) - end - end). - claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server:call(QPid, {claim_queue, ReaderPid}). @@ -342,12 +249,6 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). -delete_bindings(Q = #amqqueue{binding_specs = Specs}) -> - lists:foreach(fun (BindingSpec) -> - ok = rabbit_exchange:delete_binding( - BindingSpec, Q) - end, Specs). - internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -360,10 +261,8 @@ internal_delete(QueueName) -> end end). -delete_queue(Q = #amqqueue{name = QueueName}) -> - ok = delete_bindings(Q), - ok = rabbit_exchange:delete_binding( - default_binding_spec(QueueName), Q), +delete_queue(#amqqueue{name = QueueName}) -> + ok = rabbit_exchange:delete_bindings_for_queue(QueueName), ok = mnesia:delete({amqqueue, QueueName}), ok. @@ -383,7 +282,6 @@ pseudo_queue(QueueName, Pid) -> durable = false, auto_delete = false, arguments = [], - binding_specs = [], pid = Pid}. safe_pmap_ok(H, F, L) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b4e0fbab..1eb421ca 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -586,7 +586,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -594,7 +594,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -654,7 +654,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(QueueName)]); @@ -670,8 +670,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, rabbit_misc:protocol_error( not_allowed, "durability settings of ~s incompatible with ~s", [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); - {ok, _BindingCount} -> - return_ok(State, NoWait, ReturnMethod) + ok -> return_ok(State, NoWait, ReturnMethod) end. publish(Mandatory, Immediate, Message, QPids, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index bb132a50..a8c54438 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -29,13 +29,18 @@ -include("rabbit_framing.hrl"). -export([recover/0, declare/5, lookup/1, lookup_or_die/1, - list_vhost_exchanges/1, list_exchange_bindings/1, + list_vhost_exchanges/1, simple_publish/6, simple_publish/3, route/2]). --export([add_binding/2, delete_binding/2]). +-export([add_binding/4, delete_binding/4]). -export([delete/2]). +-export([delete_bindings_for_queue/1]). -export([check_type/1, assert_type/2, topic_matches/2]). +%% EXTENDED API +-export([list_exchange_bindings/1]). +-export([list_queue_bindings/1]). + -import(mnesia). -import(sets). -import(lists). @@ -48,7 +53,8 @@ -type(publish_res() :: {'ok', [pid()]} | not_found() | {'error', 'unroutable' | 'not_delivered'}). - +-type(bind_res() :: 'ok' | + {'error', 'queue_not_found' | 'exchange_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). @@ -57,37 +63,46 @@ -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> - [{queue_name(), routing_key(), amqp_table()}]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found() | - {'error', 'durability_settings_incompatible'}). --spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> - 'ok' | not_found()). +-spec(add_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'durability_settings_incompatible'}). +-spec(delete_binding/4 :: + (exchange_name(), queue_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'binding_not_found'}). +-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). +-spec(list_queue_bindings/1 :: (queue_name()) -> + [{exchange_name(), routing_key(), amqp_table()}]). +-spec(list_exchange_bindings/1 :: (exchange_name()) -> + [{queue_name(), routing_key(), amqp_table()}]). -endif. %%---------------------------------------------------------------------------- recover() -> - ok = recover_durable_exchanges(), - ok. - -recover_durable_exchanges() -> rabbit_misc:execute_mnesia_transaction( fun () -> - mnesia:foldl(fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges) + mnesia:foldl( + fun (Exchange, Acc) -> + ok = mnesia:write(Exchange), + Acc + end, ok, durable_exchanges), + mnesia:foldl( + fun (Route, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute), + Acc + end, ok, durable_routes), + ok end). declare(ExchangeName, Type, Durable, AutoDelete, Args) -> @@ -143,22 +158,9 @@ list_vhost_exchanges(VHostPath) -> mnesia:dirty_match_object( #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). -list_exchange_bindings(Name) -> - [{QueueName, RoutingKey, Arguments} || - #binding{handlers = Handlers} <- bindings_for_exchange(Name), - #handler{binding_spec = #binding_spec{routing_key = RoutingKey, - arguments = Arguments}, - queue = QueueName} <- Handlers]. - -bindings_for_exchange(Name) -> - qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), - element(1, K) == Name])). - -empty_handlers() -> - []. - %% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, + ContentTypeBin, BodyBin) -> {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), Content = #content{class_id = ClassId, properties = #'P_basic'{content_type = ContentTypeBin}, @@ -188,121 +190,173 @@ simple_publish(Mandatory, Immediate, %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. +%% +%% TODO: Maybe this should be handled by a cursor instead. route(#exchange{name = Name, type = topic}, RoutingKey) -> - sets:to_list( - sets:union( - mnesia:activity( - async_dirty, - fun () -> - qlc:e(qlc:q([handler_qpids(H) || - #binding{key = {Name1, PatternKey}, - handlers = H} - <- mnesia:table(binding), - Name == Name1, - topic_matches(PatternKey, RoutingKey)])) - end))); - -route(#exchange{name = Name, type = Type}, RoutingKey) -> - BindingKey = delivery_key_for_type(Type, Name, RoutingKey), - case rabbit_misc:dirty_read({binding, BindingKey}) of - {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); - {error, not_found} -> [] - end. + Query = qlc:q([QName || + #route{binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName, + key = BindingKey}} <- mnesia:table(route), + ExchangeName == Name, + %% TODO: This causes a full scan for each entry + %% with the same exchange (see bug 19336) + topic_matches(BindingKey, RoutingKey)]), + lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query])); + +route(X = #exchange{type = fanout}, _) -> + route_internal(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey) -> + route_internal(X, RoutingKey). + +route_internal(#exchange{name = Name}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}), + [QPid | Acc] + end, [], sets:from_list(Queues)). + +%% TODO: Should all of the route and binding management not be +%% refactored to its own module, especially seeing as unbind will have +%% to be implemented for 0.91 ? + +delete_bindings_for_exchange(ExchangeName) -> + indexed_delete( + #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + fun delete_forward_routes/1, fun mnesia:delete_object/1). + +delete_bindings_for_queue(QueueName) -> + Exchanges = exchanges_for_queue(QueueName), + indexed_delete( + reverse_route(#route{binding = #binding{queue_name = QueueName, + _ = '_'}}), + fun mnesia:delete_object/1, fun delete_forward_routes/1), + [begin + [X] = mnesia:read({exchange, ExchangeName}), + ok = maybe_auto_delete(X) + end || ExchangeName <- Exchanges], + ok. -delivery_key_for_type(fanout, Name, _RoutingKey) -> - {Name, fanout}; -delivery_key_for_type(_Type, Name, RoutingKey) -> - {Name, RoutingKey}. +indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> + [begin + ok = ReverseDeleteFun(reverse_route(Route)), + ok = ForwardsDeleteFun(Route) + end || Route <- mnesia:match_object(Match)], + ok. -call_with_exchange(Name, Fun) -> - case mnesia:wread({exchange, Name}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end. +delete_forward_routes(Route) -> + ok = mnesia:delete_object(Route), + ok = mnesia:delete_object(durable_routes, Route, write). -make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> - #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. +exchanges_for_queue(QueueName) -> + MatchHead = reverse_route( + #route{binding = #binding{exchange_name = '$1', + queue_name = QueueName, + _ = '_'}}), + sets:to_list( + sets:from_list( + mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). -add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> - call_with_exchange( - ExchangeName, - fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - internal_add_binding( - X, RoutingKey, make_handler(BindingSpec, Q)) - end +has_bindings(ExchangeName) -> + MatchHead = #route{binding = #binding{exchange_name = ExchangeName, + queue_name = '$1', + _ = '_'}}, + continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)). + +continue('$end_of_table') -> false; +continue({[_|_], _}) -> true; +continue({[], Continuation}) -> continue(mnesia:select(Continuation)). + +call_with_exchange(Exchange, Fun) -> + rabbit_misc:execute_mnesia_transaction( + fun() -> case mnesia:read({exchange, Exchange}) of + [] -> {error, exchange_not_found}; + [X] -> Fun(X) + end end). -delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, - routing_key = RoutingKey}, Q) -> +call_with_exchange_and_queue(Exchange, Queue, Fun) -> call_with_exchange( - ExchangeName, - fun (X) -> ok = internal_delete_binding( - X, RoutingKey, make_handler(BindingSpec, Q)), - maybe_auto_delete(X) + Exchange, + fun(X) -> case mnesia:read({amqqueue, Queue}) of + [] -> {error, queue_not_found}; + [Q] -> Fun(X, Q) + end end). -%% Must run within a transaction. -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; -maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> - case internal_delete(ExchangeName, true) of - {error, in_use} -> ok; - ok -> ok - end. - -handlers_isempty([]) -> true; -handlers_isempty([_|_]) -> false. - -extend_handlers(Handlers, Handler) -> [Handler | Handlers]. - -delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers). - -handler_qpids(Handlers) -> - sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). +add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:write/3) + end + end). -%% Must run within a transaction. -internal_add_binding(#exchange{name = ExchangeName, type = Type}, - RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - ok = add_handler_to_binding(BindingKey, Handler). +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + call_with_exchange_and_queue( + ExchangeName, QueueName, + fun (X, Q) -> + ok = sync_binding( + ExchangeName, QueueName, RoutingKey, Arguments, + Q#amqqueue.durable, fun mnesia:delete_object/3), + maybe_auto_delete(X) + end). -%% Must run within a transaction. -internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> - BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), - remove_handler_from_binding(BindingKey, Handler), +sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> + Binding = #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = Arguments}, + ok = case Durable of + true -> Fun(durable_routes, #route{binding = Binding}, write); + false -> ok + end, + [ok, ok] = [Fun(element(1, R), R, write) || + R <- tuple_to_list(route_with_reverse(Binding))], ok. -%% Must run within a transaction. -add_handler_to_binding(BindingKey, Handler) -> - ok = case mnesia:wread({binding, BindingKey}) of - [] -> - ok = mnesia:write( - #binding{key = BindingKey, - handlers = extend_handlers( - empty_handlers(), Handler)}); - [B = #binding{handlers = H}] -> - ok = mnesia:write( - B#binding{handlers = extend_handlers(H, Handler)}) - end. - -%% Must run within a transaction. -remove_handler_from_binding(BindingKey, Handler) -> - case mnesia:wread({binding, BindingKey}) of - [] -> empty; - [B = #binding{handlers = H}] -> - H1 = delete_handler(H, Handler), - case handlers_isempty(H1) of - true -> - ok = mnesia:delete({binding, BindingKey}), - empty; - _ -> - ok = mnesia:write(B#binding{handlers = H1}), - not_empty - end - end. +route_with_reverse(#route{binding = Binding}) -> + route_with_reverse(Binding); +route_with_reverse(Binding = #binding{}) -> + Route = #route{binding = Binding}, + {Route, reverse_route(Route)}. + +reverse_route(#route{binding = Binding}) -> + #reverse_route{reverse_binding = reverse_binding(Binding)}; + +reverse_route(#reverse_route{reverse_binding = Binding}) -> + #route{binding = reverse_binding(Binding)}. + +reverse_binding(#reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}; + +reverse_binding(#binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}) -> + #reverse_binding{exchange_name = Exchange, + queue_name = Queue, + key = Key, + args = Args}. split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), @@ -331,46 +385,50 @@ last_topic_match(P, R, []) -> last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). -delete(ExchangeName, IfUnused) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> internal_delete(ExchangeName, IfUnused) end). - -internal_delete(ExchangeName, _IfUnused = true) -> - Bindings = bindings_for_exchange(ExchangeName), - case Bindings of - [] -> do_internal_delete(ExchangeName, Bindings); - _ -> - case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, - Bindings) of - true -> - %% There are no handlers anywhere in any of the - %% bindings for this exchange. - do_internal_delete(ExchangeName, Bindings); - false -> - %% There was at least one real handler - %% present. It's still in use. - {error, in_use} - end - end; -internal_delete(ExchangeName, false) -> - do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). - -forcibly_remove_handlers(Handlers) -> - lists:foreach( - fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> - ok = rabbit_amqqueue:binding_forcibly_removed( - BindingSpec, QueueName) - end, Handlers), +delete(ExchangeName, _IfUnused = true) -> + call_with_exchange(ExchangeName, fun conditional_delete/1); +delete(ExchangeName, _IfUnused = false) -> + call_with_exchange(ExchangeName, fun unconditional_delete/1). + +maybe_auto_delete(#exchange{auto_delete = false}) -> + ok; +maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> + conditional_delete(Exchange), ok. -do_internal_delete(ExchangeName, Bindings) -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> {error, not_found}; - _ -> - lists:foreach(fun (#binding{key = K, handlers = H}) -> - ok = forcibly_remove_handlers(H), - ok = mnesia:delete({binding, K}) - end, Bindings), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}) +conditional_delete(Exchange = #exchange{name = ExchangeName}) -> + case has_bindings(ExchangeName) of + false -> unconditional_delete(Exchange); + true -> {error, in_use} end. + +unconditional_delete(#exchange{name = ExchangeName}) -> + ok = delete_bindings_for_exchange(ExchangeName), + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}). + +%%---------------------------------------------------------------------------- +%% EXTENDED API +%% These are API calls that are not used by the server internally, +%% they are exported for embedded clients to use + +%% This is currently used in mod_rabbit.erl (XMPP) and expects this to +%% return {QueueName, RoutingKey, Arguments} tuples +list_exchange_bindings(ExchangeName) -> + Route = #route{binding = #binding{exchange_name = ExchangeName, + _ = '_'}}, + [{QueueName, RoutingKey, Arguments} || + #route{binding = #binding{queue_name = QueueName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. + +% Refactoring is left as an exercise for the reader +list_queue_bindings(QueueName) -> + Route = #route{binding = #binding{queue_name = QueueName, + _ = '_'}}, + [{ExchangeName, RoutingKey, Arguments} || + #route{binding = #binding{exchange_name = ExchangeName, + key = RoutingKey, + args = Arguments}} + <- mnesia:dirty_match_object(Route)]. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 527de0f6..7638af58 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -68,7 +68,8 @@ -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) + when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} @@ -237,6 +238,7 @@ with_vhost(VHostPath, Thunk) -> with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). + execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4ae367ba..9b67135d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -105,7 +105,13 @@ table_definitions() -> {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, - {binding, [{attributes, record_info(fields, binding)}]}, + {durable_routes, [{disc_copies, [node()]}, + {record_name, route}, + {attributes, record_info(fields, route)}]}, + {route, [{type, ordered_set}, + {attributes, record_info(fields, route)}]}, + {reverse_route, [{type, ordered_set}, + {attributes, record_info(fields, reverse_route)}]}, {durable_exchanges, [{disc_copies, [node()]}, {record_name, exchange}, {attributes, record_info(fields, exchange)}]}, @@ -255,7 +261,7 @@ init_db(ClusterNodes) -> end. create_schema() -> - mnesia:stop(), + mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), rabbit_misc:ensure_ok(mnesia:start(), |