diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-16 13:36:26 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-16 13:36:26 +0000 |
commit | d9aa5a9c9a8622e12ebc897222b6c0f38676f3e4 (patch) | |
tree | 900370f27af591ba737f07b6364fe1fd5276bd48 | |
parent | 883268174c5b6922070c0e8261541efdedb93ef4 (diff) | |
download | qpid-python-d9aa5a9c9a8622e12ebc897222b6c0f38676f3e4.tar.gz |
QPID-202 : Implement TemporaryQueue.delete
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487801 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 161 insertions, 20 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index cfc79cc70c..ed2127a78a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -48,6 +48,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; public class AMQSession extends Closeable implements Session, QueueSession, TopicSession { @@ -102,6 +103,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private Map _consumers = new ConcurrentHashMap(); /** + * Maps from destination to count of JMSMessageConsumers + */ + private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount = + new ConcurrentHashMap<Destination, AtomicInteger>(); + /** * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not * need to be attached to a queue */ @@ -127,6 +133,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); + + /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ @@ -788,20 +796,38 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageConsumer createConsumer(Destination destination) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + false, + false, + null, + null); } public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + false, + false, + messageSelector, + null); } public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector); + return createConsumerImpl(destination, + _defaultPrefetchHighMark, + _defaultPrefetchLowMark, + noLocal, + false, + messageSelector, + null); } public MessageConsumer createConsumer(Destination destination, @@ -811,7 +837,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null); } @@ -823,7 +849,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi String selector) throws JMSException { checkValidDestination(destination); - return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); + return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null); } public MessageConsumer createConsumer(Destination destination, @@ -892,11 +918,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi throw ex; } + synchronized(destination) + { + _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger()); + _destinationConsumerCount.get(destination).incrementAndGet(); + } + return consumer; } }.execute(_connection); } + + public boolean hasConsumer(TemporaryQueue destination) + { + AtomicInteger counter = _destinationConsumerCount.get(destination); + + return (counter != null) && (counter.get() != 0); + } + + public void declareExchange(String name, String type) { declareExchange(name, type, _connection.getProtocolHandler()); @@ -970,6 +1011,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi consumer.setConsumerTag(tag); // we must register the consumer in the map before we actually start listening _consumers.put(tag, consumer); + try { AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0, @@ -1136,7 +1178,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (isQueueBound(dest.getQueueName()) && !isQueueBound(dest.getQueueName(), topic.getTopicName())) { - deleteSubscriptionQueue(dest.getQueueName()); + deleteQueue(dest.getQueueName()); } } @@ -1146,7 +1188,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi return subscriber; } - private void deleteSubscriptionQueue(String queueName) throws JMSException + void deleteQueue(String queueName) throws JMSException { try { @@ -1198,7 +1240,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public TemporaryQueue createTemporaryQueue() throws JMSException { checkNotClosed(); - return new AMQTemporaryQueue(); + return new AMQTemporaryQueue(this); } public TemporaryTopic createTemporaryTopic() throws JMSException @@ -1214,14 +1256,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi if (subscriber != null) { // send a queue.delete for the subscription - deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); _subscriptions.remove(name); } else { if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection))) { - deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); + deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection)); } else { @@ -1230,12 +1272,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } - private boolean isQueueBound(String queueName) throws JMSException + boolean isQueueBound(String queueName) throws JMSException { return isQueueBound(queueName, null); } - private boolean isQueueBound(String queueName, String routingKey) throws JMSException + boolean isQueueBound(String queueName, String routingKey) throws JMSException { AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME, routingKey, queueName); @@ -1374,11 +1416,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi * Called by the MessageConsumer when closing, to deregister the consumer from the * map from consumerTag to consumer instance. * - * @param consumerTag the consumer tag, that was broker-generated + * @param consumer the consum */ - void deregisterConsumer(String consumerTag) + void deregisterConsumer(BasicMessageConsumer consumer) { - _consumers.remove(consumerTag); + _consumers.remove(consumer.getConsumerTag()); + Destination dest = consumer.getDestination(); + synchronized(dest) + { + if(_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } + } } private void registerProducer(long producerId, MessageProducer producer) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java index 6b41ea0112..abb76edb67 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java @@ -29,22 +29,32 @@ import javax.jms.TemporaryQueue; final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue { + private final AMQSession _session; /** * Create a new instance of an AMQTemporaryQueue */ - public AMQTemporaryQueue() + public AMQTemporaryQueue(AMQSession session) { super("TempQueue" + Long.toString(System.currentTimeMillis()), true); + _session = session; } /** * @see javax.jms.TemporaryQueue#delete() */ - public void delete() throws JMSException + public synchronized void delete() throws JMSException { - throw new UnsupportedOperationException("Delete not supported, " + - "will auto-delete when connection closed"); + if(_session.hasConsumer(this)) + { + throw new JMSException("Temporary Queue has consumers so cannot be deleted"); + } + + if(_session.isQueueBound(getQueueName())) + { + _session.deleteQueue(getQueueName()); + } + } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index d6118f5560..01146844f0 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -524,7 +524,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer */ private void deregisterConsumer() { - _session.deregisterConsumer(_consumerTag); + _session.deregisterConsumer(this); } public String getConsumerTag() diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java new file mode 100644 index 0000000000..d0ebdcc668 --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java @@ -0,0 +1,81 @@ +package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.unit.client.connection.ConnectionTest;
+
+import javax.jms.*;
+
+public class TemporaryQueueTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
+ protected Connection createConnection() throws AMQException, URLSyntaxException
+ {
+ return new AMQConnection(_broker, "guest", "guest",
+ "fred", "/test");
+ }
+
+ public void testTempoaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello",tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch(JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch(JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+}
|