summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-02-01 15:19:32 +0000
committerRobert Gemmell <robbie@apache.org>2012-02-01 15:19:32 +0000
commit08b3d439ce5cdcd127d14489ba4730ae3f2c7724 (patch)
tree2525eccc1500c3e9b20d039458ab42b5defdb605
parent1fa5f50355acc1b6e9673eb12667ed63d92a4884 (diff)
downloadqpid-python-08b3d439ce5cdcd127d14489ba4730ae3f2c7724.tar.gz
QPID-3790: Add a method AMQSession.getQueueDepth(AMQDestionation, boolean) to sync session (if specified) before sending QueueQuery command
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1239166 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java27
4 files changed, 59 insertions, 14 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 25a2875b3f..82ba04ddd3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -2752,18 +2752,38 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public long getQueueDepth(final AMQDestination amqd)
throws AMQException
{
- return new FailoverNoopSupport<Long, AMQException>(
- new FailoverProtectedOperation<Long, AMQException>()
- {
- public Long execute() throws AMQException, FailoverException
- {
- return requestQueueDepth(amqd);
- }
- }, _connection).execute();
+ return getQueueDepth(amqd, false);
+ }
+ /**
+ * Returns the number of messages currently queued by the given
+ * destination. Syncs session before receiving the queue depth if sync is
+ * set to true.
+ *
+ * @param amqd AMQ destination to get the depth value
+ * @param sync flag to sync session before receiving the queue depth
+ * @return queue depth
+ * @throws AMQException
+ */
+ public long getQueueDepth(final AMQDestination amqd, final boolean sync) throws AMQException
+ {
+ return new FailoverNoopSupport<Long, AMQException>(new FailoverProtectedOperation<Long, AMQException>()
+ {
+ public Long execute() throws AMQException, FailoverException
+ {
+ try
+ {
+ return requestQueueDepth(amqd, sync);
+ }
+ catch (TransportException e)
+ {
+ throw new AMQException(AMQConstant.getConstant(getErrorCode(e)), e.getMessage(), e);
+ }
+ }
+ }, _connection).execute();
}
- protected abstract Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException;
+ protected abstract Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException;
/**
* Declares the named exchange and type of exchange.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d38f4bbb2f..c092fa6ccb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -951,9 +951,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}, getAMQConnection()).execute();
}
- protected Long requestQueueDepth(AMQDestination amqd)
+ protected Long requestQueueDepth(AMQDestination amqd, boolean sync)
{
flushAcknowledgments();
+ if (sync)
+ {
+ getQpidSession().sync();
+ }
return getQpidSession().queueQuery(amqd.getQueueName()).get().getMessageCount();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 96994e7963..24d73c583e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -587,7 +587,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
}
- protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+ protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
AMQFrame queueDeclare =
getMethodRegistry().createQueueDeclareBody(getTicket(),
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 6c8ccb139e..4bfbd3c726 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -18,9 +18,8 @@
*/
package org.apache.qpid.client;
-import junit.framework.TestCase;
-
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.*;
import org.apache.qpid.transport.Connection.SessionFactory;
import org.apache.qpid.transport.Connection.State;
@@ -39,7 +38,7 @@ import java.util.List;
* {@link SessionException} is not thrown from methods of
* {@link AMQSession_0_10}.
*/
-public class AMQSession_0_10Test extends TestCase
+public class AMQSession_0_10Test extends QpidTestCase
{
public void testExceptionOnCommit()
@@ -460,6 +459,28 @@ public class AMQSession_0_10Test extends TestCase
assertNotNull("ExchangeDeclare event was not sent", event);
}
+ public void testGetQueueDepthWithSync()
+ {
+ // slow down a flush thread
+ setTestSystemProperty("qpid.session.max_ack_delay", "10000");
+ AMQSession_0_10 session = createAMQSession_0_10(false, javax.jms.Session.DUPS_OK_ACKNOWLEDGE);
+ try
+ {
+ session.acknowledgeMessage(-1, false);
+ session.getQueueDepth(createDestination(), true);
+ }
+ catch (Exception e)
+ {
+ fail("Unexpected exception is cought:" + e.getMessage());
+ }
+ ProtocolEvent command = findSentProtocolEventOfClass(session, MessageAccept.class, false);
+ assertNotNull("MessageAccept command was not sent", command);
+ command = findSentProtocolEventOfClass(session, ExecutionSync.class, false);
+ assertNotNull("ExecutionSync command was not sent", command);
+ command = findSentProtocolEventOfClass(session, QueueQuery.class, false);
+ assertNotNull("QueueQuery command was not sent", command);
+ }
+
private AMQAnyDestination createDestination()
{
AMQAnyDestination destination = null;