diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-02-05 14:32:59 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-02-05 14:32:59 +0000 |
commit | 4a0f42dd1b8b9d09c1d403990ec8120290b96dbc (patch) | |
tree | d8765d9e29af55705862e8dbea5fafd685a2271c | |
parent | 3cd3130cb5a0f3c2ee6e265fd438d159cac3e4fa (diff) | |
parent | 4163f3112b08cfeb79681428e339bc6185908baa (diff) | |
download | rabbitmq-server-4a0f42dd1b8b9d09c1d403990ec8120290b96dbc.tar.gz |
Merging bug22300 into default
-rw-r--r-- | docs/rabbitmqctl.1.pod | 20 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
-rw-r--r-- | src/rabbit_control.erl | 18 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 26 |
5 files changed, 74 insertions, 16 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 0c95841e..e26767ab 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -455,10 +455,22 @@ QoS prefetch count limit in force, 0 if unlimited =back -The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results, -defaulting to I<"/">. The default can be overridden with the B<-p> -flag. +=item list_consumers + +List consumers, i.e. subscriptions to a queue's message stream. Each +line printed shows, separated by tab characters, the name of the queue +subscribed to, the id of the channel process via which the +subscription was created and is managed, the consumer tag which +uniquely identifies the subscription within a channel, and a boolean +indicating whether acknowledgements are expected for messages +delivered to this consumer. + +=back + +The list_queues, list_exchanges, list_bindings and list_consumers +commands accept an optional virtual host parameter for which to +display results, defaulting to I<"/">. The default can be overridden +with the B<-p> flag. =head1 OUTPUT ESCAPING diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index db7461b0..08ad2d78 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,6 +37,7 @@ -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). +-export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, unblock/2]). @@ -74,6 +75,9 @@ -spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). +-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]). +-spec(consumers_all/1 :: + (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]). -spec(stat/1 :: (amqqueue()) -> qstats()). -spec(stat_all/0 :: () -> [qstats()]). -spec(delete/3 :: @@ -96,7 +100,8 @@ -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), + boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). @@ -240,6 +245,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +consumers(#amqqueue{ pid = QPid }) -> + gen_server2:pcall(QPid, 9, consumers, infinity). + +consumers_all(VHostPath) -> + lists:concat( + map(VHostPath, + fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} || + {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] + end)). + stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e9e1bcee..29d428c7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -562,6 +562,15 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(consumers, _From, + State = #q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + reply(rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); + handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8a891205..382d4826 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -163,6 +163,7 @@ Available commands: list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] list_channels [<ChannelInfoItem> ...] + list_consumers [-p <VHostPath>] Quiet output mode is selected with the \"-q\" flag. Informational messages are suppressed when quiet mode is in effect. @@ -203,6 +204,11 @@ messages_unacknowledged, acks_uncommitted, prefetch_count]. The default is to display pid, user, transactional, consumer_count, messages_unacknowledged. +The output format for \"list_consumers\" is a list of rows containing, +in order, the queue name, channel process id, consumer tag, and a +boolean indicating whether acknowledgements are expected from the +consumer. + "), halt(1). @@ -308,8 +314,7 @@ action(list_bindings, Node, Args, Inform) -> display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], - InfoKeys), - ok; + InfoKeys); action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), @@ -325,6 +330,15 @@ action(list_channels, Node, Args, Inform) -> display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); +action(list_consumers, Node, Args, Inform) -> + Inform("Listing consumers", []), + {VHostArg, _} = parse_vhost_flag_bin(Args), + InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], + display_info_list( + [lists:zip(InfoKeys, tuple_to_list(X)) || + X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], + InfoKeys); + action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 61f6d816..46a8f7df 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -712,10 +712,17 @@ test_user_management() -> test_server_status() -> - %% create a queue so we have something to list - Q = #amqqueue{} = rabbit_amqqueue:declare( - rabbit_misc:r(<<"/">>, queue, <<"foo">>), - false, false, []), + %% create a few things so there is some useful information to list + Writer = spawn(fun () -> receive shutdown -> ok end end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), + [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/">>, queue, Name), + false, false, []) || + Name <- [<<"foo">>, <<"bar">>]], + + ok = rabbit_amqqueue:claim_queue(Q, self()), + ok = rabbit_amqqueue:basic_consume(Q, true, self(), Ch, undefined, + <<"ctag">>, true, undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), @@ -726,9 +733,6 @@ test_server_status() -> %% list bindings ok = control_action(list_bindings, []), - %% cleanup - {ok, _} = rabbit_amqqueue:delete(Q, false, false), - %% list connections [#listener{host = H, port = P} | _] = [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), @@ -744,9 +748,13 @@ test_server_status() -> "go away"]), %% list channels - Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>), ok = info_action(list_channels, rabbit_channel:info_keys(), false), + + %% list consumers + ok = control_action(list_consumers, []), + + %% cleanup + [{ok, _} = rabbit_amqqueue:delete(QR, false, false) || QR <- [Q, Q2]], ok = rabbit_channel:shutdown(Ch), passed. |