summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl17
1 files changed, 16 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index db7461b0..08ad2d78 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,6 +37,7 @@
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
@@ -74,6 +75,9 @@
-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
+-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(consumers_all/1 ::
+ (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -96,7 +100,8 @@
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+ boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
@@ -240,6 +245,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+consumers(#amqqueue{ pid = QPid }) ->
+ gen_server2:pcall(QPid, 9, consumers, infinity).
+
+consumers_all(VHostPath) ->
+ lists:concat(
+ map(VHostPath,
+ fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} ||
+ {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ end)).
+
stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->