diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eebcfcb9..9764e368 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,8 +37,9 @@ update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -58,7 +59,6 @@ -ifdef(use_specs). --type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(ok_or_errors() :: @@ -66,10 +66,14 @@ -spec(start/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), - maybe(pid())) -> amqqueue()). + maybe(pid())) -> {'new' | 'existing', amqqueue()}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(), + maybe(pid)) -> ok). +-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). @@ -79,8 +83,8 @@ -spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). -spec(consumers_all/1 :: (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). --spec(stat/1 :: (amqqueue()) -> qstats()). --spec(stat_all/0 :: () -> [qstats()]). +-spec(stat/1 :: + (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(delete/3 :: (amqqueue(), 'false', 'false') -> qlen(); (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'}; @@ -213,6 +217,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + not_allowed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -247,9 +276,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). -stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). - delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -395,7 +421,7 @@ delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, + delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> |