summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-01-03 13:23:04 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-01-03 13:23:04 +0000
commitbac581489b453ccb7d0586474b7d75d8b259e003 (patch)
tree37d3895df447d6486b8798b4623f6fc37e94769c
parent04fe13b3b56eb9c41fe4d95900582d68e4c7b4cc (diff)
downloadqpid-python-bac581489b453ccb7d0586474b7d75d8b259e003.tar.gz
QPID-499 : Added per-virtual host timed tasks to inspect queues (with no consumers) for expired messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@608477 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/etc/config.xml4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java25
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java48
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java96
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java8
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java135
9 files changed, 275 insertions, 80 deletions
diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml
index b5b81bbeb0..69108bb7cc 100644
--- a/java/broker/etc/config.xml
+++ b/java/broker/etc/config.xml
@@ -98,6 +98,10 @@
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
</store>
+ <housekeeping>
+ <expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
+ </housekeeping>
+
<security>
<!-- Need protocol changes to allow this-->
<authentication>
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 0c52a358f7..c7cb2be265 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -37,6 +38,8 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.text.MessageFormat;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -153,10 +156,7 @@ public class AMQQueue implements Managable, Comparable
/** total messages received by the queue since startup. */
public AtomicLong _totalMessagesReceived = new AtomicLong();
- public int compareTo(Object o)
- {
- return _name.compareTo(((AMQQueue) o).getName());
- }
+
public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -937,4 +937,21 @@ public class AMQQueue implements Managable, Comparable
{
_deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
}
+
+ public int compareTo(Object o)
+ {
+ return _name.compareTo(((AMQQueue) o).getName());
+ }
+
+
+ public void removeExpiredIfNoSubscribers() throws AMQException
+ {
+ synchronized(_subscribers.getChangeLock())
+ {
+ if(_subscribers.isEmpty())
+ {
+ _deliveryMgr.removeExpired();
+ }
+ }
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index eabc8ebf38..bb0022ab4d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -211,6 +211,29 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
+ /**
+ * NOTE : This method should only be called when there are no active subscribers
+ */
+ public void removeExpired() throws AMQException
+ {
+ _lock.lock();
+
+
+ for(Iterator<AMQMessage> iter = _messages.iterator(); iter.hasNext();)
+ {
+ AMQMessage msg = iter.next();
+ if(msg.expired(_queue))
+ {
+ _queue.dequeue(_reapingStoreContext,msg);
+ msg.decrementReference(_reapingStoreContext);
+ iter.remove();
+ }
+ }
+
+
+ _lock.unlock();
+ }
+
/** @return the state of the async processor. */
public boolean isProcessingAsync()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 153106d919..c6bead809a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -97,4 +97,6 @@ interface DeliveryManager
long getOldestMessageArrival();
void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg);
+
+ void removeExpired() throws AMQException;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index b500247fa4..c0d862c0e5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -37,7 +37,8 @@ class SubscriptionSet implements WeightedSubscriptionManager
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
- private final Object _subscriptionsChange = new Object();
+
+ private final Object _changeLock = new Object();
/** Accessor for unit tests. */
@@ -48,7 +49,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
public void addSubscriber(Subscription subscription)
{
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
_subscriptions.add(subscription);
}
@@ -66,7 +67,7 @@ class SubscriptionSet implements WeightedSubscriptionManager
// TODO: possibly need O(1) operation here.
Subscription sub = null;
- synchronized (_subscriptionsChange)
+ synchronized (_changeLock)
{
int subIndex = _subscriptions.indexOf(subscription);
@@ -226,4 +227,11 @@ class SubscriptionSet implements WeightedSubscriptionManager
{
return _subscriptions.size();
}
+
+
+ public Object getChangeLock()
+ {
+ return _changeLock;
+ }
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index b95772b680..5dd0d60f8f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -39,8 +39,13 @@ import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.AMQException;
+
+import java.util.Timer;
+import java.util.TimerTask;
public class VirtualHost implements Accessable
{
@@ -66,6 +71,11 @@ public class VirtualHost implements Accessable
private AccessManager _accessManager;
+ private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true);
+
+ private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
+
+
public void setAccessableName(String name)
{
_logger.warn("Setting Accessable Name for VirualHost is not allowed. ("
@@ -162,6 +172,44 @@ public class VirtualHost implements Accessable
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
+
+ initialiseHouseKeeping(hostConfig);
+
+ }
+
+ private void initialiseHouseKeeping(final Configuration hostConfig)
+ {
+
+ long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
+
+ /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */
+ if(period != 0L)
+ {
+ class RemoveExpiredMessagesTask extends TimerTask
+ {
+ public void run()
+ {
+ for(AMQQueue q : _queueRegistry.getQueues())
+ {
+
+ try
+ {
+ q.removeExpiredIfNoSubscribers();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ }
+
+ _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
+ period/2,
+ period);
+ }
}
private void initialiseMessageStore(Configuration config) throws Exception
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 8f484383ca..91df4f7d35 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -39,35 +39,9 @@ import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -2107,6 +2081,72 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
+
+ /**
+ * Returns the number of messages currently queued for the given destination.
+ *
+ * <p/>Note that this operation automatically retries in the event of fail-over.
+ *
+ * @param amqd The destination to be checked
+ *
+ * @return the number of queued messages.
+ *
+ * @throws AMQException If the queue cannot be declared for any reason.
+ */
+ public long getQueueDepth(final AMQDestination amqd)
+ throws AMQException
+ {
+
+ class QueueDeclareOkHandler extends SpecificMethodFrameListener
+ {
+
+ private long _messageCount;
+ private long _consumerCount;
+
+ public QueueDeclareOkHandler()
+ {
+ super(getChannelId(), QueueDeclareOkBody.class);
+ }
+
+ public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+ {
+ boolean matches = super.processMethod(channelId, frame);
+ QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame;
+ _messageCount = declareOk.getMessageCount();
+ _consumerCount = declareOk.getConsumerCount();
+ return matches;
+ }
+
+ }
+
+ return new FailoverNoopSupport<Long, AMQException>(
+ new FailoverProtectedOperation<Long, AMQException>()
+ {
+ public Long execute() throws AMQException, FailoverException
+ {
+
+ AMQFrame queueDeclare =
+ QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+ null, // arguments
+ amqd.isAutoDelete(), // autoDelete
+ amqd.isDurable(), // durable
+ amqd.isExclusive(), // exclusive
+ false, // nowait
+ true, // passive
+ amqd.getAMQQueueName(), // queue
+ getTicket()); // ticket
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ //getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+
+ return okHandler._messageCount;
+ }
+ }, _connection).execute();
+
+ }
+
+
+
/**
* Declares the named exchange and type of exchange.
*
diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
index 461cf9591d..633cf4fe3a 100644
--- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
+++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
@@ -215,6 +215,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize<E> extends ConcurrentLinkedQ
public void remove()
{
last.remove();
+ if(last == _mainIterator)
+ {
+ _size.decrementAndGet();
+ }
+ else
+ {
+ _messageHeadSize.decrementAndGet();
+ }
}
};
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index 06956ba52f..3afecb47d3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -25,7 +25,11 @@ import junit.framework.TestCase;
import junit.framework.Assert;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import javax.jms.JMSException;
@@ -38,6 +42,7 @@ import javax.jms.Connection;
import javax.jms.Message;
import javax.naming.spi.InitialContextFactory;
import javax.naming.Context;
+import javax.naming.NamingException;
import java.util.Hashtable;
@@ -53,21 +58,37 @@ public class TimeToLiveTest extends TestCase
private final long TIME_TO_LIVE = 1000L;
- Context _context;
-
- private Connection _clientConnection, _producerConnection;
-
- private MessageConsumer _consumer;
- MessageProducer _producer;
- Session _clientSession, _producerSession;
private static final int MSG_COUNT = 50;
+ private static final long SERVER_TTL_TIMEOUT = 60000L;
protected void setUp() throws Exception
{
- if (BROKER.startsWith("vm://"))
+ super.setUp();
+
+ if (usingInVMBroker())
{
TransportConnection.createVMBroker(1);
}
+
+
+ }
+
+ private boolean usingInVMBroker()
+ {
+ return BROKER.startsWith("vm://");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (usingInVMBroker())
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+ super.tearDown();
+ }
+
+ public void testPassiveTTL() throws JMSException, NamingException
+ {
InitialContextFactory factory = new PropertiesFileInitialContextFactory();
Hashtable<String, String> env = new Hashtable<String, String>();
@@ -75,56 +96,40 @@ public class TimeToLiveTest extends TestCase
env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
env.put("queue.queue", QUEUE);
- _context = factory.getInitialContext(env);
+ Context context = factory.getInitialContext(env);
- Queue queue = (Queue) _context.lookup("queue");
+ Queue queue = (Queue) context.lookup("queue");
//Create Client 1
- _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ Connection clientConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
- _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _consumer = _clientSession.createConsumer(queue);
+ MessageConsumer consumer = clientSession.createConsumer(queue);
//Create Producer
- _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+ Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection();
- _producerConnection.start();
+ producerConnection.start();
- _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _producer = _producerSession.createProducer(queue);
- }
+ MessageProducer producer = producerSession.createProducer(queue);
- protected void tearDown() throws Exception
- {
- _clientConnection.close();
-
- _producerConnection.close();
- super.tearDown();
-
- if (BROKER.startsWith("vm://"))
- {
- TransportConnection.killAllVMBrokers();
- }
- }
-
- public void test() throws JMSException
- {
//Set TTL
int msg = 0;
- _producer.send(nextMessage(String.valueOf(msg), true));
+ producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
- _producer.setTimeToLive(TIME_TO_LIVE);
+ producer.setTimeToLive(TIME_TO_LIVE);
for (; msg < MSG_COUNT - 2; msg++)
{
- _producer.send(nextMessage(String.valueOf(msg), false));
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
}
//Reset TTL
- _producer.setTimeToLive(0L);
- _producer.send(nextMessage(String.valueOf(msg), false));
+ producer.setTimeToLive(0L);
+ producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
try
{
@@ -136,31 +141,71 @@ public class TimeToLiveTest extends TestCase
}
- _clientConnection.start();
+ clientConnection.start();
//Receive Message 0
- Message received = _consumer.receive(100);
+ Message received = consumer.receive(100);
Assert.assertNotNull("First message not received", received);
Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first"));
Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
- received = _consumer.receive(100);
+ received = consumer.receive(100);
Assert.assertNotNull("Final message not received", received);
Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first"));
Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
- received = _consumer.receive(100);
+ received = consumer.receive(100);
Assert.assertNull("More messages received", received);
+
+ clientConnection.close();
+
+ producerConnection.close();
}
- private Message nextMessage(String msg, boolean first) throws JMSException
+ private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
{
- Message send = _producerSession.createTextMessage("Message " + msg);
+ Message send = producerSession.createTextMessage("Message " + msg);
send.setBooleanProperty("first", first);
- send.setLongProperty("TTL", _producer.getTimeToLive());
+ send.setLongProperty("TTL", producer.getTimeToLive());
return send;
}
+ /**
+ * Tests the expired messages get actively deleted even on queues which have no consumers
+ */
+ public void testActiveTTL() throws URLSyntaxException, AMQException, JMSException, InterruptedException
+ {
+ Connection producerConnection = new AMQConnection(BROKER,"guest","guest","activeTTLtest","test");
+ AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = producerSession.createTemporaryQueue();
+ producerSession.declareAndBind((AMQDestination) queue);
+ MessageProducer producer = producerSession.createProducer(queue);
+ producer.setTimeToLive(1000L);
+
+ // send Messages
+ for(int i = 0; i < MSG_COUNT; i++)
+ {
+ producer.send(producerSession.createTextMessage("Message: "+i));
+ }
+ long failureTime = System.currentTimeMillis() + 2*SERVER_TTL_TIMEOUT;
+
+ // check Queue depth for up to TIMEOUT seconds
+ long messageCount;
+
+ do
+ {
+ Thread.sleep(100);
+ messageCount = producerSession.getQueueDepth((AMQDestination) queue);
+ }
+ while(messageCount > 0L && System.currentTimeMillis() < failureTime);
+
+ assertEquals("Messages not automatically expired: ", 0L, messageCount);
+
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+ }
+
}