diff options
author | Robert Gemmell <robbie@apache.org> | 2012-02-01 15:19:32 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-02-01 15:19:32 +0000 |
commit | 08b3d439ce5cdcd127d14489ba4730ae3f2c7724 (patch) | |
tree | 2525eccc1500c3e9b20d039458ab42b5defdb605 | |
parent | 1fa5f50355acc1b6e9673eb12667ed63d92a4884 (diff) | |
download | qpid-python-08b3d439ce5cdcd127d14489ba4730ae3f2c7724.tar.gz |
QPID-3790: Add a method AMQSession.getQueueDepth(AMQDestionation, boolean) to sync session (if specified) before sending QueueQuery command
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1239166 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 59 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 25a2875b3f..82ba04ddd3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2752,18 +2752,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public long getQueueDepth(final AMQDestination amqd) throws AMQException { - return new FailoverNoopSupport<Long, AMQException>( - new FailoverProtectedOperation<Long, AMQException>() - { - public Long execute() throws AMQException, FailoverException - { - return requestQueueDepth(amqd); - } - }, _connection).execute(); + return getQueueDepth(amqd, false); + } + /** + * Returns the number of messages currently queued by the given + * destination. Syncs session before receiving the queue depth if sync is + * set to true. + * + * @param amqd AMQ destination to get the depth value + * @param sync flag to sync session before receiving the queue depth + * @return queue depth + * @throws AMQException + */ + public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException + { + return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>() + { + public Long execute() throws AMQException, FailoverException + { + try + { + return requestQueueDepth(amqd, sync); + } + catch (TransportException e) + { + throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e); + } + } + }, _connection).execute(); } - protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException; + protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException; /** * Declares the named exchange and type of exchange. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d38f4bbb2f..c092fa6ccb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -951,9 +951,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic }, getAMQConnection()).execute(); } - protected Long requestQueueDepth(AMQDestination amqd) + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) { flushAcknowledgments(); + if (sync) + { + getQpidSession().sync(); + } return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 96994e7963..24d73c583e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -587,7 +587,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } - protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException + protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { AMQFrame queueDeclare = getMethodRegistry().createQueueDeclareBody(getTicket(), diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java index 6c8ccb139e..4bfbd3c726 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java @@ -18,9 +18,8 @@ */ package org.apache.qpid.client; -import junit.framework.TestCase; - import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.transport.*; import org.apache.qpid.transport.Connection.SessionFactory; import org.apache.qpid.transport.Connection.State; @@ -39,7 +38,7 @@ import java.util.List; * {@link SessionException} is not thrown from methods of * {@link AMQSession_0_10}. */ -public class AMQSession_0_10Test extends TestCase +public class AMQSession_0_10Test extends QpidTestCase { public void testExceptionOnCommit() @@ -460,6 +459,28 @@ public class AMQSession_0_10Test extends TestCase assertNotNull("ExchangeDeclare event was not sent", event); } + public void testGetQueueDepthWithSync() + { + // slow down a flush thread + setTestSystemProperty("qpid.session.max_ack_delay", "10000"); + AMQSession_0_10 session = createAMQSession_0_10(false, javax.jms.Session.DUPS_OK_ACKNOWLEDGE); + try + { + session.acknowledgeMessage(-1, false); + session.getQueueDepth(createDestination(), true); + } + catch (Exception e) + { + fail("Unexpected exception is cought:" + e.getMessage()); + } + ProtocolEvent command = findSentProtocolEventOfClass(session, MessageAccept.class, false); + assertNotNull("MessageAccept command was not sent", command); + command = findSentProtocolEventOfClass(session, ExecutionSync.class, false); + assertNotNull("ExecutionSync command was not sent", command); + command = findSentProtocolEventOfClass(session, QueueQuery.class, false); + assertNotNull("QueueQuery command was not sent", command); + } + private AMQAnyDestination createDestination() { AMQAnyDestination destination = null; |