summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-03-14 12:57:42 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-03-14 12:57:42 +0000
commit21d5aa6534515bc7c1f343b5a4b579fe9513b0ce (patch)
treef6f6dcdac4d93ebc28f53fa016b2754a71d62982
parent92a03fa08a503afb6af1988862393a86813d8482 (diff)
downloadqpid-python-21d5aa6534515bc7c1f343b5a4b579fe9513b0ce.tar.gz
QPID-854 : Changes to the client to make the dispatcher responsible for closing the queue browser when all the messages have been processed.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637086 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java51
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java45
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java157
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java5
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java174
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java110
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java103
7 files changed, 475 insertions, 170 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
index 4171e9bf9b..a3cf39003d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
@@ -50,7 +51,9 @@ public class AMQQueueBrowser implements QueueBrowser
_messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
// Create Consumer to verify message selector.
BasicMessageConsumer consumer =
- (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
+ // Close this consumer as we are not looking to consume only to establish that, at least for now,
+ // the QB can be created
consumer.close();
}
@@ -88,40 +91,40 @@ public class AMQQueueBrowser implements QueueBrowser
checkState();
final BasicMessageConsumer consumer =
(BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
- consumer.closeWhenNoMessages(true);
+
_consumers.add(consumer);
return new Enumeration()
+ {
+
+ Message _nextMessage = consumer == null ? null : consumer.receive();
+
+ public boolean hasMoreElements()
{
+ _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
- Message _nextMessage = consumer.receive();
+ return (_nextMessage != null);
+ }
- public boolean hasMoreElements()
+ public Object nextElement()
+ {
+ Message msg = _nextMessage;
+ try
{
- _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+ _logger.info("QB:nextElement about to receive");
- return (_nextMessage != null);
+ _nextMessage = consumer.receive();
+ _logger.info("QB:nextElement received:" + _nextMessage);
}
-
- public Object nextElement()
+ catch (JMSException e)
{
- Message msg = _nextMessage;
- try
- {
- _logger.info("QB:nextElement about to receive");
-
- _nextMessage = consumer.receive();
- _logger.info("QB:nextElement received:" + _nextMessage);
- }
- catch (JMSException e)
- {
- _logger.warn("Exception caught while queue browsing", e);
- _nextMessage = null;
- }
-
- return msg;
+ _logger.warn("Exception caught while queue browsing", e);
+ _nextMessage = null;
}
- };
+
+ return msg;
+ }
+ };
}
public void close() throws JMSException
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 9f7f53a011..0a51ec7c47 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -648,6 +648,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
public void closed(Throwable e) throws JMSException
{
+ // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived
+ // calls through connection.closeAllSessions which is also called by the public connection.close()
+ // with a null cause
+ // When we are closing the Session due to a protocol session error we simply create a new AMQException
+ // with the correct error code and text this is cleary WRONG as the instanceof check below will fail.
+ // We need to determin here if the connection should be
+
synchronized (_connection.getFailoverMutex())
{
if (e instanceof AMQDisconnectedException)
@@ -763,13 +770,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
if (consumer != null)
{
- // fixme this isn't right.. needs to check if _queue contains data for this consumer
- if (consumer.isAutoClose()) // && _queue.isEmpty())
- {
- consumer.closeWhenNoMessages(true);
- }
-
- if (!consumer.isNoConsume())
+ if (!consumer.isNoConsume()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -785,7 +786,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_dispatcher.rejectPending(consumer);
}
- else
+ else // Queue Browser
{
// Just close the consumer
// fixme the CancelOK is being processed before the arriving messages..
@@ -793,13 +794,28 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// has yet to receive before the close comes in.
// consumer.markClosed();
+
+
+
+ if (consumer.isAutoClose())
+ { // There is a small window where the message is between the two queues in the dispatcher.
+ if (consumer.isClosed())
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing consumer:" + consumer.debugIdentity());
+ }
+
+ deregisterConsumer(consumer);
+
+ }
+ else
+ {
+ _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+ }
+ }
}
}
- else
- {
- _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
- }
-
}
public QueueBrowser createBrowser(Queue queue) throws JMSException
@@ -2934,7 +2950,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_lock.wait(2000);
}
- if (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get())
+ if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+ && (message.getDeliverBody().getDeliveryTag() <= _rollbackMark.get()))
{
rejectMessage(message, true);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 605e2d1e83..efbce6033b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -38,6 +38,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -121,7 +122,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
* on the queue. This is used for queue browsing.
*/
private final boolean _autoClose;
- private boolean _closeWhenNoMessages;
private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
@@ -358,7 +358,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted acquire: " + e);
if (isClosed())
{
return null;
@@ -369,11 +369,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = null;
if (l > 0)
{
@@ -386,7 +381,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted poll: " + e);
if (isClosed())
{
return null;
@@ -404,7 +399,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e);
+ _logger.warn("Interrupted take: " + e);
if (isClosed())
{
return null;
@@ -426,20 +421,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- private boolean closeOnAutoClose() throws JMSException
- {
- if (isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
- {
- close(false);
-
- return true;
- }
- else
- {
- return false;
- }
- }
-
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -468,11 +449,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
try
{
- if (closeOnAutoClose())
- {
- return null;
- }
-
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -513,6 +489,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw e;
}
+ else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ _closed.set(true);
+ deregisterConsumer();
+ return null;
+ }
else
{
return (AbstractJMSMessage) o;
@@ -526,31 +508,30 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public void close(boolean sendClose) throws JMSException
{
- // synchronized (_closed)
-
if (_logger.isInfoEnabled())
{
_logger.info("Closing consumer:" + debugIdentity());
}
- synchronized (_connection.getFailoverMutex())
+ if (!_closed.getAndSet(true))
{
- if (!_closed.getAndSet(true))
+ if (_logger.isDebugEnabled())
{
- if (_logger.isDebugEnabled())
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ if (_closedStack != null)
{
- StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
- if (_closedStack != null)
- {
- _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
- }
- else
- {
- _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
- }
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
}
+ else
+ {
+ _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
+ }
+ }
- if (sendClose)
+ if (sendClose)
+ {
+ // The Synchronized block only needs to protect network traffic.
+ synchronized (_connection.getFailoverMutex())
{
BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
@@ -564,7 +545,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
-
}
catch (AMQException e)
{
@@ -575,24 +555,26 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
}
}
- else
- {
- // //fixme this probably is not right
- // if (!isNoConsume())
- { // done in BasicCancelOK Handler but not sending one so just deregister.
- deregisterConsumer();
- }
+ }
+ else
+ {
+ // //fixme this probably is not right
+ // if (!isNoConsume())
+ { // done in BasicCancelOK Handler but not sending one so just deregister.
+ deregisterConsumer();
}
+ }
- if ((_messageListener != null) && _receiving.get())
+ // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
+ // so we need to let it know it is time to close.
+ if ((_messageListener != null) && _receiving.get())
+ {
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Interrupting thread: " + _receivingThread);
- }
-
- _receivingThread.interrupt();
+ _logger.info("Interrupting thread: " + _receivingThread);
}
+
+ _receivingThread.interrupt();
}
}
}
@@ -634,6 +616,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
void notifyMessage(UnprocessedMessage messageFrame)
{
+ if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+ {
+ notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+ return;
+ }
+
final boolean debug = _logger.isDebugEnabled();
if (debug)
@@ -646,12 +634,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
final BasicDeliverBody deliverBody = messageFrame.getDeliverBody();
AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(deliverBody.getDeliveryTag(),
- deliverBody.getRedelivered(),
- deliverBody.getExchange(),
- deliverBody.getRoutingKey(),
- messageFrame.getContentHeader(),
- messageFrame.getBodies());
+ _messageFactory.createMessage(deliverBody.getDeliveryTag(),
+ deliverBody.getRedelivered(),
+ deliverBody.getExchange(),
+ deliverBody.getRoutingKey(),
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{
@@ -688,9 +676,32 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
}
- /**
- * @param jmsMessage this message has already been processed so can't redo preDeliver
- */
+ /** @param closeMessage this message signals that we should close the browser */
+ public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+ {
+ if (isMessageListenerSet())
+ {
+ // Currently only possible to get this msg type with a browser.
+ // If we get the message here then we should probably just close this consumer.
+ // Though an AutoClose consumer with message listener is quite odd...
+ // Just log out the fact so we know where we are
+ _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+ }
+ else
+ {
+ try
+ {
+ _synchronousQueue.put(closeMessage);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing," +
+ "but we shouldn't have close yet");
+ }
+ }
+ }
+
+ /** @param jmsMessage this message has already been processed so can't redo preDeliver */
public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
@@ -913,18 +924,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _noConsume;
}
- public void closeWhenNoMessages(boolean b)
- {
- _closeWhenNoMessages = b;
-
- if (_closeWhenNoMessages && _synchronousQueue.isEmpty() && _receiving.get() && (_messageListener != null))
- {
- _closed.set(true);
- _receivingThread.interrupt();
- }
-
- }
-
public void rollback()
{
clearUnackedMessages();
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
index 8c8814e9b7..a580a6466d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
@@ -104,6 +104,11 @@ public class ChannelCloseMethodHandler implements StateAwareMethodListener<Chann
}
// fixme why is this only done when the close is expected...
// should the above forced closes not also cause a close?
+ // ----------
+ // Closing the session only when it is expected allows the errors to be processed
+ // Calling this here will prevent failover. So we should do this for all exceptions
+ // that should never cause failover. Such as authentication errors.
+
session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
index bc1ba155cb..18157adc34 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,10 +24,20 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQChannelException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -128,31 +138,159 @@ public abstract class UnprocessedMessage
}
public static final class UnprocessedBouncedMessage extends UnprocessedMessage
+ {
+ private final BasicReturnBody _body;
+
+ public UnprocessedBouncedMessage(final BasicReturnBody body)
{
- private final BasicReturnBody _body;
+ _body = body;
+ }
- public UnprocessedBouncedMessage(final BasicReturnBody body)
- {
- _body = body;
- }
+ public BasicDeliverBody getDeliverBody()
+ {
+ return null;
+ }
- public BasicDeliverBody getDeliverBody()
- {
- return null;
- }
+ public BasicReturnBody getBounceBody()
+ {
+ return _body;
+ }
- public BasicReturnBody getBounceBody()
- {
- return _body;
- }
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
+
+ public static final class CloseConsumerMessage extends UnprocessedMessage
+ {
+ BasicMessageConsumer _consumer;
- public boolean isDeliverMessage()
+ public CloseConsumerMessage(BasicMessageConsumer consumer)
+ {
+ _consumer = consumer;
+ }
+
+ public BasicDeliverBody getDeliverBody()
+ {
+ return new BasicDeliverBody()
{
- return false;
- }
+ // This is the only thing we need to preserve so the correct consumer can be found later.
+ public AMQShortString getConsumerTag()
+ {
+ return _consumer.getConsumerTag();
+ }
+
+ // The Rest of these methods are not used
+ public long getDeliveryTag()
+ {
+ return 0;
+ }
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean getRedelivered()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+
+ public byte getMajor()
+ {
+ return 0;
+ }
+
+ public byte getMinor()
+ {
+ return 0;
+ }
+
+ public int getClazz()
+ {
+ return 0;
+ }
+
+ public int getMethod()
+ {
+ return 0;
+ }
+
+ public void writeMethodPayload(ByteBuffer buffer)
+ {
+ }
+
+ public byte getFrameType()
+ {
+ return 0;
+ }
+
+ public int getSize()
+ {
+ return 0;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ }
+
+ public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException
+ {
+ }
+
+ public AMQFrame generateFrame(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelNotFoundException(int channelId)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message)
+ {
+ return null;
+ }
+
+ public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
+ {
+ return null;
+ }
+
+ public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
+ {
+ return false;
+ }
+ };
}
+ public BasicReturnBody getBounceBody()
+ {
+ return null;
+ }
+ public boolean isDeliverMessage()
+ {
+ return false;
+ }
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
new file mode 100644
index 0000000000..b07778cdaa
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
@@ -0,0 +1,110 @@
+package org.apache.qpid.test.client;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.test.VMTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
+import java.util.Enumeration;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+public class CancelTest extends VMTestCase
+{
+ private static final Logger _logger = Logger.getLogger(QueueBrowserTest.class);
+
+ private Connection _clientConnection;
+ private Session _clientSession;
+ private Queue _queue;
+
+ public void setUp() throws Exception
+ {
+
+ super.setUp();
+
+ _queue = (Queue) _context.lookup("queue");
+
+ //Create Client
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientConnection.start();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Ensure _queue is created
+ _clientSession.createConsumer(_queue).close();
+ }
+
+ /**
+ * Simply
+ */
+ public void test() throws JMSException, NamingException
+ {
+ Connection producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ producerConnection.start();
+
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(_queue);
+ producer.send(producerSession.createTextMessage());
+ producerConnection.close();
+
+
+ QueueBrowser browser = _clientSession.createBrowser(_queue);
+ Enumeration e = browser.getEnumeration();
+
+
+ while (e.hasMoreElements())
+ {
+ e.nextElement();
+ }
+
+ browser.close();
+
+ MessageConsumer consumer = _clientSession.createConsumer(_queue);
+ consumer.receive();
+ consumer.close();
+ }
+
+ public void loop()
+ {
+ try
+ {
+ int run = 0;
+ while (true)
+ {
+ System.err.println(run++);
+ test();
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e, e);
+ }
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
index 06f05462ba..72b8dfcb1c 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/QueueBrowserTest.java
@@ -14,14 +14,17 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.test.client;
import org.apache.log4j.Logger;
import org.apache.qpid.test.VMTestCase;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.AMQException;
import javax.jms.Queue;
import javax.jms.ConnectionFactory;
@@ -88,7 +91,6 @@ public class QueueBrowserTest extends VMTestCase
private void checkQueueDepth(int depth) throws JMSException, NamingException
{
- sendMessages(depth);
// create QueueBrowser
_logger.info("Creating Queue Browser");
@@ -101,6 +103,19 @@ public class QueueBrowserTest extends VMTestCase
_logger.debug("Checking for " + depth + " messages with QueueBrowser");
}
+ long queueDepth = 0;
+
+ try
+ {
+ queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
+ }
+ catch (AMQException e)
+ {
+ }
+
+ assertEquals("Session reports Queue depth not as expected", depth, queueDepth);
+
+
int msgCount = 0;
Enumeration msgs = queueBrowser.getEnumeration();
@@ -116,11 +131,7 @@ public class QueueBrowserTest extends VMTestCase
}
// check to see if all messages found
-// assertEquals("browser did not find all messages", MSG_COUNT, msgCount);
- if (msgCount != depth)
- {
- _logger.warn(msgCount + " off" + depth + " messages received.");
- }
+ assertEquals("Browser did not find all messages", depth, msgCount);
//Close browser
queueBrowser.close();
@@ -132,39 +143,61 @@ public class QueueBrowserTest extends VMTestCase
*
*/
- public void testQueueBrowserMsgsRemainOnQueue() throws Exception
- {
- int messages = 10;
+ public void testQueueBrowserMsgsRemainOnQueue() throws Exception
+ {
+ int messages = 10;
+
+ sendMessages(messages);
+
+ checkQueueDepth(messages);
- checkQueueDepth(messages);
+ // VERIFY
- // VERIFY
+ // continue and try to receive all messages
+ MessageConsumer consumer = _clientSession.createConsumer(_queue);
- // continue and try to receive all messages
- MessageConsumer consumer = _clientSession.createConsumer(_queue);
+ _logger.info("Verify messages are still on the queue");
- _logger.info("Verify messages are still on the queue");
+ Message tempMsg;
- Message tempMsg;
+ for (int msgCount = 0; msgCount < messages; msgCount++)
+ {
+ tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
+ if (tempMsg == null)
+ {
+ fail("Message " + msgCount + " not retrieved from queue");
+ }
+ }
- for (int msgCount = 0; msgCount < messages; msgCount++)
- {
- tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
- if (tempMsg == null)
- {
- fail("Message " + msgCount + " not retrieved from queue");
- }
- }
+ consumer.close();
+
+ _logger.info("All messages recevied from queue");
+ }
- _logger.info("All messages recevied from queue");
- }
+ /**
+ * This tests you can browse an empty queue, see QPID-785
+ *
+ * @throws Exception
+ */
+ public void testBrowsingEmptyQueue() throws Exception
+ {
+ checkQueueDepth(0);
+ }
- /**
- * This tests you can browse an empty queue, see QPID-785
- * @throws Exception
- */
- public void testBrowsingEmptyQueue() throws Exception
- {
- checkQueueDepth(0);
- }
+ public void loop() throws JMSException
+ {
+ int run = 0;
+ try
+ {
+ while (true)
+ {
+ System.err.println(run++ + ":************************************************************************");
+ testQueueBrowserMsgsRemainOnQueue();
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error(e, e);
+ }
+ }
}