diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 106 |
1 files changed, 52 insertions, 54 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 5fba351d8a..7721722748 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,27 +17,36 @@ */ package org.apache.qpid.client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.failover.FailoverException; -import org.apache.qpid.client.message.*; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.*; -import org.apache.qpid.filter.MessageFilter; -import org.apache.qpid.filter.JMSSelectorFilter; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInternalException; +import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.message.AMQMessageDelegateFactory; +import org.apache.qpid.client.message.AMQMessageDelegate_0_10; +import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.MessageFactoryRegistry; +import org.apache.qpid.client.message.UnprocessedMessage_0_10; +import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.filter.JMSSelectorFilter; +import org.apache.qpid.filter.MessageFilter; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.messaging.QpidDestination.CheckMode; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.transport.Acquired; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.Option; +import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.SessionException; +import org.apache.qpid.transport.TransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This is a 0.10 message consumer. @@ -83,6 +92,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); _0_10session = (AMQSession_0_10) session; + + if (AMQDestination.DestSyntax.ADDR == destination.getDestSyntax()) + { + AddressBasedDestination addrDest = (AddressBasedDestination)destination; + addrDest.resolveAddress((AMQSession_0_10)session); + addrDest.create((AMQSession_0_10)session,CheckMode.FOR_RECEIVER); + addrDest.azzert((AMQSession_0_10)session,CheckMode.FOR_RECEIVER); + // ideally we should be invoking addrDest.createSubscription() here; + } + if (messageSelector != null && !messageSelector.equals("")) { try @@ -93,33 +112,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM { throw new InvalidSelectorException("cannot create consumer because of selector issue"); } + if (destination instanceof AMQQueue) { _preAcquire = false; } } - // Destination setting overrides connection defaults - if (destination.getDestSyntax() == DestSyntax.ADDR && - destination.getLink().getConsumerCapacity() > 0) + try { - capacity = destination.getLink().getConsumerCapacity(); + capacity = destination.getConsumerCapacity(session); } - else if (getSession().prefetch()) + catch(Exception e) { - capacity = _0_10session.getAMQConnection().getMaxPrefetch(); - } - - if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) - { - boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; - - if (!namedQueue) - { - _destination = destination.copyDestination(); - _destination.setQueueName(null); - } - } + JMSException ex = new JMSException("Error retrieving capacity"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } } @@ -473,22 +483,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM public boolean isExclusive() { - AMQDestination dest = this.getDestination(); - if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) - { - return true; - } - else - { - return dest.getLink().getSubscription().isExclusive(); - } - } - else - { - return _exclusive; - } + return _exclusive; } void cleanupQueue() throws AMQException, FailoverException @@ -496,11 +491,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.RECEIVER ) + try + { + ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_RECEIVER); + } + catch(Exception e) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); + AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR,"Error deleting queue",e); + throw ex; } } } |