diff options
8 files changed, 238 insertions, 6 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 33c87d90e3..9f803558d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -216,6 +216,8 @@ public class AMQChannel _log.trace(debugIdentity() + "Content header received on channel " + _channelId); } _currentMessage.setContentHeaderBody(contentHeaderBody); + _currentMessage.setExpiration(); + routeCurrentMessage(); _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 90cf845f10..87c7db46e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -33,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.TransactionalContext; +import org.apache.qpid.server.registry.ApplicationRegistry; /** Combines the information that make up a deliverable message into a more manageable form. */ @@ -93,12 +95,42 @@ public class AMQMessage private final int hashcode = System.identityHashCode(this); + private long _expiration; public String debugIdentity() { return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } + public void setExpiration() + { + long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration(); + long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp(); + + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false)) + { + _expiration = expiration; + } + else + { + // Update TTL to be in broker time. + if (expiration != 0L) + { + if (timestamp != 0L) + { + //todo perhaps use arrival time + long diff = (System.currentTimeMillis() - timestamp); + + if (diff > 1000L || diff < 1000L) + { + _expiration = expiration + diff; + } + } + } + } + + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -205,8 +237,6 @@ public class AMQMessage _immediate = info.isImmediate(); _transientMessageData.setMessagePublishInfo(info); -// _taken = new AtomicBoolean(false); - } /** @@ -617,6 +647,33 @@ public class AMQMessage return _messageHandle.getArrivalTime(); } + /** + * Checks to see if the message has expired. If it has the message is dequeued. + * + * @param storecontext + * @param queue + * + * @return true if the message has expire + * + * @throws AMQException + */ + public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException + { + //note: If the storecontext isn't need then we can remove the getChannel() from Subscription. + + if (_expiration != 0L) + { + long now = System.currentTimeMillis(); + + if (now > _expiration) + { + dequeue(storecontext, queue); + return true; + } + } + + return false; + } /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */ public void setDeliveredToConsumer() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 1f92cee1df..bdc2189676 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -434,13 +434,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return count; } - /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */ + /** + * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. + * + * @return the next message or null + * + * @throws org.apache.qpid.AMQException + */ private AMQMessage getNextMessage() throws AMQException { return getNextMessage(_messages, null); } - private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) + private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException { AMQMessage message = messages.peek(); @@ -449,9 +455,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager && ( ((sub != null && !sub.isBrowser()) || message.isTaken(_queue)) || sub == null) - && message.taken(_queue, sub)) + && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired + || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired + ) { - //remove the already taken message + //remove the already taken message or expired AMQMessage removed = messages.poll(); assert removed == message; @@ -466,6 +474,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // try the next message message = messages.peek(); } + return message; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index e6d5d0c88d..77688f19be 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.AMQChannel; public interface Subscription { @@ -57,4 +58,6 @@ public interface Subscription void addToResendQueue(AMQMessage msg); Object getSendLock(); + + AMQChannel getChannel(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index c496996002..a7be9f2ad2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -668,5 +668,9 @@ public class SubscriptionImpl implements Subscription return _sendLock; } + public AMQChannel getChannel() + { + return channel; + } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index 42412bebae..e396432cea 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.cluster.MemberHandle; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleSendable; +import org.apache.qpid.server.AMQChannel; import org.apache.qpid.AMQException; import java.util.Queue; @@ -167,4 +168,9 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage return new Object(); } + public AMQChannel getChannel() + { + return null; + } + } diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 1a0a341bbf..fe947ef3bc 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.AMQChannel; + import java.util.ArrayList; import java.util.List; import java.util.Queue; @@ -82,6 +84,10 @@ public class SubscriptionTestHelper implements Subscription return new Object(); } + public AMQChannel getChannel() + { + return null; + } public void queueDeleted(AMQQueue queue) { diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java new file mode 100644 index 0000000000..d8163759a6 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java @@ -0,0 +1,145 @@ +package org.apache.qpid.server.queue; + +import junit.framework.TestCase; +import junit.framework.Assert; +import org.apache.qpid.client.transport.TransportConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; +import org.apache.log4j.Logger; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ConnectionFactory; +import javax.jms.Connection; +import javax.jms.Message; +import javax.naming.spi.InitialContextFactory; +import javax.naming.Context; +import java.util.Hashtable; + + +/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */ +public class TimeToLiveTest extends TestCase +{ + private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class); + + + protected final String BROKER = "vm://:1"; + protected final String VHOST = "/test"; + protected final String QUEUE = "TimeToLiveQueue"; + + private final long TIME_TO_LIVE = 1000L; + + Context _context; + + private Connection _clientConnection, _producerConnection; + + private MessageConsumer _consumer; + MessageProducer _producer; + Session _clientSession, _producerSession; + private static final int MSG_COUNT = 50; + + protected void setUp() throws Exception + { + if (BROKER.startsWith("vm://")) + { + TransportConnection.createVMBroker(1); + } + InitialContextFactory factory = new PropertiesFileInitialContextFactory(); + + Hashtable<String, String> env = new Hashtable<String, String>(); + + env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'"); + env.put("queue.queue", QUEUE); + + _context = factory.getInitialContext(env); + + Queue queue = (Queue) _context.lookup("queue"); + + //Create Client 1 + _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _consumer = _clientSession.createConsumer(queue); + + //Create Producer + _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection(); + + _producerConnection.start(); + + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producer = _producerSession.createProducer(queue); + } + + protected void tearDown() throws Exception + { + _clientConnection.close(); + + _producerConnection.close(); + super.tearDown(); + + if (BROKER.startsWith("vm://")) + { + TransportConnection.killAllVMBrokers(); + } + } + + public void test() throws JMSException + { + //Set TTL + int msg = 0; + _producer.send(nextMessage(String.valueOf(msg), true)); + + _producer.setTimeToLive(TIME_TO_LIVE); + + for (; msg < MSG_COUNT - 2; msg++) + { + _producer.send(nextMessage(String.valueOf(msg), false)); + } + + //Reset TTL + _producer.setTimeToLive(0L); + _producer.send(nextMessage(String.valueOf(msg), false)); + + try + { + // Sleep to ensure TTL reached + Thread.sleep(2000); + } + catch (InterruptedException e) + { + + } + + _clientConnection.start(); + + //Receive Message 0 + Message received = _consumer.receive(100); + Assert.assertNotNull("First message not received", received); + Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first")); + Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL")); + + + received = _consumer.receive(100); + Assert.assertNotNull("Final message not received", received); + Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first")); + Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL")); + + received = _consumer.receive(100); + Assert.assertNull("More messages received", received); + } + + private Message nextMessage(String msg, boolean first) throws JMSException + { + Message send = _producerSession.createTextMessage("Message " + msg); + send.setBooleanProperty("first", first); + send.setLongProperty("TTL", _producer.getTimeToLive()); + return send; + } + + +} |