summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2008-11-06 17:51:07 +0000
committerTony Garnock-Jones <tonyg@lshift.net>2008-11-06 17:51:07 +0000
commitf58bbbdcb3d1265d8507fc5b0261fb1c017417f6 (patch)
tree793438247fcf191e5cf2f9bd13b61d292141001a
parent04613528d22a5ca561656ceffac16fe6cda056f3 (diff)
parentefda2e06e4f60663c5f2d4f0e3447f39d8be7a80 (diff)
downloadrabbitmq-server-f58bbbdcb3d1265d8507fc5b0261fb1c017417f6.tar.gz
merge bug18776 into default
-rw-r--r--include/rabbit.hrl20
-rw-r--r--include/rabbit_framing_spec.hrl1
-rw-r--r--src/rabbit_amqqueue.erl120
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_exchange.erl406
-rw-r--r--src/rabbit_misc.erl4
-rw-r--r--src/rabbit_mnesia.erl10
7 files changed, 268 insertions, 302 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 180a0dc3..706a92af 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,11 +43,14 @@
-record(exchange, {name, type, durable, auto_delete, arguments}).
--record(amqqueue, {name, durable, auto_delete, arguments, binding_specs, pid}).
--record(binding_spec, {exchange_name, routing_key, arguments}).
+-record(amqqueue, {name, durable, auto_delete, arguments, pid}).
--record(binding, {key, handlers}).
--record(handler, {binding_spec, queue, qpid}).
+%% mnesia doesn't like unary records, so we add a dummy 'value' field
+-record(route, {binding, value = const}).
+-record(reverse_route, {reverse_binding, value = const}).
+
+-record(binding, {exchange_name, key, queue_name, args = []}).
+-record(reverse_binding, {queue_name, key, exchange_name, args = []}).
-record(listener, {node, protocol, host, port}).
@@ -77,16 +80,11 @@
-type(user() ::
#user{username :: username(),
password :: password()}).
--type(binding_spec() ::
- #binding_spec{exchange_name :: exchange_name(),
- routing_key :: routing_key(),
- arguments :: amqp_table()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
durable :: bool(),
auto_delete :: bool(),
arguments :: amqp_table(),
- binding_specs :: [binding_spec()],
pid :: maybe(pid())}).
-type(exchange() ::
#exchange{name :: exchange_name(),
@@ -94,6 +92,10 @@
durable :: bool(),
auto_delete :: bool(),
arguments :: amqp_table()}).
+-type(binding() ::
+ #binding{exchange_name :: exchange_name(),
+ queue_name :: queue_name(),
+ key :: binding_key()}).
%% TODO: make this more precise by tying specific class_ids to
%% specific properties
-type(undecoded_content() ::
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
index e9e65092..13000153 100644
--- a/include/rabbit_framing_spec.hrl
+++ b/include/rabbit_framing_spec.hrl
@@ -53,3 +53,4 @@
-type(vhost() :: binary()).
-type(ctag() :: binary()).
-type(exchange_type() :: 'direct' | 'topic' | 'fanout').
+-type(binding_key() :: binary()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b2f801a..56d2c35d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,6 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
--export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2]).
@@ -53,21 +52,12 @@
-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
--type(bind_res() :: {'ok', non_neg_integer()} |
- {'error', 'queue_not_found' | 'exchange_not_found'}).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
amqqueue()).
--spec(add_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (queue_name(), exchange_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'binding_not_found'}).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
@@ -89,7 +79,6 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
@@ -131,7 +120,7 @@ recover_durable_queues() ->
Queues = lists:map(fun start_queue_process/1, R),
rabbit_misc:execute_mnesia_transaction(
fun () ->
- lists:foreach(fun recover_queue/1, Queues),
+ lists:foreach(fun store_queue/1, Queues),
ok
end).
@@ -140,12 +129,12 @@ declare(QueueName, Durable, AutoDelete, Args) ->
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
- binding_specs = [],
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
- [] -> ok = recover_queue(Q),
+ [] -> ok = store_queue(Q),
+ ok = add_default_binding(Q),
Q;
[ExistingQ] -> ExistingQ
end
@@ -167,83 +156,12 @@ start_queue_process(Q) ->
{ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]),
Q#amqqueue{pid = Pid}.
-recover_queue(Q) ->
- ok = store_queue(Q),
- ok = recover_bindings(Q),
- ok.
-
-default_binding_spec(#resource{virtual_host = VHost, name = Name}) ->
- #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>),
- routing_key = Name,
- arguments = []}.
-
-recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) ->
- ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q),
- lists:foreach(fun (B) ->
- ok = rabbit_exchange:add_binding(B, Q)
- end, Specs),
+add_default_binding(#amqqueue{name = QueueName}) ->
+ Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
+ RoutingKey = QueueName#resource.name,
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
ok.
-modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments,
- SpecPresentFun, SpecAbsentFun) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
- [Q = #amqqueue{binding_specs = Specs0}] ->
- Spec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- arguments = Arguments},
- case (case lists:member(Spec, Specs0) of
- true -> SpecPresentFun;
- false -> SpecAbsentFun
- end)(Q, Spec) of
- {ok, #amqqueue{binding_specs = Specs}} ->
- {ok, length(Specs)};
- {error, not_found} ->
- {error, exchange_not_found};
- Other -> Other
- end;
- [] -> {error, queue_not_found}
- end
- end).
-
-update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec,
- UpdateSpecFun, UpdateExchangeFun) ->
- Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)},
- case UpdateExchangeFun(Spec, Q1) of
- ok -> store_queue(Q1),
- {ok, Q1};
- Other -> Other
- end.
-
-add_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
- modify_bindings(
- QueueName, ExchangeName, RoutingKey, Arguments,
- fun (Q, _Spec) -> {ok, Q} end,
- fun (Q, Spec) -> update_bindings(
- Q, Spec,
- fun (S, Specs) -> [S | Specs] end,
- fun rabbit_exchange:add_binding/2)
- end).
-
-delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) ->
- modify_bindings(
- QueueName, ExchangeName, RoutingKey, Arguments,
- fun (Q, Spec) -> update_bindings(
- Q, Spec,
- fun lists:delete/2,
- fun rabbit_exchange:delete_binding/2)
- end,
- fun (Q, Spec) ->
- %% the following is essentially a no-op, though crucially
- %% it produces {error, not_found} when the exchange does
- %% not exist.
- case rabbit_exchange:delete_binding(Spec, Q) of
- ok -> {error, binding_not_found};
- Other -> Other
- end
- end).
-
lookup(Name) ->
rabbit_misc:dirty_read({amqqueue, Name}).
@@ -314,17 +232,6 @@ notify_down_all(QPids, ChPid) ->
fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
-binding_forcibly_removed(BindingSpec, QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
- [] -> ok;
- [Q = #amqqueue{binding_specs = Specs}] ->
- store_queue(Q#amqqueue{binding_specs =
- lists:delete(BindingSpec, Specs)})
- end
- end).
-
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
@@ -342,12 +249,6 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
-delete_bindings(Q = #amqqueue{binding_specs = Specs}) ->
- lists:foreach(fun (BindingSpec) ->
- ok = rabbit_exchange:delete_binding(
- BindingSpec, Q)
- end, Specs).
-
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -360,10 +261,8 @@ internal_delete(QueueName) ->
end
end).
-delete_queue(Q = #amqqueue{name = QueueName}) ->
- ok = delete_bindings(Q),
- ok = rabbit_exchange:delete_binding(
- default_binding_spec(QueueName), Q),
+delete_queue(#amqqueue{name = QueueName}) ->
+ ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
ok = mnesia:delete({amqqueue, QueueName}),
ok.
@@ -383,7 +282,6 @@ pseudo_queue(QueueName, Pid) ->
durable = false,
auto_delete = false,
arguments = [],
- binding_specs = [],
pid = Pid}.
safe_pmap_ok(H, F, L) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b4e0fbab..1eb421ca 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -586,7 +586,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_amqqueue:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -594,7 +594,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_amqqueue:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -654,7 +654,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- case Fun(QueueName, ExchangeName, ActualRoutingKey, Arguments) of
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
@@ -670,8 +670,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
rabbit_misc:protocol_error(
not_allowed, "durability settings of ~s incompatible with ~s",
[rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
- {ok, _BindingCount} ->
- return_ok(State, NoWait, ReturnMethod)
+ ok -> return_ok(State, NoWait, ReturnMethod)
end.
publish(Mandatory, Immediate, Message, QPids,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index bb132a50..a8c54438 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -29,13 +29,18 @@
-include("rabbit_framing.hrl").
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list_vhost_exchanges/1, list_exchange_bindings/1,
+ list_vhost_exchanges/1,
simple_publish/6, simple_publish/3,
route/2]).
--export([add_binding/2, delete_binding/2]).
+-export([add_binding/4, delete_binding/4]).
-export([delete/2]).
+-export([delete_bindings_for_queue/1]).
-export([check_type/1, assert_type/2, topic_matches/2]).
+%% EXTENDED API
+-export([list_exchange_bindings/1]).
+-export([list_queue_bindings/1]).
+
-import(mnesia).
-import(sets).
-import(lists).
@@ -48,7 +53,8 @@
-type(publish_res() :: {'ok', [pid()]} |
not_found() | {'error', 'unroutable' | 'not_delivered'}).
-
+-type(bind_res() :: 'ok' |
+ {'error', 'queue_not_found' | 'exchange_not_found'}).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
@@ -57,37 +63,46 @@
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]).
--spec(list_exchange_bindings/1 :: (exchange_name()) ->
- [{queue_name(), routing_key(), amqp_table()}]).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
--spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
- 'ok' | not_found() |
- {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
- 'ok' | not_found()).
+-spec(add_binding/4 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+ bind_res() | {'error', 'durability_settings_incompatible'}).
+-spec(delete_binding/4 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+ bind_res() | {'error', 'binding_not_found'}).
+-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
+-spec(list_queue_bindings/1 :: (queue_name()) ->
+ [{exchange_name(), routing_key(), amqp_table()}]).
+-spec(list_exchange_bindings/1 :: (exchange_name()) ->
+ [{queue_name(), routing_key(), amqp_table()}]).
-endif.
%%----------------------------------------------------------------------------
recover() ->
- ok = recover_durable_exchanges(),
- ok.
-
-recover_durable_exchanges() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- mnesia:foldl(fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges)
+ mnesia:foldl(
+ fun (Exchange, Acc) ->
+ ok = mnesia:write(Exchange),
+ Acc
+ end, ok, durable_exchanges),
+ mnesia:foldl(
+ fun (Route, Acc) ->
+ {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute),
+ Acc
+ end, ok, durable_routes),
+ ok
end).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
@@ -143,22 +158,9 @@ list_vhost_exchanges(VHostPath) ->
mnesia:dirty_match_object(
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
-list_exchange_bindings(Name) ->
- [{QueueName, RoutingKey, Arguments} ||
- #binding{handlers = Handlers} <- bindings_for_exchange(Name),
- #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
- arguments = Arguments},
- queue = QueueName} <- Handlers].
-
-bindings_for_exchange(Name) ->
- qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
- element(1, K) == Name])).
-
-empty_handlers() ->
- [].
-
%% Usable by Erlang code that wants to publish messages.
-simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
+ ContentTypeBin, BodyBin) ->
{ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
Content = #content{class_id = ClassId,
properties = #'P_basic'{content_type = ContentTypeBin},
@@ -188,121 +190,173 @@ simple_publish(Mandatory, Immediate,
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
+%%
+%% TODO: Maybe this should be handled by a cursor instead.
route(#exchange{name = Name, type = topic}, RoutingKey) ->
- sets:to_list(
- sets:union(
- mnesia:activity(
- async_dirty,
- fun () ->
- qlc:e(qlc:q([handler_qpids(H) ||
- #binding{key = {Name1, PatternKey},
- handlers = H}
- <- mnesia:table(binding),
- Name == Name1,
- topic_matches(PatternKey, RoutingKey)]))
- end)));
-
-route(#exchange{name = Name, type = Type}, RoutingKey) ->
- BindingKey = delivery_key_for_type(Type, Name, RoutingKey),
- case rabbit_misc:dirty_read({binding, BindingKey}) of
- {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H));
- {error, not_found} -> []
- end.
+ Query = qlc:q([QName ||
+ #route{binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName,
+ key = BindingKey}} <- mnesia:table(route),
+ ExchangeName == Name,
+ %% TODO: This causes a full scan for each entry
+ %% with the same exchange (see bug 19336)
+ topic_matches(BindingKey, RoutingKey)]),
+ lookup_qpids(mnesia:async_dirty(fun qlc:e/1, [Query]));
+
+route(X = #exchange{type = fanout}, _) ->
+ route_internal(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey) ->
+ route_internal(X, RoutingKey).
+
+route_internal(#exchange{name = Name}, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = Name,
+ queue_name = '$1',
+ key = RoutingKey,
+ _ = '_'}},
+ lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
+
+lookup_qpids(Queues) ->
+ sets:fold(
+ fun(Key, Acc) ->
+ [#amqqueue{pid = QPid}] = mnesia:dirty_read({amqqueue, Key}),
+ [QPid | Acc]
+ end, [], sets:from_list(Queues)).
+
+%% TODO: Should all of the route and binding management not be
+%% refactored to its own module, especially seeing as unbind will have
+%% to be implemented for 0.91 ?
+
+delete_bindings_for_exchange(ExchangeName) ->
+ indexed_delete(
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ fun delete_forward_routes/1, fun mnesia:delete_object/1).
+
+delete_bindings_for_queue(QueueName) ->
+ Exchanges = exchanges_for_queue(QueueName),
+ indexed_delete(
+ reverse_route(#route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ fun mnesia:delete_object/1, fun delete_forward_routes/1),
+ [begin
+ [X] = mnesia:read({exchange, ExchangeName}),
+ ok = maybe_auto_delete(X)
+ end || ExchangeName <- Exchanges],
+ ok.
-delivery_key_for_type(fanout, Name, _RoutingKey) ->
- {Name, fanout};
-delivery_key_for_type(_Type, Name, RoutingKey) ->
- {Name, RoutingKey}.
+indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) ->
+ [begin
+ ok = ReverseDeleteFun(reverse_route(Route)),
+ ok = ForwardsDeleteFun(Route)
+ end || Route <- mnesia:match_object(Match)],
+ ok.
-call_with_exchange(Name, Fun) ->
- case mnesia:wread({exchange, Name}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end.
+delete_forward_routes(Route) ->
+ ok = mnesia:delete_object(Route),
+ ok = mnesia:delete_object(durable_routes, Route, write).
-make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) ->
- #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}.
+exchanges_for_queue(QueueName) ->
+ MatchHead = reverse_route(
+ #route{binding = #binding{exchange_name = '$1',
+ queue_name = QueueName,
+ _ = '_'}}),
+ sets:to_list(
+ sets:from_list(
+ mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
-add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey}, Q) ->
- call_with_exchange(
- ExchangeName,
- fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- internal_add_binding(
- X, RoutingKey, make_handler(BindingSpec, Q))
- end
+has_bindings(ExchangeName) ->
+ MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = '$1',
+ _ = '_'}},
+ continue(mnesia:select(route, [{MatchHead, [], ['$1']}], 1, read)).
+
+continue('$end_of_table') -> false;
+continue({[_|_], _}) -> true;
+continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
+
+call_with_exchange(Exchange, Fun) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> case mnesia:read({exchange, Exchange}) of
+ [] -> {error, exchange_not_found};
+ [X] -> Fun(X)
+ end
end).
-delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName,
- routing_key = RoutingKey}, Q) ->
+call_with_exchange_and_queue(Exchange, Queue, Fun) ->
call_with_exchange(
- ExchangeName,
- fun (X) -> ok = internal_delete_binding(
- X, RoutingKey, make_handler(BindingSpec, Q)),
- maybe_auto_delete(X)
+ Exchange,
+ fun(X) -> case mnesia:read({amqqueue, Queue}) of
+ [] -> {error, queue_not_found};
+ [Q] -> Fun(X, Q)
+ end
end).
-%% Must run within a transaction.
-maybe_auto_delete(#exchange{auto_delete = false}) ->
- ok;
-maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) ->
- case internal_delete(ExchangeName, true) of
- {error, in_use} -> ok;
- ok -> ok
- end.
-
-handlers_isempty([]) -> true;
-handlers_isempty([_|_]) -> false.
-
-extend_handlers(Handlers, Handler) -> [Handler | Handlers].
-
-delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers).
-
-handler_qpids(Handlers) ->
- sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]).
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ if Q#amqqueue.durable and not(X#exchange.durable) ->
+ {error, durability_settings_incompatible};
+ true -> ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:write/3)
+ end
+ end).
-%% Must run within a transaction.
-internal_add_binding(#exchange{name = ExchangeName, type = Type},
- RoutingKey, Handler) ->
- BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
- ok = add_handler_to_binding(BindingKey, Handler).
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+ call_with_exchange_and_queue(
+ ExchangeName, QueueName,
+ fun (X, Q) ->
+ ok = sync_binding(
+ ExchangeName, QueueName, RoutingKey, Arguments,
+ Q#amqqueue.durable, fun mnesia:delete_object/3),
+ maybe_auto_delete(X)
+ end).
-%% Must run within a transaction.
-internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) ->
- BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey),
- remove_handler_from_binding(BindingKey, Handler),
+sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
+ Binding = #binding{exchange_name = ExchangeName,
+ queue_name = QueueName,
+ key = RoutingKey,
+ args = Arguments},
+ ok = case Durable of
+ true -> Fun(durable_routes, #route{binding = Binding}, write);
+ false -> ok
+ end,
+ [ok, ok] = [Fun(element(1, R), R, write) ||
+ R <- tuple_to_list(route_with_reverse(Binding))],
ok.
-%% Must run within a transaction.
-add_handler_to_binding(BindingKey, Handler) ->
- ok = case mnesia:wread({binding, BindingKey}) of
- [] ->
- ok = mnesia:write(
- #binding{key = BindingKey,
- handlers = extend_handlers(
- empty_handlers(), Handler)});
- [B = #binding{handlers = H}] ->
- ok = mnesia:write(
- B#binding{handlers = extend_handlers(H, Handler)})
- end.
-
-%% Must run within a transaction.
-remove_handler_from_binding(BindingKey, Handler) ->
- case mnesia:wread({binding, BindingKey}) of
- [] -> empty;
- [B = #binding{handlers = H}] ->
- H1 = delete_handler(H, Handler),
- case handlers_isempty(H1) of
- true ->
- ok = mnesia:delete({binding, BindingKey}),
- empty;
- _ ->
- ok = mnesia:write(B#binding{handlers = H1}),
- not_empty
- end
- end.
+route_with_reverse(#route{binding = Binding}) ->
+ route_with_reverse(Binding);
+route_with_reverse(Binding = #binding{}) ->
+ Route = #route{binding = Binding},
+ {Route, reverse_route(Route)}.
+
+reverse_route(#route{binding = Binding}) ->
+ #reverse_route{reverse_binding = reverse_binding(Binding)};
+
+reverse_route(#reverse_route{reverse_binding = Binding}) ->
+ #route{binding = reverse_binding(Binding)}.
+
+reverse_binding(#reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args};
+
+reverse_binding(#binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}) ->
+ #reverse_binding{exchange_name = Exchange,
+ queue_name = Queue,
+ key = Key,
+ args = Args}.
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
@@ -331,46 +385,50 @@ last_topic_match(P, R, []) ->
last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList).
-delete(ExchangeName, IfUnused) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () -> internal_delete(ExchangeName, IfUnused) end).
-
-internal_delete(ExchangeName, _IfUnused = true) ->
- Bindings = bindings_for_exchange(ExchangeName),
- case Bindings of
- [] -> do_internal_delete(ExchangeName, Bindings);
- _ ->
- case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end,
- Bindings) of
- true ->
- %% There are no handlers anywhere in any of the
- %% bindings for this exchange.
- do_internal_delete(ExchangeName, Bindings);
- false ->
- %% There was at least one real handler
- %% present. It's still in use.
- {error, in_use}
- end
- end;
-internal_delete(ExchangeName, false) ->
- do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)).
-
-forcibly_remove_handlers(Handlers) ->
- lists:foreach(
- fun (#handler{binding_spec = BindingSpec, queue = QueueName}) ->
- ok = rabbit_amqqueue:binding_forcibly_removed(
- BindingSpec, QueueName)
- end, Handlers),
+delete(ExchangeName, _IfUnused = true) ->
+ call_with_exchange(ExchangeName, fun conditional_delete/1);
+delete(ExchangeName, _IfUnused = false) ->
+ call_with_exchange(ExchangeName, fun unconditional_delete/1).
+
+maybe_auto_delete(#exchange{auto_delete = false}) ->
+ ok;
+maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
+ conditional_delete(Exchange),
ok.
-do_internal_delete(ExchangeName, Bindings) ->
- case mnesia:wread({exchange, ExchangeName}) of
- [] -> {error, not_found};
- _ ->
- lists:foreach(fun (#binding{key = K, handlers = H}) ->
- ok = forcibly_remove_handlers(H),
- ok = mnesia:delete({binding, K})
- end, Bindings),
- ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName})
+conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
+ case has_bindings(ExchangeName) of
+ false -> unconditional_delete(Exchange);
+ true -> {error, in_use}
end.
+
+unconditional_delete(#exchange{name = ExchangeName}) ->
+ ok = delete_bindings_for_exchange(ExchangeName),
+ ok = mnesia:delete({durable_exchanges, ExchangeName}),
+ ok = mnesia:delete({exchange, ExchangeName}).
+
+%%----------------------------------------------------------------------------
+%% EXTENDED API
+%% These are API calls that are not used by the server internally,
+%% they are exported for embedded clients to use
+
+%% This is currently used in mod_rabbit.erl (XMPP) and expects this to
+%% return {QueueName, RoutingKey, Arguments} tuples
+list_exchange_bindings(ExchangeName) ->
+ Route = #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ [{QueueName, RoutingKey, Arguments} ||
+ #route{binding = #binding{queue_name = QueueName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(Route)].
+
+% Refactoring is left as an exercise for the reader
+list_queue_bindings(QueueName) ->
+ Route = #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}},
+ [{ExchangeName, RoutingKey, Arguments} ||
+ #route{binding = #binding{exchange_name = ExchangeName,
+ key = RoutingKey,
+ args = Arguments}}
+ <- mnesia:dirty_match_object(Route)].
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 527de0f6..7638af58 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -68,7 +68,8 @@
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())).
+-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K)
+ when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
@@ -237,6 +238,7 @@ with_vhost(VHostPath, Thunk) ->
with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, with_vhost(VHostPath, Thunk)).
+
execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4ae367ba..9b67135d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -105,7 +105,13 @@ table_definitions() ->
{rabbit_config, [{disc_copies, [node()]}]},
{listener, [{type, bag},
{attributes, record_info(fields, listener)}]},
- {binding, [{attributes, record_info(fields, binding)}]},
+ {durable_routes, [{disc_copies, [node()]},
+ {record_name, route},
+ {attributes, record_info(fields, route)}]},
+ {route, [{type, ordered_set},
+ {attributes, record_info(fields, route)}]},
+ {reverse_route, [{type, ordered_set},
+ {attributes, record_info(fields, reverse_route)}]},
{durable_exchanges, [{disc_copies, [node()]},
{record_name, exchange},
{attributes, record_info(fields, exchange)}]},
@@ -255,7 +261,7 @@ init_db(ClusterNodes) ->
end.
create_schema() ->
- mnesia:stop(),
+ mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
cannot_create_schema),
rabbit_misc:ensure_ok(mnesia:start(),