summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-02-05 14:32:59 +0000
committerMatthew Sackman <matthew@lshift.net>2010-02-05 14:32:59 +0000
commit4a0f42dd1b8b9d09c1d403990ec8120290b96dbc (patch)
treed8765d9e29af55705862e8dbea5fafd685a2271c
parent3cd3130cb5a0f3c2ee6e265fd438d159cac3e4fa (diff)
parent4163f3112b08cfeb79681428e339bc6185908baa (diff)
downloadrabbitmq-server-4a0f42dd1b8b9d09c1d403990ec8120290b96dbc.tar.gz
Merging bug22300 into default
-rw-r--r--docs/rabbitmqctl.1.pod20
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_control.erl18
-rw-r--r--src/rabbit_tests.erl26
5 files changed, 74 insertions, 16 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 0c95841e..e26767ab 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -455,10 +455,22 @@ QoS prefetch count limit in force, 0 if unlimited
=back
-The list_queues, list_exchanges and list_bindings commands accept an
-optional virtual host parameter for which to display results,
-defaulting to I<"/">. The default can be overridden with the B<-p>
-flag.
+=item list_consumers
+
+List consumers, i.e. subscriptions to a queue's message stream. Each
+line printed shows, separated by tab characters, the name of the queue
+subscribed to, the id of the channel process via which the
+subscription was created and is managed, the consumer tag which
+uniquely identifies the subscription within a channel, and a boolean
+indicating whether acknowledgements are expected for messages
+delivered to this consumer.
+
+=back
+
+The list_queues, list_exchanges, list_bindings and list_consumers
+commands accept an optional virtual host parameter for which to
+display results, defaulting to I<"/">. The default can be overridden
+with the B<-p> flag.
=head1 OUTPUT ESCAPING
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index db7461b0..08ad2d78 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,6 +37,7 @@
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
@@ -74,6 +75,9 @@
-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
+-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(consumers_all/1 ::
+ (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -96,7 +100,8 @@
-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(),
+ boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
@@ -240,6 +245,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+consumers(#amqqueue{ pid = QPid }) ->
+ gen_server2:pcall(QPid, 9, consumers, infinity).
+
+consumers_all(VHostPath) ->
+ lists:concat(
+ map(VHostPath,
+ fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} ||
+ {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ end)).
+
stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9e1bcee..29d428c7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -562,6 +562,15 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(consumers, _From,
+ State = #q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ reply(rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8a891205..382d4826 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -163,6 +163,7 @@ Available commands:
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
list_channels [<ChannelInfoItem> ...]
+ list_consumers [-p <VHostPath>]
Quiet output mode is selected with the \"-q\" flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -203,6 +204,11 @@ messages_unacknowledged, acks_uncommitted, prefetch_count]. The
default is to display pid, user, transactional, consumer_count,
messages_unacknowledged.
+The output format for \"list_consumers\" is a list of rows containing,
+in order, the queue name, channel process id, consumer tag, and a
+boolean indicating whether acknowledgements are expected from the
+consumer.
+
"),
halt(1).
@@ -308,8 +314,7 @@ action(list_bindings, Node, Args, Inform) ->
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys),
- ok;
+ InfoKeys);
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
@@ -325,6 +330,15 @@ action(list_channels, Node, Args, Inform) ->
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
+action(list_consumers, Node, Args, Inform) ->
+ Inform("Listing consumers", []),
+ {VHostArg, _} = parse_vhost_flag_bin(Args),
+ InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
+ display_info_list(
+ [lists:zip(InfoKeys, tuple_to_list(X)) ||
+ X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
+ InfoKeys);
+
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 61f6d816..46a8f7df 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -712,10 +712,17 @@ test_user_management() ->
test_server_status() ->
- %% create a queue so we have something to list
- Q = #amqqueue{} = rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/">>, queue, <<"foo">>),
- false, false, []),
+ %% create a few things so there is some useful information to list
+ Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
+ rabbit_misc:r(<<"/">>, queue, Name),
+ false, false, []) ||
+ Name <- [<<"foo">>, <<"bar">>]],
+
+ ok = rabbit_amqqueue:claim_queue(Q, self()),
+ ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined,
+ <<"ctag">>, true, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -726,9 +733,6 @@ test_server_status() ->
%% list bindings
ok = control_action(list_bindings, []),
- %% cleanup
- {ok, _} = rabbit_amqqueue:delete(Q, false, false),
-
%% list connections
[#listener{host = H, port = P} | _] =
[L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
@@ -744,9 +748,13 @@ test_server_status() ->
"go away"]),
%% list channels
- Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
ok = info_action(list_channels, rabbit_channel:info_keys(), false),
+
+ %% list consumers
+ ok = control_action(list_consumers, []),
+
+ %% cleanup
+ [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]],
ok = rabbit_channel:shutdown(Ch),
passed.