From bac581489b453ccb7d0586474b7d75d8b259e003 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 3 Jan 2008 13:23:04 +0000 Subject: QPID-499 : Added per-virtual host timed tasks to inspect queues (with no consumers) for expired messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@608477 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/etc/config.xml | 4 + .../org/apache/qpid/server/queue/AMQQueue.java | 25 +++- .../queue/ConcurrentSelectorDeliveryManager.java | 23 ++++ .../apache/qpid/server/queue/DeliveryManager.java | 2 + .../apache/qpid/server/queue/SubscriptionSet.java | 14 ++- .../qpid/server/virtualhost/VirtualHost.java | 48 ++++++++ .../java/org/apache/qpid/client/AMQSession.java | 96 ++++++++++----- .../ConcurrentLinkedMessageQueueAtomicSize.java | 8 ++ .../apache/qpid/server/queue/TimeToLiveTest.java | 135 ++++++++++++++------- 9 files changed, 275 insertions(+), 80 deletions(-) diff --git a/java/broker/etc/config.xml b/java/broker/etc/config.xml index b5b81bbeb0..69108bb7cc 100644 --- a/java/broker/etc/config.xml +++ b/java/broker/etc/config.xml @@ -98,6 +98,10 @@ org.apache.qpid.server.store.MemoryMessageStore + + 20000 + + diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0c52a358f7..c7cb2be265 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -37,6 +38,8 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -153,10 +156,7 @@ public class AMQQueue implements Managable, Comparable /** total messages received by the queue since startup. */ public AtomicLong _totalMessagesReceived = new AtomicLong(); - public int compareTo(Object o) - { - return _name.compareTo(((AMQQueue) o).getName()); - } + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -937,4 +937,21 @@ public class AMQQueue implements Managable, Comparable { _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg); } + + public int compareTo(Object o) + { + return _name.compareTo(((AMQQueue) o).getName()); + } + + + public void removeExpiredIfNoSubscribers() throws AMQException + { + synchronized(_subscribers.getChangeLock()) + { + if(_subscribers.isEmpty()) + { + _deliveryMgr.removeExpired(); + } + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index eabc8ebf38..bb0022ab4d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -211,6 +211,29 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** + * NOTE : This method should only be called when there are no active subscribers + */ + public void removeExpired() throws AMQException + { + _lock.lock(); + + + for(Iterator iter = _messages.iterator(); iter.hasNext();) + { + AMQMessage msg = iter.next(); + if(msg.expired(_queue)) + { + _queue.dequeue(_reapingStoreContext,msg); + msg.decrementReference(_reapingStoreContext); + iter.remove(); + } + } + + + _lock.unlock(); + } + /** @return the state of the async processor. */ public boolean isProcessingAsync() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 153106d919..c6bead809a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -97,4 +97,6 @@ interface DeliveryManager long getOldestMessageArrival(); void subscriberHasPendingResend(boolean hasContent, Subscription subscription, AMQMessage msg); + + void removeExpired() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b500247fa4..c0d862c0e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -37,7 +37,8 @@ class SubscriptionSet implements WeightedSubscriptionManager /** Used to control the round robin delivery of content */ private int _currentSubscriber; - private final Object _subscriptionsChange = new Object(); + + private final Object _changeLock = new Object(); /** Accessor for unit tests. */ @@ -48,7 +49,7 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - synchronized (_subscriptionsChange) + synchronized (_changeLock) { _subscriptions.add(subscription); } @@ -66,7 +67,7 @@ class SubscriptionSet implements WeightedSubscriptionManager // TODO: possibly need O(1) operation here. Subscription sub = null; - synchronized (_subscriptionsChange) + synchronized (_changeLock) { int subIndex = _subscriptions.indexOf(subscription); @@ -226,4 +227,11 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _subscriptions.size(); } + + + public Object getChangeLock() + { + return _changeLock; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index b95772b680..5dd0d60f8f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -39,8 +39,13 @@ import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.AMQException; + +import java.util.Timer; +import java.util.TimerTask; public class VirtualHost implements Accessable { @@ -66,6 +71,11 @@ public class VirtualHost implements Accessable private AccessManager _accessManager; + private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true); + + private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; + + public void setAccessableName(String name) { _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" @@ -162,6 +172,44 @@ public class VirtualHost implements Accessable _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); + + initialiseHouseKeeping(hostConfig); + + } + + private void initialiseHouseKeeping(final Configuration hostConfig) + { + + long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD); + + /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ + if(period != 0L) + { + class RemoveExpiredMessagesTask extends TimerTask + { + public void run() + { + for(AMQQueue q : _queueRegistry.getQueues()) + { + + try + { + q.removeExpiredIfNoSubscribers(); + } + catch (AMQException e) + { + _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e); + throw new RuntimeException(e); + } + } + } + + } + + _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), + period/2, + period); + } } private void initialiseMessageStore(Configuration config) throws Exception diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8f484383ca..91df4f7d35 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -39,35 +39,9 @@ import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.util.FlowControllingBlockingQueue; +import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.BasicAckBody; -import org.apache.qpid.framing.BasicConsumeBody; -import org.apache.qpid.framing.BasicConsumeOkBody; -import org.apache.qpid.framing.BasicRecoverBody; -import org.apache.qpid.framing.BasicRecoverOkBody; -import org.apache.qpid.framing.BasicRejectBody; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ChannelFlowBody; -import org.apache.qpid.framing.ChannelFlowOkBody; -import org.apache.qpid.framing.ExchangeBoundBody; -import org.apache.qpid.framing.ExchangeBoundOkBody; -import org.apache.qpid.framing.ExchangeDeclareBody; -import org.apache.qpid.framing.ExchangeDeclareOkBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.QueueBindBody; -import org.apache.qpid.framing.QueueBindOkBody; -import org.apache.qpid.framing.QueueDeclareBody; -import org.apache.qpid.framing.QueueDeclareOkBody; -import org.apache.qpid.framing.QueueDeleteBody; -import org.apache.qpid.framing.QueueDeleteOkBody; -import org.apache.qpid.framing.TxCommitBody; -import org.apache.qpid.framing.TxCommitOkBody; -import org.apache.qpid.framing.TxRollbackBody; -import org.apache.qpid.framing.TxRollbackOkBody; +import org.apache.qpid.framing.*; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -2107,6 +2081,72 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait); } + + /** + * Returns the number of messages currently queued for the given destination. + * + *

Note that this operation automatically retries in the event of fail-over. + * + * @param amqd The destination to be checked + * + * @return the number of queued messages. + * + * @throws AMQException If the queue cannot be declared for any reason. + */ + public long getQueueDepth(final AMQDestination amqd) + throws AMQException + { + + class QueueDeclareOkHandler extends SpecificMethodFrameListener + { + + private long _messageCount; + private long _consumerCount; + + public QueueDeclareOkHandler() + { + super(getChannelId(), QueueDeclareOkBody.class); + } + + public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException + { + boolean matches = super.processMethod(channelId, frame); + QueueDeclareOkBody declareOk = (QueueDeclareOkBody) frame; + _messageCount = declareOk.getMessageCount(); + _consumerCount = declareOk.getConsumerCount(); + return matches; + } + + } + + return new FailoverNoopSupport( + new FailoverProtectedOperation() + { + public Long execute() throws AMQException, FailoverException + { + + AMQFrame queueDeclare = + QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), + null, // arguments + amqd.isAutoDelete(), // autoDelete + amqd.isDurable(), // durable + amqd.isExclusive(), // exclusive + false, // nowait + true, // passive + amqd.getAMQQueueName(), // queue + getTicket()); // ticket + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + //getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + + return okHandler._messageCount; + } + }, _connection).execute(); + + } + + + /** * Declares the named exchange and type of exchange. * diff --git a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java index 461cf9591d..633cf4fe3a 100644 --- a/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java +++ b/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java @@ -215,6 +215,14 @@ public class ConcurrentLinkedMessageQueueAtomicSize extends ConcurrentLinkedQ public void remove() { last.remove(); + if(last == _mainIterator) + { + _size.decrementAndGet(); + } + else + { + _messageHeadSize.decrementAndGet(); + } } }; } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java index 06956ba52f..3afecb47d3 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -25,7 +25,11 @@ import junit.framework.TestCase; import junit.framework.Assert; import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.qpid.url.URLSyntaxException; +import org.apache.qpid.AMQException; import org.apache.log4j.Logger; import javax.jms.JMSException; @@ -38,6 +42,7 @@ import javax.jms.Connection; import javax.jms.Message; import javax.naming.spi.InitialContextFactory; import javax.naming.Context; +import javax.naming.NamingException; import java.util.Hashtable; @@ -53,21 +58,37 @@ public class TimeToLiveTest extends TestCase private final long TIME_TO_LIVE = 1000L; - Context _context; - - private Connection _clientConnection, _producerConnection; - - private MessageConsumer _consumer; - MessageProducer _producer; - Session _clientSession, _producerSession; private static final int MSG_COUNT = 50; + private static final long SERVER_TTL_TIMEOUT = 60000L; protected void setUp() throws Exception { - if (BROKER.startsWith("vm://")) + super.setUp(); + + if (usingInVMBroker()) { TransportConnection.createVMBroker(1); } + + + } + + private boolean usingInVMBroker() + { + return BROKER.startsWith("vm://"); + } + + protected void tearDown() throws Exception + { + if (usingInVMBroker()) + { + TransportConnection.killAllVMBrokers(); + } + super.tearDown(); + } + + public void testPassiveTTL() throws JMSException, NamingException + { InitialContextFactory factory = new PropertiesFileInitialContextFactory(); Hashtable env = new Hashtable(); @@ -75,56 +96,40 @@ public class TimeToLiveTest extends TestCase env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); env.put("queue.queue", QUEUE); - _context = factory.getInitialContext(env); + Context context = factory.getInitialContext(env); - Queue queue = (Queue) _context.lookup("queue"); + Queue queue = (Queue) context.lookup("queue"); //Create Client 1 - _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Connection clientConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _consumer = _clientSession.createConsumer(queue); + MessageConsumer consumer = clientSession.createConsumer(queue); //Create Producer - _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + Connection producerConnection = ((ConnectionFactory) context.lookup("connection")).createConnection(); - _producerConnection.start(); + producerConnection.start(); - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - _producer = _producerSession.createProducer(queue); - } + MessageProducer producer = producerSession.createProducer(queue); - protected void tearDown() throws Exception - { - _clientConnection.close(); - - _producerConnection.close(); - super.tearDown(); - - if (BROKER.startsWith("vm://")) - { - TransportConnection.killAllVMBrokers(); - } - } - - public void test() throws JMSException - { //Set TTL int msg = 0; - _producer.send(nextMessage(String.valueOf(msg), true)); + producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer)); - _producer.setTimeToLive(TIME_TO_LIVE); + producer.setTimeToLive(TIME_TO_LIVE); for (; msg < MSG_COUNT - 2; msg++) { - _producer.send(nextMessage(String.valueOf(msg), false)); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); } //Reset TTL - _producer.setTimeToLive(0L); - _producer.send(nextMessage(String.valueOf(msg), false)); + producer.setTimeToLive(0L); + producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer)); try { @@ -136,31 +141,71 @@ public class TimeToLiveTest extends TestCase } - _clientConnection.start(); + clientConnection.start(); //Receive Message 0 - Message received = _consumer.receive(100); + Message received = consumer.receive(100); Assert.assertNotNull("First message not received", received); Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first")); Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL")); - received = _consumer.receive(100); + received = consumer.receive(100); Assert.assertNotNull("Final message not received", received); Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first")); Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL")); - received = _consumer.receive(100); + received = consumer.receive(100); Assert.assertNull("More messages received", received); + + clientConnection.close(); + + producerConnection.close(); } - private Message nextMessage(String msg, boolean first) throws JMSException + private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException { - Message send = _producerSession.createTextMessage("Message " + msg); + Message send = producerSession.createTextMessage("Message " + msg); send.setBooleanProperty("first", first); - send.setLongProperty("TTL", _producer.getTimeToLive()); + send.setLongProperty("TTL", producer.getTimeToLive()); return send; } + /** + * Tests the expired messages get actively deleted even on queues which have no consumers + */ + public void testActiveTTL() throws URLSyntaxException, AMQException, JMSException, InterruptedException + { + Connection producerConnection = new AMQConnection(BROKER,"guest","guest","activeTTLtest","test"); + AMQSession producerSession = (AMQSession) producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = producerSession.createTemporaryQueue(); + producerSession.declareAndBind((AMQDestination) queue); + MessageProducer producer = producerSession.createProducer(queue); + producer.setTimeToLive(1000L); + + // send Messages + for(int i = 0; i < MSG_COUNT; i++) + { + producer.send(producerSession.createTextMessage("Message: "+i)); + } + long failureTime = System.currentTimeMillis() + 2*SERVER_TTL_TIMEOUT; + + // check Queue depth for up to TIMEOUT seconds + long messageCount; + + do + { + Thread.sleep(100); + messageCount = producerSession.getQueueDepth((AMQDestination) queue); + } + while(messageCount > 0L && System.currentTimeMillis() < failureTime); + + assertEquals("Messages not automatically expired: ", 0L, messageCount); + + producer.close(); + producerSession.close(); + producerConnection.close(); + } + } -- cgit v1.2.1