summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-28 15:29:05 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-28 15:29:05 +0100
commit1dcc251bb05c8d979ae8b21eafbedb7166812823 (patch)
tree066e0ae2ed42ee80af5367941796b6fde45f4b2d
parentb4b29b285354678df1262799b59f6bf4259a5c94 (diff)
downloadrabbitmq-server-1dcc251bb05c8d979ae8b21eafbedb7166812823.tar.gz
Cherry-pick the queue-exclusivity-on-(un)binding thing from amqp_0_9_1.
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_exchange.erl23
3 files changed, 29 insertions, 13 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 483b5a93..36dc1b90 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,7 +195,8 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
+ fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 31bb54c0..e915c7b3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -771,7 +771,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -779,7 +779,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -853,7 +853,9 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -864,7 +866,13 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ CheckExclusive =
+ fun(_X, Q) ->
+ with_exclusive_access_or_die(Q#amqqueue.name,
+ ReaderPid, fun(_Q1)-> ok end)
+ end,
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ CheckExclusive) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 8f41392f..b3b9e1b4 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2]).
@@ -58,6 +58,8 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
amqp_table()) -> exchange()).
@@ -72,11 +74,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -366,10 +368,14 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% failing on e.g., the durability being different.
+ InnerFun(X, Q),
if Q#amqqueue.durable and not(X#exchange.durable) ->
{error, durability_settings_incompatible};
true ->
@@ -391,14 +397,15 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
Err
end.
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ _ -> InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
{maybe_auto_delete(X), B}
end