summaryrefslogtreecommitdiff
path: root/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java')
-rw-r--r--java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java151
1 files changed, 105 insertions, 46 deletions
diff --git a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
index 527e82eaed..8fab315b10 100644
--- a/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
+++ b/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueBrowserImpl.java
@@ -18,9 +18,7 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.Map;
+import java.util.*;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
@@ -29,6 +27,7 @@ import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.QueueBrowser;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.messaging.Filter;
import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
@@ -39,49 +38,27 @@ public class QueueBrowserImpl implements QueueBrowser
private static final String JMS_SELECTOR = "jms-selector";
private QueueImpl _queue;
private String _selector;
- private Receiver _receiver;
- private Message _nextElement;
- private MessageEnumeration _enumeration;
+ private final SessionImpl _session;
+ private Map<Symbol, Filter> _filters;
+ private HashSet<MessageEnumeration> _enumerations = new HashSet<MessageEnumeration>();
+ private boolean _closed;
QueueBrowserImpl(final QueueImpl queue, final String selector, SessionImpl session) throws JMSException
{
_queue = queue;
_selector = selector;
+ _session = session;
- Map<Symbol, Filter> filters;
if(selector == null || selector.trim().equals(""))
{
- filters = null;
+ _filters = null;
}
else
{
- filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
- }
-
-
- try
- {
- _receiver = session.getClientSession().createReceiver(queue.getAddress(),
- StdDistMode.COPY,
- AcknowledgeMode.AMO,null,
- false,
- filters, null);
- _nextElement = _receiver.receive(0L);
- _enumeration = new MessageEnumeration();
- }
- catch(AmqpErrorException e)
- {
- org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
- if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
- {
- throw new InvalidSelectorException(e.getMessage());
- }
- else
- {
- throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
- }
-
+ _filters = Collections.singletonMap(Symbol.valueOf(JMS_SELECTOR),(Filter) new JMSSelectorFilter(_selector));
+ // We do this just to have the server validate the filter..
+ new MessageEnumeration().close();
}
}
@@ -97,42 +74,124 @@ public class QueueBrowserImpl implements QueueBrowser
public Enumeration getEnumeration() throws JMSException
{
- if(_enumeration == null)
+ if(_closed)
{
throw new IllegalStateException("Browser has been closed");
}
- return _enumeration;
+ return new MessageEnumeration();
}
public void close() throws JMSException
{
- _receiver.close();
- _enumeration = null;
+ _closed = true;
+ for(MessageEnumeration me : new ArrayList<MessageEnumeration>(_enumerations))
+ {
+ me.close();
+ }
}
- private final class MessageEnumeration implements Enumeration<Message>
+ private final class MessageEnumeration implements Enumeration<MessageImpl>
{
+ private Receiver _receiver;
+ private MessageImpl _nextElement;
+ private boolean _needNext = true;
+
+ MessageEnumeration() throws JMSException
+ {
+ try
+ {
+ _receiver = _session.getClientSession().createReceiver(_session.toAddress(_queue),
+ StdDistMode.COPY,
+ AcknowledgeMode.AMO, null,
+ false,
+ _filters, null);
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
+ catch(AmqpErrorException e)
+ {
+ org.apache.qpid.amqp_1_0.type.transport.Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+ }
+
+ }
+ _enumerations.add(this);
+
+ }
+
+ public void close()
+ {
+ _enumerations.remove(this);
+ _receiver.close();
+ _receiver = null;
+ }
@Override
public boolean hasMoreElements()
{
+ if( _receiver == null )
+ {
+ return false;
+ }
+ if( _needNext )
+ {
+ _needNext = false;
+ _nextElement = createJMSMessage(_receiver.receive(0L));
+ if( _nextElement == null )
+ {
+ // Drain to verify there really are no more messages.
+ _receiver.drain();
+ _receiver.drainWait();
+ _nextElement = createJMSMessage(_receiver.receive(0L));
+ if( _nextElement == null )
+ {
+ close();
+ }
+ else
+ {
+ // there are still more messages, open up the credit window again..
+ _receiver.clearDrain();
+ }
+ }
+ }
return _nextElement != null;
}
@Override
- public Message nextElement()
+ public MessageImpl nextElement()
{
-
- Message message = _nextElement;
- if(message == null)
+ if( hasMoreElements() )
{
- message = _receiver.receive(0l);
+ MessageImpl message = _nextElement;
+ _nextElement = null;
+ _needNext = true;
+ return message;
}
- if(message != null)
+ else
{
- _nextElement = _receiver.receive(0l);
+ throw new NoSuchElementException();
}
+ }
+ }
+
+ MessageImpl createJMSMessage(final Message msg)
+ {
+ if(msg != null)
+ {
+ final MessageImpl message = _session.getMessageFactory().createMessage(_queue, msg);
+ message.setFromQueue(true);
+ message.setFromTopic(false);
return message;
}
+ else
+ {
+ return null;
+ }
}
+
}