summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java16
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java17
5 files changed, 67 insertions, 11 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 517a7a5ce8..b392bad879 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -56,6 +56,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -1068,22 +1069,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
{
boolean match = true;
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
-
- if (match && assertNode)
+ try
{
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (match)
+ {
+ // should I use the queried details to update the local data structure.
+ }
}
- else if (match)
+ catch(SessionException e)
{
- // should I use the queried details to update the local data structure.
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ {
+ match = false;
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+ "Error querying queue",e);
+ }
}
return match;
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0a78403268..5d32863f2f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -571,6 +571,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (!_session.isClosed() || _session.isClosing())
{
sendCancel();
+ cleanupQueue();
}
}
catch (AMQException e)
@@ -608,6 +609,8 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
abstract void sendCancel() throws AMQException, FailoverException;
+
+ abstract void cleanupQueue() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index b5f3501e5a..1c7c9a7bb5 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,7 +19,9 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
@@ -509,4 +511,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _exclusive;
}
}
+
+ void cleanupQueue() throws AMQException, FailoverException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.RECEIVER )
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ this.getDestination().getQueueName());
+ }
+ }
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cdbf57769d..00acd5e866 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -88,4 +88,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return receive();
}
+ void cleanupQueue() throws AMQException, FailoverException
+ {
+
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 53c0457120..0e3f4a5524 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -30,6 +30,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -239,5 +240,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
return _session.isQueueBound(destination);
}
+
+ @Override
+ public void close()
+ {
+ super.close();
+ AMQDestination dest = _destination;
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.SENDER )
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ _destination.getQueueName());
+ }
+ }
+ }
}