summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java17
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java58
6 files changed, 125 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 517a7a5ce8..b392bad879 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -56,6 +56,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -1068,22 +1069,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
{
boolean match = true;
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
-
- if (match && assertNode)
+ try
{
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (match)
+ {
+ // should I use the queried details to update the local data structure.
+ }
}
- else if (match)
+ catch(SessionException e)
{
- // should I use the queried details to update the local data structure.
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ {
+ match = false;
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+ "Error querying queue",e);
+ }
}
return match;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0a78403268..5d32863f2f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -571,6 +571,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (!_session.isClosed() || _session.isClosing())
{
sendCancel();
+ cleanupQueue();
}
}
catch (AMQException e)
@@ -608,6 +609,8 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
abstract void sendCancel() throws AMQException, FailoverException;
+
+ abstract void cleanupQueue() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index b5f3501e5a..1c7c9a7bb5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,7 +19,9 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
@@ -509,4 +511,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _exclusive;
}
}
+
+ void cleanupQueue() throws AMQException, FailoverException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.RECEIVER )
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ this.getDestination().getQueueName());
+ }
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cdbf57769d..00acd5e866 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -88,4 +88,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return receive();
}
+ void cleanupQueue() throws AMQException, FailoverException
+ {
+
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 53c0457120..0e3f4a5524 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -30,6 +30,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -239,5 +240,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
return _session.isQueueBound(destination);
}
+
+ @Override
+ public void close()
+ {
+ super.close();
+ AMQDestination dest = _destination;
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.SENDER )
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ _destination.getQueueName());
+ }
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index dd86ffc4da..f24e45ec93 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -844,4 +844,62 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull(msg);
assertEquals("A",((TextMessage)msg).getText());
}
+
+ public void testDeleteOptions() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons;
+
+ // default (create never, assert never) -------------------
+ // create never --------------------------------------------
+ String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
+ AMQDestination dest = new AMQAnyDestination(addr1);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
+ dest = new AMQAnyDestination(addr2);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
+ dest = new AMQAnyDestination(addr3);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+ prod.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ }
}