diff options
author | Aidan Skinner <aidan@apache.org> | 2008-01-28 16:48:00 +0000 |
---|---|---|
committer | Aidan Skinner <aidan@apache.org> | 2008-01-28 16:48:00 +0000 |
commit | 5b060554268c763cb883a102b04be21741551161 (patch) | |
tree | 8961572178d7b619d26f9804ef0e19199a30ef66 /java/client | |
parent | 6445226bb0f615c47fba16390160db9588c1039f (diff) | |
download | qpid-python-5b060554268c763cb883a102b04be21741551161.tar.gz |
Merged revisions 608477,609961,610475,610479,610806,611146 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r608477 | rgodfrey | 2008-01-03 13:23:04 +0000 (Thu, 03 Jan 2008) | 1 line
QPID-499 : Added per-virtual host timed tasks to inspect queues (with no consumers) for expired messages
........
r609961 | ritchiem | 2008-01-08 12:59:01 +0000 (Tue, 08 Jan 2008) | 2 lines
QPID-499 : Patch to update the queue size statistics when the Active TTL process runs
Removed old single commented out code line from AMQSession.
........
r610475 | ritchiem | 2008-01-09 17:32:43 +0000 (Wed, 09 Jan 2008) | 1 line
Qpid-723 Added exec to qpid.start
........
r610479 | ritchiem | 2008-01-09 17:39:54 +0000 (Wed, 09 Jan 2008) | 1 line
Qpid-690 : Provide configurable delay between re-connecion attempts.
........
r610806 | ritchiem | 2008-01-10 14:41:37 +0000 (Thu, 10 Jan 2008) | 1 line
QPID-690 : Relaxed the timings on failover as Thread.sleep is accurate to 10ms so may finish the sleep 10ms early. Resulting in erratic failures as 9.9s < 10s.
........
r611146 | ritchiem | 2008-01-11 11:33:31 +0000 (Fri, 11 Jan 2008) | 1 line
Patch by Aidan Skinner to make third constructor public. This is done so that the BDBMessageStore tests can still run with the addition of the VirtualHost reaper thread.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@615943 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 65 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java | 5 |
2 files changed, 69 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 42f07f97f9..deaa435d8c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; @@ -2148,6 +2149,70 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } + + /** + * Returns the number of messages currently queued for the given destination. + * + * <p/>Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to be checked + * + * @return the number of queued messages. + * + * @throws AMQException If the queue cannot be declared for any reason. + */ + public long getQueueDepth(final AMQDestination amqd) + throws AMQException + { + + class QueueDeclareOkHandler extends SpecificMethodFrameListener + { + + private long _messageCount; + private long _consumerCount; + + public QueueDeclareOkHandler() + { + super(getChannelId(), QueueDeclareOkBody.class); + } + + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException + { + boolean matches = super.processMethod(channelId, frame); + QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; + _messageCount = declareOk.getMessageCount(); + _consumerCount = declareOk.getConsumerCount(); + return matches; + } + + } + + return new FailoverNoopSupport<Long, AMQException>( + new FailoverProtectedOperation<Long, AMQException>() + { + public Long execute() throws AMQException, FailoverException + { + + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(_channelId); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + + return okHandler._messageCount; + } + }, _connection).execute(); + + } + + + /** * Declares the named exchange and type of exchange. * diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index e8a220f5e9..c1116ca01e 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -283,7 +283,10 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); - _acceptor.unbindAll(); + if (_acceptor != null) + { + _acceptor.unbindAll(); + } synchronized (_inVmPipeAddress) { _inVmPipeAddress.clear(); |