summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client-jms
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-11-02 16:39:57 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-11-02 16:39:57 +0000
commitc45ccc5523a8ae4a0fa39ad9e195218d81212539 (patch)
tree94b7b84b51595f0c561cc855a4116eb7f04e1b51 /java/amqp-1-0-client-jms
parent8c34459b707aa9768c95dc9a5aa76ea2cf8b2804 (diff)
downloadqpid-python-c45ccc5523a8ae4a0fa39ad9e195218d81212539.tar.gz
QPID-4411 : QPID JMS QueueBrowser should defer getting messages until getEnumeration() is called.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1405042 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/amqp-1-0-client-jms')
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java133
1 files changed, 88 insertions, 45 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
index 527e82eaed..914ec4bd0f 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
@@ -18,9 +18,7 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import java.util.*;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
@@ -29,6 +27,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.Filter;
import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
@@ -39,49 +38,27 @@ public class QueueBrowserImpl implements QueueBrowser
private static final String JMS_SELECTOR = "jms-selector";
private QueueImpl _queue;
private String _selector;
- private Receiver _receiver;
- private Message _nextElement;
- private MessageEnumeration _enumeration;
+ private final SessionImpl _session;
+ private Map<Symbol, Filter> _filters;
+ private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
+ private boolean _closed;
QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
{
_queue = queue;
_selector = selector;
+ _session = session;
- Map<Symbol, Filter> filters;
if(selector == null || selector.trim().equals(""))
{
- filters = null;
+ _filters = null;
}
else
{
- filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
- }
-
-
- try
- {
- _receiver = session.getClientSession().createReceiver(queue.getAddress(),
- StdDistMode.COPY,
- AcknowledgeMode.AMO,null,
- false,
- filters, null);
- _nextElement = _receiver.receive(0L);
- _enumeration = new MessageEnumeration();
- }
- catch(AmqpErrorException e)
- {
- org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
- if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
- {
- throw new InvalidSelectorException(e.getMessage());
- }
- else
- {
- throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
- }
-
+ _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+ // We do this just to have the server validate the filter..
+ new MessageEnumeration().close();
}
}
@@ -97,42 +74,108 @@ public class QueueBrowserImpl implements QueueBrowser
public Enumeration getEnumeration() throws JMSException
{
- if(_enumeration == null)
+ if(_closed)
{
throw new IllegalStateException("Browser has been closed");
}
- return _enumeration;
+ return new MessageEnumeration();
}
public void close() throws JMSException
{
- _receiver.close();
- _enumeration = null;
+ _closed = true;
+ for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
+ {
+ me.close();
+ }
}
private final class MessageEnumeration implements Enumeration<Message>
{
+ private Receiver _receiver;
+ private Message _nextElement;
+ private boolean _needNext = true;
+
+ MessageEnumeration() throws JMSException
+ {
+ try
+ {
+ _receiver = _session.getClientSession().createReceiver(_queue.getAddress(),
+ StdDistMode.COPY,
+ AcknowledgeMode.AMO, null,
+ false,
+ _filters, null);
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
+ catch(AmqpErrorException e)
+ {
+ org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ }
+
+ }
+ _enumerations.add(this);
+
+ }
+
+ public void close()
+ {
+ _enumerations.remove(this);
+ _receiver.close();
+ _receiver = null;
+ }
@Override
public boolean hasMoreElements()
{
+ if( _receiver == null )
+ {
+ return false;
+ }
+ if( _needNext )
+ {
+ _needNext = false;
+ _nextElement = _receiver.receive(0L);
+ if( _nextElement == null )
+ {
+ // Drain to verify there really are no more messages.
+ _receiver.drain();
+ _receiver.drainWait();
+ _nextElement = _receiver.receive(0L);
+ if( _nextElement == null )
+ {
+ close();
+ }
+ else
+ {
+ // there are still more messages, open up the credit window again..
+ _receiver.clearDrain();
+ }
+ }
+ }
return _nextElement != null;
}
@Override
public Message nextElement()
{
-
- Message message = _nextElement;
- if(message == null)
+ if( hasMoreElements() )
{
- message = _receiver.receive(0l);
+ Message message = _nextElement;
+ _nextElement = null;
+ _needNext = true;
+ return message;
}
- if(message != null)
+ else
{
- _nextElement = _receiver.receive(0l);
+ throw new NoSuchElementException();
}
- return message;
}
}
}