summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-01-16 18:05:08 +0000
committerMatthias Radestock <matthias@lshift.net>2009-01-16 18:05:08 +0000
commit161d2fbdc2e637c4958b9cf276be6abb9d18c52f (patch)
treeab913c40016a330f8486742818af7928a2bc4b83 /src/rabbit_amqqueue.erl
parentc9a6c754570475a9f0e282381d169924e40d2005 (diff)
parentd5197fa54d6831a67c3f96e0dab7f7225f0ad98a (diff)
downloadrabbitmq-server-161d2fbdc2e637c4958b9cf276be6abb9d18c52f.tar.gz
merge default into bug19749
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl27
1 files changed, 19 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index cf4c324d..abbdce66 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,9 +37,9 @@
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, unblock/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -91,15 +91,17 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -259,16 +261,22 @@ notify_down_all(QPids, ChPid) ->
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server2:call(QPid, {claim_queue, ReaderPid}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server2:call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -276,6 +284,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
+unblock(QPid, ChPid) ->
+ gen_server2:cast(QPid, {unblock, ChPid}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->