summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/AMQSession.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java96
1 files changed, 68 insertions, 28 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8f484383ca..91df4f7d35 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -39,35 +39,9 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -2107,6 +2081,72 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
+
+ /**
+ * Returns the number of messages currently queued for the given destination.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqd The destination to be checked
+ *
+ * @return the number of queued messages.
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ */
+ public long getQueueDepth(final AMQDestination amqd)
+ throws AMQException
+ {
+
+ class QueueDeclareOkHandler extends SpecificMethodFrameListener
+ {
+
+ private long _messageCount;
+ private long _consumerCount;
+
+ public QueueDeclareOkHandler()
+ {
+ super(getChannelId(), QueueDeclareOkBody.class);
+ }
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+ {
+ boolean matches = super.processMethod(channelId, frame);
+ QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+ _messageCount = declareOk.getMessageCount();
+ _consumerCount = declareOk.getConsumerCount();
+ return matches;
+ }
+
+ }
+
+ return new FailoverNoopSupport<Long, AMQException>(
+ new FailoverProtectedOperation<Long, AMQException>()
+ {
+ public Long execute() throws AMQException, FailoverException
+ {
+
+ AMQFrame queueDeclare =
+ QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ false, // nowait
+ true, // passive
+ amqd.getAMQQueueName(), // queue
+ getTicket()); // ticket
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ //getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+
+ return okHandler._messageCount;
+ }
+ }, _connection).execute();
+
+ }
+
+
+
/**
* Declares the named exchange and type of exchange.
*