summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl24
1 files changed, 24 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 06e68a1b..29d428c7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -77,6 +77,9 @@
auto_delete,
arguments,
pid,
+ owner_pid,
+ exclusive_consumer_pid,
+ exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
messages_uncommitted,
@@ -511,6 +514,18 @@ i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
i(pid, _) ->
self();
+i(owner_pid, #q{owner = none}) ->
+ '';
+i(owner_pid, #q{owner = {ReaderPid, _MonitorRef}}) ->
+ ReaderPid;
+i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
+ '';
+i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
+ ChPid;
+i(exclusive_consumer_tag, #q{exclusive_consumer = none}) ->
+ '';
+i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
+ ConsumerTag;
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
queue:len(MessageBuffer);
i(messages_unacknowledged, _) ->
@@ -547,6 +562,15 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(consumers, _From,
+ State = #q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ reply(rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%