summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java106
1 files changed, 52 insertions, 54 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 5fba351d8a..7721722748 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -17,27 +17,36 @@
*/
package org.apache.qpid.client;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.messaging.QpidDestination.CheckMode;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a 0.10 message consumer.
@@ -83,6 +92,16 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
+
+ if (AMQDestination.DestSyntax.ADDR == destination.getDestSyntax())
+ {
+ AddressBasedDestination addrDest = (AddressBasedDestination)destination;
+ addrDest.resolveAddress((AMQSession_0_10)session);
+ addrDest.create((AMQSession_0_10)session,CheckMode.FOR_RECEIVER);
+ addrDest.azzert((AMQSession_0_10)session,CheckMode.FOR_RECEIVER);
+ // ideally we should be invoking addrDest.createSubscription() here;
+ }
+
if (messageSelector != null && !messageSelector.equals(""))
{
try
@@ -93,33 +112,24 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
throw new InvalidSelectorException("cannot create consumer because of selector issue");
}
+
if (destination instanceof AMQQueue)
{
_preAcquire = false;
}
}
- // Destination setting overrides connection defaults
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
+ try
{
- capacity = destination.getLink().getConsumerCapacity();
+ capacity = destination.getConsumerCapacity(session);
}
- else if (getSession().prefetch())
+ catch(Exception e)
{
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
- }
-
- if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
- {
- boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
-
- if (!namedQueue)
- {
- _destination = destination.copyDestination();
- _destination.setQueueName(null);
- }
- }
+ JMSException ex = new JMSException("Error retrieving capacity");
+ ex.initCause(e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
@@ -473,22 +483,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
public boolean isExclusive()
{
- AMQDestination dest = this.getDestination();
- if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
- {
- return true;
- }
- else
- {
- return dest.getLink().getSubscription().isExclusive();
- }
- }
- else
- {
- return _exclusive;
- }
+ return _exclusive;
}
void cleanupQueue() throws AMQException, FailoverException
@@ -496,11 +491,14 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
AMQDestination dest = this.getDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.RECEIVER )
+ try
+ {
+ ((AddressBasedDestination)dest).delete((AMQSession_0_10)getSession(),CheckMode.FOR_RECEIVER);
+ }
+ catch(Exception e)
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- this.getDestination().getQueueName());
+ AMQException ex = new AMQException(AMQConstant.INTERNAL_ERROR,"Error deleting queue",e);
+ throw ex;
}
}
}