summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java61
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java4
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java145
8 files changed, 238 insertions, 6 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 33c87d90e3..9f803558d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -216,6 +216,8 @@ public class AMQChannel
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setExpiration();
+
routeCurrentMessage();
_currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 90cf845f10..87c7db46e4 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.registry.ApplicationRegistry;
/** Combines the information that make up a deliverable message into a more manageable form. */
@@ -93,12 +95,42 @@ public class AMQMessage
private final int hashcode = System.identityHashCode(this);
+ private long _expiration;
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
+ public void setExpiration()
+ {
+ long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ {
+ _expiration = expiration;
+ }
+ else
+ {
+ // Update TTL to be in broker time.
+ if (expiration != 0L)
+ {
+ if (timestamp != 0L)
+ {
+ //todo perhaps use arrival time
+ long diff = (System.currentTimeMillis() - timestamp);
+
+ if (diff > 1000L || diff < 1000L)
+ {
+ _expiration = expiration + diff;
+ }
+ }
+ }
+ }
+
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -205,8 +237,6 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
-// _taken = new AtomicBoolean(false);
-
}
/**
@@ -617,6 +647,33 @@ public class AMQMessage
return _messageHandle.getArrivalTime();
}
+ /**
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @param storecontext
+ * @param queue
+ *
+ * @return true if the message has expire
+ *
+ * @throws AMQException
+ */
+ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+ {
+ //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ if (now > _expiration)
+ {
+ dequeue(storecontext, queue);
+ return true;
+ }
+ }
+
+ return false;
+ }
/** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 1f92cee1df..bdc2189676 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -434,13 +434,19 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
- /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
+ /**
+ * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ *
+ * @return the next message or null
+ *
+ * @throws org.apache.qpid.AMQException
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
{
AMQMessage message = messages.peek();
@@ -449,9 +455,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
&& (
((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
|| sub == null)
- && message.taken(_queue, sub))
+ && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
+ || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
+ )
{
- //remove the already taken message
+ //remove the already taken message or expired
AMQMessage removed = messages.poll();
assert removed == message;
@@ -466,6 +474,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// try the next message
message = messages.peek();
}
+
return message;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index e6d5d0c88d..77688f19be 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import java.util.Queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
public interface Subscription
{
@@ -57,4 +58,6 @@ public interface Subscription
void addToResendQueue(AMQMessage msg);
Object getSendLock();
+
+ AMQChannel getChannel();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index c496996002..a7be9f2ad2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -668,5 +668,9 @@ public class SubscriptionImpl implements Subscription
return _sendLock;
}
+ public AMQChannel getChannel()
+ {
+ return channel;
+ }
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 42412bebae..e396432cea 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.cluster.MemberHandle;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import java.util.Queue;
@@ -167,4 +168,9 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return new Object();
}
+ public AMQChannel getChannel()
+ {
+ return null;
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 1a0a341bbf..fe947ef3bc 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.AMQChannel;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -82,6 +84,10 @@ public class SubscriptionTestHelper implements Subscription
return new Object();
}
+ public AMQChannel getChannel()
+ {
+ return null;
+ }
public void queueDeleted(AMQQueue queue)
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
new file mode 100644
index 0000000000..d8163759a6
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -0,0 +1,145 @@
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.naming.spi.InitialContextFactory;
+import javax.naming.Context;
+import java.util.Hashtable;
+
+
+/** Test Case provided by client Non-functional Test NF101: heap exhaustion behaviour */
+public class TimeToLiveTest extends TestCase
+{
+ private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
+
+
+ protected final String BROKER = "vm://:1";
+ protected final String VHOST = "/test";
+ protected final String QUEUE = "TimeToLiveQueue";
+
+ private final long TIME_TO_LIVE = 1000L;
+
+ Context _context;
+
+ private Connection _clientConnection, _producerConnection;
+
+ private MessageConsumer _consumer;
+ MessageProducer _producer;
+ Session _clientSession, _producerSession;
+ private static final int MSG_COUNT = 50;
+
+ protected void setUp() throws Exception
+ {
+ if (BROKER.startsWith("vm://"))
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+ Hashtable<String, String> env = new Hashtable<String, String>();
+
+ env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+ env.put("queue.queue", QUEUE);
+
+ _context = factory.getInitialContext(env);
+
+ Queue queue = (Queue) _context.lookup("queue");
+
+ //Create Client 1
+ _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _consumer = _clientSession.createConsumer(queue);
+
+ //Create Producer
+ _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+ _producerConnection.start();
+
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producer = _producerSession.createProducer(queue);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ _clientConnection.close();
+
+ _producerConnection.close();
+ super.tearDown();
+
+ if (BROKER.startsWith("vm://"))
+ {
+ TransportConnection.killAllVMBrokers();
+ }
+ }
+
+ public void test() throws JMSException
+ {
+ //Set TTL
+ int msg = 0;
+ _producer.send(nextMessage(String.valueOf(msg), true));
+
+ _producer.setTimeToLive(TIME_TO_LIVE);
+
+ for (; msg < MSG_COUNT - 2; msg++)
+ {
+ _producer.send(nextMessage(String.valueOf(msg), false));
+ }
+
+ //Reset TTL
+ _producer.setTimeToLive(0L);
+ _producer.send(nextMessage(String.valueOf(msg), false));
+
+ try
+ {
+ // Sleep to ensure TTL reached
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ _clientConnection.start();
+
+ //Receive Message 0
+ Message received = _consumer.receive(100);
+ Assert.assertNotNull("First message not received", received);
+ Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first"));
+ Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+
+ received = _consumer.receive(100);
+ Assert.assertNotNull("Final message not received", received);
+ Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first"));
+ Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+ received = _consumer.receive(100);
+ Assert.assertNull("More messages received", received);
+ }
+
+ private Message nextMessage(String msg, boolean first) throws JMSException
+ {
+ Message send = _producerSession.createTextMessage("Message " + msg);
+ send.setBooleanProperty("first", first);
+ send.setLongProperty("TTL", _producer.getTimeToLive());
+ return send;
+ }
+
+
+}