diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-01-16 18:05:08 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-01-16 18:05:08 +0000 |
commit | 161d2fbdc2e637c4958b9cf276be6abb9d18c52f (patch) | |
tree | ab913c40016a330f8486742818af7928a2bc4b83 /src/rabbit_amqqueue.erl | |
parent | c9a6c754570475a9f0e282381d169924e40d2005 (diff) | |
parent | d5197fa54d6831a67c3f96e0dab7f7225f0ad98a (diff) | |
download | rabbitmq-server-161d2fbdc2e637c4958b9cf276be6abb9d18c52f.tar.gz |
merge default into bug19749
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 27 |
1 files changed, 19 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cf4c324d..abbdce66 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,9 +37,9 @@ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, unblock/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). @@ -91,15 +91,17 @@ -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -259,16 +261,22 @@ notify_down_all(QPids, ChPid) -> fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> gen_server2:call(QPid, {claim_queue, ReaderPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server2:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). @@ -276,6 +284,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). +unblock(QPid, ChPid) -> + gen_server2:cast(QPid, {unblock, ChPid}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> |