summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-16 13:36:26 +0000
committerRobert Greig <rgreig@apache.org>2006-12-16 13:36:26 +0000
commitd9aa5a9c9a8622e12ebc897222b6c0f38676f3e4 (patch)
tree900370f27af591ba737f07b6364fe1fd5276bd48
parent883268174c5b6922070c0e8261541efdedb93ef4 (diff)
downloadqpid-python-d9aa5a9c9a8622e12ebc897222b6c0f38676f3e4.tar.gz
QPID-202 : Implement TemporaryQueue.delete
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@487801 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java80
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java81
4 files changed, 161 insertions, 20 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index cfc79cc70c..ed2127a78a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -48,6 +48,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -102,6 +103,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private Map _consumers = new ConcurrentHashMap();
/**
+ * Maps from destination to count of JMSMessageConsumers
+ */
+ private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
+ new ConcurrentHashMap<Destination, AtomicInteger>();
+ /**
* Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
* need to be attached to a queue
*/
@@ -127,6 +133,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
+
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -788,20 +796,38 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageConsumer createConsumer(Destination destination) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, null);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ false,
+ false,
+ null,
+ null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, false, false, messageSelector);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ false,
+ false,
+ messageSelector,
+ null);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, _defaultPrefetchHighMark, _defaultPrefetchLowMark, noLocal, false, messageSelector);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -811,7 +837,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
}
@@ -823,7 +849,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumer(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
}
public MessageConsumer createConsumer(Destination destination,
@@ -892,11 +918,26 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
throw ex;
}
+ synchronized(destination)
+ {
+ _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.get(destination).incrementAndGet();
+ }
+
return consumer;
}
}.execute(_connection);
}
+
+ public boolean hasConsumer(TemporaryQueue destination)
+ {
+ AtomicInteger counter = _destinationConsumerCount.get(destination);
+
+ return (counter != null) && (counter.get() != 0);
+ }
+
+
public void declareExchange(String name, String type)
{
declareExchange(name, type, _connection.getProtocolHandler());
@@ -970,6 +1011,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
+
try
{
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
@@ -1136,7 +1178,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (isQueueBound(dest.getQueueName()) &&
!isQueueBound(dest.getQueueName(), topic.getTopicName()))
{
- deleteSubscriptionQueue(dest.getQueueName());
+ deleteQueue(dest.getQueueName());
}
}
@@ -1146,7 +1188,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
return subscriber;
}
- private void deleteSubscriptionQueue(String queueName) throws JMSException
+ void deleteQueue(String queueName) throws JMSException
{
try
{
@@ -1198,7 +1240,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public TemporaryQueue createTemporaryQueue() throws JMSException
{
checkNotClosed();
- return new AMQTemporaryQueue();
+ return new AMQTemporaryQueue(this);
}
public TemporaryTopic createTemporaryTopic() throws JMSException
@@ -1214,14 +1256,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
if (subscriber != null)
{
// send a queue.delete for the subscription
- deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
_subscriptions.remove(name);
}
else
{
if (isQueueBound(AMQTopic.getDurableTopicQueueName(name, _connection)))
{
- deleteSubscriptionQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+ deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
}
else
{
@@ -1230,12 +1272,12 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
}
}
- private boolean isQueueBound(String queueName) throws JMSException
+ boolean isQueueBound(String queueName) throws JMSException
{
return isQueueBound(queueName, null);
}
- private boolean isQueueBound(String queueName, String routingKey) throws JMSException
+ boolean isQueueBound(String queueName, String routingKey) throws JMSException
{
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
routingKey, queueName);
@@ -1374,11 +1416,19 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
* Called by the MessageConsumer when closing, to deregister the consumer from the
* map from consumerTag to consumer instance.
*
- * @param consumerTag the consumer tag, that was broker-generated
+ * @param consumer the consum
*/
- void deregisterConsumer(String consumerTag)
+ void deregisterConsumer(BasicMessageConsumer consumer)
{
- _consumers.remove(consumerTag);
+ _consumers.remove(consumer.getConsumerTag());
+ Destination dest = consumer.getDestination();
+ synchronized(dest)
+ {
+ if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
+ }
}
private void registerProducer(long producerId, MessageProducer producer)
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
index 6b41ea0112..abb76edb67 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
@@ -29,22 +29,32 @@ import javax.jms.TemporaryQueue;
final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
{
+ private final AMQSession _session;
/**
* Create a new instance of an AMQTemporaryQueue
*/
- public AMQTemporaryQueue()
+ public AMQTemporaryQueue(AMQSession session)
{
super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+ _session = session;
}
/**
* @see javax.jms.TemporaryQueue#delete()
*/
- public void delete() throws JMSException
+ public synchronized void delete() throws JMSException
{
- throw new UnsupportedOperationException("Delete not supported, " +
- "will auto-delete when connection closed");
+ if(_session.hasConsumer(this))
+ {
+ throw new JMSException("Temporary Queue has consumers so cannot be deleted");
+ }
+
+ if(_session.isQueueBound(getQueueName()))
+ {
+ _session.deleteQueue(getQueueName());
+ }
+
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index d6118f5560..01146844f0 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -524,7 +524,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
*/
private void deregisterConsumer()
{
- _session.deregisterConsumer(_consumerTag);
+ _session.deregisterConsumer(this);
}
public String getConsumerTag()
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
new file mode 100644
index 0000000000..d0ebdcc668
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
@@ -0,0 +1,81 @@
+package org.apache.qpid.test.unit.client.temporaryqueue;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.test.unit.client.connection.ConnectionTest;
+
+import javax.jms.*;
+
+public class TemporaryQueueTest extends TestCase
+{
+
+ String _broker = "vm://:1";
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+
+ protected Connection createConnection() throws AMQException, URLSyntaxException
+ {
+ return new AMQConnection(_broker, "guest", "guest",
+ "fred", "/test");
+ }
+
+ public void testTempoaryQueue() throws Exception
+ {
+ Connection conn = createConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue queue = session.createTemporaryQueue();
+ assertNotNull(queue);
+ MessageProducer producer = session.createProducer(queue);
+ MessageConsumer consumer = session.createConsumer(queue);
+ conn.start();
+ producer.send(session.createTextMessage("hello"));
+ TextMessage tm = (TextMessage) consumer.receive(2000);
+ assertNotNull(tm);
+ assertEquals("hello",tm.getText());
+
+ try
+ {
+ queue.delete();
+ fail("Expected JMSException : should not be able to delete while there are active consumers");
+ }
+ catch(JMSException je)
+ {
+ ; //pass
+ }
+
+ consumer.close();
+
+ try
+ {
+ queue.delete();
+ }
+ catch(JMSException je)
+ {
+ fail("Unexpected Exception: " + je.getMessage());
+ }
+
+ conn.close();
+ }
+
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(TemporaryQueueTest.class);
+ }
+}