diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-03-03 16:34:20 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-03-03 16:34:20 +0000 |
commit | 4abd5879ee62961db700a4f125d0fcf6c355d3cd (patch) | |
tree | 2b94052cd59c5250a2ade20f184fc02623fd4dcc | |
parent | 2736513ae2302ff6e960a4e9571cc41fe11d1b6b (diff) | |
download | qpid-python-4abd5879ee62961db700a4f125d0fcf6c355d3cd.tar.gz |
QPID-3109
The isQueueExist method is modified to handle the exception when a queue has been deleted.
Both the Message producer and consumer now delete the queue if the delete option is selected and will not check if the queue is empty or if there are any subscribers on it.
Also added test cases.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1076670 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 125 insertions, 11 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 517a7a5ce8..b392bad879 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -56,6 +56,7 @@ import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.transport.ExchangeBoundResult; import org.apache.qpid.transport.ExchangeQueryResult; +import org.apache.qpid.transport.ExecutionErrorCode; import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; @@ -1068,22 +1069,37 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) + public boolean isQueueExist(AMQDestination dest,QueueNode node,boolean assertNode) throws AMQException { boolean match = true; - QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); - match = dest.getAddressName().equals(result.getQueue()); - - if (match && assertNode) + try { - match = (result.getDurable() == node.isDurable()) && - (result.getAutoDelete() == node.isAutoDelete()) && - (result.getExclusive() == node.isExclusive()) && - (matchProps(result.getArguments(),node.getDeclareArgs())); + QueueQueryResult result = getQpidSession().queueQuery(dest.getAddressName(), Option.NONE).get(); + match = dest.getAddressName().equals(result.getQueue()); + + if (match && assertNode) + { + match = (result.getDurable() == node.isDurable()) && + (result.getAutoDelete() == node.isAutoDelete()) && + (result.getExclusive() == node.isExclusive()) && + (matchProps(result.getArguments(),node.getDeclareArgs())); + } + else if (match) + { + // should I use the queried details to update the local data structure. + } } - else if (match) + catch(SessionException e) { - // should I use the queried details to update the local data structure. + if (e.getException().getErrorCode() == ExecutionErrorCode.RESOURCE_DELETED) + { + match = false; + } + else + { + throw new AMQException(AMQConstant.getConstant(e.getException().getErrorCode().getValue()), + "Error querying queue",e); + } } return match; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 0a78403268..5d32863f2f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -571,6 +571,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (!_session.isClosed() || _session.isClosing()) { sendCancel(); + cleanupQueue(); } } catch (AMQException e) @@ -608,6 +609,8 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } abstract void sendCancel() throws AMQException, FailoverException; + + abstract void cleanupQueue() throws AMQException, FailoverException; /** * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index b5f3501e5a..1c7c9a7bb5 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -19,7 +19,9 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; +import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.AMQShortString; @@ -509,4 +511,18 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return _exclusive; } } + + void cleanupQueue() throws AMQException, FailoverException + { + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.RECEIVER ) + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + this.getDestination().getQueueName()); + } + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cdbf57769d..00acd5e866 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -88,4 +88,8 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe return receive(); } + void cleanupQueue() throws AMQException, FailoverException + { + + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 53c0457120..0e3f4a5524 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -30,6 +30,7 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -239,5 +240,21 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { return _session.isQueueBound(destination); } + + @Override + public void close() + { + super.close(); + AMQDestination dest = _destination; + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AddressOption.ALWAYS || + dest.getDelete() == AddressOption.SENDER ) + { + ((AMQSession_0_10) getSession()).getQpidSession().queueDelete( + _destination.getQueueName()); + } + } + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index dd86ffc4da..f24e45ec93 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -844,4 +844,62 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase assertNotNull(msg); assertEquals("A",((TextMessage)msg).getText()); } + + public void testDeleteOptions() throws Exception + { + Session jmsSession = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons; + + // default (create never, assert never) ------------------- + // create never -------------------------------------------- + String addr1 = "ADDR:testQueue1;{create: always, delete: always}"; + AMQDestination dest = new AMQAnyDestination(addr1); + try + { + cons = jmsSession.createConsumer(dest); + cons.close(); + } + catch(JMSException e) + { + fail("Exception should not be thrown. Exception thrown is : " + e); + } + + assertFalse("Queue not deleted as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + + String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; + dest = new AMQAnyDestination(addr2); + try + { + cons = jmsSession.createConsumer(dest); + cons.close(); + } + catch(JMSException e) + { + fail("Exception should not be thrown. Exception thrown is : " + e); + } + + assertFalse("Queue not deleted as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + + String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; + dest = new AMQAnyDestination(addr3); + try + { + cons = jmsSession.createConsumer(dest); + MessageProducer prod = jmsSession.createProducer(dest); + prod.close(); + } + catch(JMSException e) + { + fail("Exception should not be thrown. Exception thrown is : " + e); + } + + assertFalse("Queue not deleted as expected",( + (AMQSession_0_10)jmsSession).isQueueExist(dest,(QueueNode)dest.getSourceNode(), true)); + + + } } |