summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java23
1 files changed, 13 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 26bb51b821..ef7b8cc217 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.Session;
@@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelector, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
+ AMQSession<?,?> session, FieldTable rawSelector,
+ int prefetchHigh, int prefetchLow, boolean exclusive,
+ int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
- super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
+ super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector,
+ prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
_serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
@@ -96,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_capacity = evaluateCapacity(destination);
+ // This is due to the Destination carrying the temporary subscription name which is incorrect.
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
@@ -164,6 +164,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
@Override void sendCancel() throws AMQException
{
_0_10session.getQpidSession().messageCancel(getConsumerTagString());
+ postSubscription();
try
{
_0_10session.getQpidSession().sync();
@@ -500,7 +501,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
}
- void cleanupQueue() throws AMQException, FailoverException
+ void postSubscription() throws AMQException
{
AMQDestination dest = this.getDestination();
if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
@@ -508,9 +509,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
- this.getDestination().getQueueName());
+ ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+ ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
}
+ // Subscription queue is handled as part of linkDelete method.
+ ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
}
}
@@ -560,4 +563,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
-}
+} \ No newline at end of file