diff options
author | Simon MacMullen <simon@lshift.net> | 2010-05-28 15:29:05 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-05-28 15:29:05 +0100 |
commit | 1dcc251bb05c8d979ae8b21eafbedb7166812823 (patch) | |
tree | 066e0ae2ed42ee80af5367941796b6fde45f4b2d | |
parent | b4b29b285354678df1262799b59f6bf4259a5c94 (diff) | |
download | rabbitmq-server-1dcc251bb05c8d979ae8b21eafbedb7166812823.tar.gz |
Cherry-pick the queue-exclusivity-on-(un)binding thing from amqp_0_9_1.
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 23 |
3 files changed, 29 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 483b5a93..36dc1b90 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,7 +195,8 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], + fun (_X, _Q) -> ok end), ok. lookup(Name) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 31bb54c0..e915c7b3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -771,7 +771,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -779,7 +779,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -853,7 +853,9 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + ReturnMethod, NoWait, + State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -864,7 +866,13 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + CheckExclusive = + fun(_X, Q) -> + with_exclusive_access_or_die(Q#amqqueue.name, + ReaderPid, fun(_Q1)-> ok end) + end, + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + CheckExclusive) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8f41392f..b3b9e1b4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/4, delete_binding/4, list_bindings/1]). +-export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2]). @@ -58,6 +58,8 @@ 'queue_not_found' | 'exchange_not_found' | 'exchange_and_queue_not_found'}). +-type(inner_fun() :: fun((exchange(), queue()) -> any())). + -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), amqp_table()) -> exchange()). @@ -72,11 +74,11 @@ -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(add_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(add_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(delete_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). @@ -366,10 +368,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end end). -add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% failing on e.g., the durability being different. + InnerFun(X, Q), if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; true -> @@ -391,14 +397,15 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> Err end. -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, + _ -> InnerFun(X, Q), + ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), {maybe_auto_delete(X), B} end |