diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index db7461b0..08ad2d78 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,6 +37,7 @@ -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). @@ -74,6 +75,9 @@ -spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). +-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). +-spec(consumers_all/1 :: + (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -96,7 +100,8 @@ -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), + boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). @@ -240,6 +245,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +consumers(#amqqueue{ pid = QPid }) -> + gen_server2:pcall(QPid, 9, consumers, infinity). + +consumers_all(VHostPath) -> + lists:concat( + map(VHostPath, + fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} || + {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] + end)). + stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> |