diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 26bb51b821..ef7b8cc217 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -28,7 +28,6 @@ import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage_0_10; -import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.Session; @@ -82,13 +81,13 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, - AMQSession<?,?> session, AMQProtocolHandler protocolHandler, - FieldTable rawSelector, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) + AMQSession<?,?> session, FieldTable rawSelector, + int prefetchHigh, int prefetchLow, boolean exclusive, + int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { - super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, rawSelector, + prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); @@ -96,6 +95,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _capacity = evaluateCapacity(destination); + // This is due to the Destination carrying the temporary subscription name which is incorrect. if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; @@ -164,6 +164,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM @Override void sendCancel() throws AMQException { _0_10session.getQpidSession().messageCancel(getConsumerTagString()); + postSubscription(); try { _0_10session.getQpidSession().sync(); @@ -500,7 +501,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } } - void cleanupQueue() throws AMQException, FailoverException + void postSubscription() throws AMQException { AMQDestination dest = this.getDestination(); if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) @@ -508,9 +509,11 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( - this.getDestination().getQueueName()); + ((AMQSession_0_10) getSession()).handleNodeDelete(dest); + ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); } + // Subscription queue is handled as part of linkDelete method. + ((AMQSession_0_10) getSession()).handleLinkDelete(dest); } } @@ -560,4 +563,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -} +}
\ No newline at end of file |