summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-03-03 16:34:20 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-03-03 16:34:20 +0000
commit4abd5879ee62961db700a4f125d0fcf6c355d3cd (patch)
tree2b94052cd59c5250a2ade20f184fc02623fd4dcc
parent2736513ae2302ff6e960a4e9571cc41fe11d1b6b (diff)
downloadqpid-python-4abd5879ee62961db700a4f125d0fcf6c355d3cd.tar.gz
QPID-3109
The isQueueExist method is modified to handle the exception when a queue has been deleted. Both the Message producer and consumer now delete the queue if the delete option is selected and will not check if the queue is empty or if there are any subscribers on it. Also added test cases. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1076670 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java17
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java58
6 files changed, 125 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 517a7a5ce8..b392bad879 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -56,6 +56,7 @@ import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.ExchangeBoundResult;
import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
@@ -1068,22 +1069,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode)
+ public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException
{
boolean match = true;
- QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
- match = dest.getAddressName().equals(result.getQueue());
-
- if (match && assertNode)
+ try
{
- match = (result.getDurable() == node.isDurable()) &&
- (result.getAutoDelete() == node.isAutoDelete()) &&
- (result.getExclusive() == node.isExclusive()) &&
- (matchProps(result.getArguments(),node.getDeclareArgs()));
+ QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get();
+ match = dest.getAddressName().equals(result.getQueue());
+
+ if (match && assertNode)
+ {
+ match = (result.getDurable() == node.isDurable()) &&
+ (result.getAutoDelete() == node.isAutoDelete()) &&
+ (result.getExclusive() == node.isExclusive()) &&
+ (matchProps(result.getArguments(),node.getDeclareArgs()));
+ }
+ else if (match)
+ {
+ // should I use the queried details to update the local data structure.
+ }
}
- else if (match)
+ catch(SessionException e)
{
- // should I use the queried details to update the local data structure.
+ if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED)
+ {
+ match = false;
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()),
+ "Error querying queue",e);
+ }
}
return match;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0a78403268..5d32863f2f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -571,6 +571,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (!_session.isClosed() || _session.isClosing())
{
sendCancel();
+ cleanupQueue();
}
}
catch (AMQException e)
@@ -608,6 +609,8 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
abstract void sendCancel() throws AMQException, FailoverException;
+
+ abstract void cleanupQueue() throws AMQException, FailoverException;
/**
* Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index b5f3501e5a..1c7c9a7bb5 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -19,7 +19,9 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQShortString;
@@ -509,4 +511,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return _exclusive;
}
}
+
+ void cleanupQueue() throws AMQException, FailoverException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.RECEIVER )
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ this.getDestination().getQueueName());
+ }
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cdbf57769d..00acd5e866 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -88,4 +88,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
return receive();
}
+ void cleanupQueue() throws AMQException, FailoverException
+ {
+
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 53c0457120..0e3f4a5524 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -30,6 +30,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -239,5 +240,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
return _session.isQueueBound(destination);
}
+
+ @Override
+ public void close()
+ {
+ super.close();
+ AMQDestination dest = _destination;
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AddressOption.ALWAYS ||
+ dest.getDelete() == AddressOption.SENDER )
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
+ _destination.getQueueName());
+ }
+ }
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index dd86ffc4da..f24e45ec93 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -844,4 +844,62 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
assertNotNull(msg);
assertEquals("A",((TextMessage)msg).getText());
}
+
+ public void testDeleteOptions() throws Exception
+ {
+ Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons;
+
+ // default (create never, assert never) -------------------
+ // create never --------------------------------------------
+ String addr1 = "ADDR:testQueue1;{create: always, delete: always}";
+ AMQDestination dest = new AMQAnyDestination(addr1);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
+ dest = new AMQAnyDestination(addr2);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ cons.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
+ dest = new AMQAnyDestination(addr3);
+ try
+ {
+ cons = jmsSession.createConsumer(dest);
+ MessageProducer prod = jmsSession.createProducer(dest);
+ prod.close();
+ }
+ catch(JMSException e)
+ {
+ fail("Exception should not be thrown. Exception thrown is : " + e);
+ }
+
+ assertFalse("Queue not deleted as expected",(
+ (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true));
+
+
+ }
}