summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-02-27 13:35:38 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-02-27 13:35:38 +0000
commit9f75e5deb5ef0ca80e77a4f11983bb6a44e10e50 (patch)
treefd44664a749220744620db3d50fd6b40a13a10b2
parent6561836eb77f51b8882be7d6f87c71fa536260d0 (diff)
downloadqpid-python-9f75e5deb5ef0ca80e77a4f11983bb6a44e10e50.tar.gz
QPID-1635,QPID-1636 : Moved additional properties from AMQMessage up to QueueEntry to allow processing whilst messasge has been flowed. Moved : _flags (for Immediate and delivered status), expiry, messageID. Created base class to maintain counts of data and objects in queue. Removed this responsibility from the AMQQueues and on to the QueueEntryLists. This will more easily allow the QEL structure to be flowed to disk at a later stage. Updated tests as a result of moves.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748516 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java185
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (renamed from java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java)32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java97
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java102
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java31
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java199
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java114
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java177
19 files changed, 775 insertions, 416 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 5bde27dba5..8dac12fe24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -60,30 +60,6 @@ public interface AMQMessage
//Check the status of this message
- /**
- * Called selectors to determin if the message has already been sent
- *
- * @return _deliveredToConsumer
- */
- boolean getDeliveredToConsumer();
-
- /**
- * Called to enforce the 'immediate' flag.
- *
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
- */
- boolean immediateAndNotDelivered();
-
- /**
- * Checks to see if the message has expired. If it has the message is dequeued.
- *
- * @return true if the message has expire
- *
- * @throws org.apache.qpid.AMQException
- */
- boolean expired() throws AMQException;
-
/** Is this a persistent message
*
* @return true if the message is persistent
@@ -91,13 +67,8 @@ public interface AMQMessage
boolean isPersistent();
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- */
- void setDeliveredToConsumer();
+ boolean isImmediate();
- void setExpiration(long expiration);
void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
@@ -121,4 +92,8 @@ public interface AMQMessage
String toString();
String debugIdentity();
+
+ void setExpiration(long expiration);
+
+ long getExpiration();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 34a70c6969..ade780d0bb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -36,12 +36,12 @@ public class AMQPriorityQueue extends SimpleAMQQueue
int priorities)
throws AMQException
{
- super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
+ super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueEntryList.Factory(priorities));
}
public int getPriorities()
{
- return ((PriorityQueueList) _entries).getPriorities();
+ return ((PriorityQueueEntryList) _entries).getPriorities();
}
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
new file mode 100644
index 0000000000..72ea5f2667
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is an abstract base class to handle
+ */
+public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
+{
+ private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
+
+ private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
+ private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
+ private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
+ /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
+
+ private long _memoryUsageMaximum = 0;
+
+ /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
+ private long _memoryUsageMinimum = 0;
+ private AtomicBoolean _flowed;
+ private QueueBackingStore _backingStore;
+ protected AMQQueue _queue;
+
+ FlowableBaseQueueEntryList(AMQQueue queue)
+ {
+ _queue = queue;
+ _flowed = new AtomicBoolean(false);
+ VirtualHost vhost = queue.getVirtualHost();
+ if (vhost != null)
+ {
+ _backingStore = vhost.getQueueBackingStore();
+ }
+ }
+
+ public void setFlowed(boolean flowed)
+ {
+ if (_flowed.get() != flowed)
+ {
+ _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+ _flowed.set(flowed);
+ }
+ }
+
+ public boolean isFlowed()
+ {
+ return _flowed.get();
+ }
+
+ public int size()
+ {
+ return _atomicQueueCount.get();
+ }
+
+ public long dataSize()
+ {
+ return _atomicQueueSize.get();
+ }
+
+ public long memoryUsed()
+ {
+ return _atomicQueueInMemory.get();
+ }
+
+ public void setMemoryUsageMaximum(long maximumMemoryUsage)
+ {
+ _memoryUsageMaximum = maximumMemoryUsage;
+
+ // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
+ if (_memoryUsageMaximum > 0)
+ {
+ // If we've increased the max memory above what we have in memory then we can inhale more
+ if (_memoryUsageMaximum > _atomicQueueInMemory.get())
+ {
+ //TODO start inhaler
+ }
+ else // if we have now have to much memory in use we need to purge.
+ {
+ //TODO start purger
+ }
+ }
+ }
+
+ public long getMemoryUsageMaximum()
+ {
+ return _memoryUsageMaximum;
+ }
+
+ public void setMemoryUsageMinimum(long minimumMemoryUsage)
+ {
+ _memoryUsageMinimum = minimumMemoryUsage;
+
+ // Don't attempt to start the inhaler unless we have a minimum value specified.
+ if (_memoryUsageMinimum > 0)
+ {
+ // If we've increased the minimum memory above what we have in memory then we need to inhale more
+ if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
+ {
+ //TODO start inhaler
+ }
+ }
+ }
+
+ public long getMemoryUsageMinimum()
+ {
+ return _memoryUsageMinimum;
+ }
+
+ protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
+ {
+ return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
+ }
+
+ protected void incrementCounters(final QueueEntryImpl queueEntry)
+ {
+ _atomicQueueCount.incrementAndGet();
+ _atomicQueueSize.addAndGet(queueEntry.getSize());
+ if (!willCauseFlowToDisk(queueEntry))
+ {
+ _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ }
+ else
+ {
+ setFlowed(true);
+ flowingToDisk(queueEntry);
+ }
+ }
+
+ /**
+ * Called when we are now flowing to disk
+ * @param queueEntry the entry that is being flowed to disk
+ */
+ protected void flowingToDisk(QueueEntryImpl queueEntry)
+ {
+ try
+ {
+ queueEntry.flow();
+ }
+ catch (UnableToFlowMessageException e)
+ {
+ _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ }
+ }
+
+ protected void dequeued(QueueEntryImpl queueEntry)
+ {
+ _atomicQueueCount.decrementAndGet();
+ _atomicQueueSize.addAndGet(-queueEntry.getSize());
+ if (!queueEntry.isFlowed())
+ {
+ _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+ }
+ }
+
+ public QueueBackingStore getBackingStore()
+ {
+ return _backingStore;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
new file mode 100644
index 0000000000..4e95978bf8
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface FlowableQueueEntryList
+{
+ void setFlowed(boolean flowed);
+
+ boolean isFlowed();
+
+ int size();
+
+ long dataSize();
+
+ long memoryUsed();
+
+ void setMemoryUsageMaximum(long maximumMemoryUsage);
+
+ long getMemoryUsageMaximum();
+
+ void setMemoryUsageMinimum(long minimumMemoryUsage);
+
+ long getMemoryUsageMinimum();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index 7be2827e0f..d812b8ceca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -21,17 +21,17 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.AMQException;
-public class PriorityQueueList implements QueueEntryList
+public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
private final AMQQueue _queue;
private final QueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
- public PriorityQueueList(AMQQueue queue, int priorities)
+ public PriorityQueueEntryList(AMQQueue queue, int priorities)
{
+ super(queue);
_queue = queue;
_priorityLists = new QueueEntryList[priorities];
_priorities = priorities;
@@ -53,7 +53,7 @@ public class PriorityQueueList implements QueueEntryList
}
public QueueEntry add(AMQMessage message)
- {
+ {
int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
if(index >= _priorities)
{
@@ -154,7 +154,29 @@ public class PriorityQueueList implements QueueEntryList
public QueueEntryList createQueueEntryList(AMQQueue queue)
{
- return new PriorityQueueList(queue, _priorities);
+ return new PriorityQueueEntryList(queue, _priorities);
+ }
+ }
+
+ @Override
+ public int size()
+ {
+ int size=0;
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ size += queueEntryList.size();
}
+
+ return size;
+ }
+
+
+ @Override
+ protected void flowingToDisk(QueueEntryImpl queueEntry)
+ {
+ //TODO this disables FTD for priority queues
+ // As the incomming message isn't always the one to purge.
+ // More logic is required up in the add() method here to determine if the
+ // incomming message is at the 'front' or not.
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 09600b9d28..25d41c8203 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -119,16 +119,42 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ final static byte IMMEDIATE = 0x01;
+
+ /**
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
+ */
+
+ final static byte DELIVERED_TO_CONSUMER = 0x02;
+
+
AMQQueue getQueue();
AMQMessage getMessage();
long getSize();
+ /**
+ * Called selectors to determin if the message has already been sent
+ *
+ * @return _deliveredToConsumer
+ */
boolean getDeliveredToConsumer();
+ /**
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @return true if the message has expire
+ *
+ * @throws org.apache.qpid.AMQException
+ */
boolean expired() throws AMQException;
+ public void setExpiration(final long expiration);
+
boolean isAcquired();
boolean acquire();
@@ -143,10 +169,22 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void setDeliveredToSubscription();
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
+ public void setDeliveredToConsumer();
+
void release();
String debugIdentity();
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
+ */
boolean immediateAndNotDelivered();
void setRedelivered(boolean b);
@@ -180,4 +218,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
-}
+
+ void flow() throws UnableToFlowMessageException;
+
+ void recover();
+
+ boolean isFlowed();
+
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 911ed8321b..3d464d01d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -78,6 +79,17 @@ public class QueueEntryImpl implements QueueEntry
volatile QueueEntryImpl _next;
+ private long _messageSize;
+ private QueueBackingStore _backingStore;
+ private AtomicBoolean _flowed;
+ private Long _messageId;
+
+ private byte _flags = 0;
+
+ private long _expiration;
+
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
@@ -88,8 +100,7 @@ public class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
{
- _queueEntryList = queueEntryList;
- _message = message;
+ this(queueEntryList,message);
_entryIdUpdater.set(this, entryId);
}
@@ -98,6 +109,19 @@ public class QueueEntryImpl implements QueueEntry
{
_queueEntryList = queueEntryList;
_message = message;
+ if (message != null)
+ {
+ _messageId = message.getMessageId();
+ _messageSize = message.getSize();
+
+ if(message.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
+ _expiration = message.getExpiration();
+ }
+ _backingStore = queueEntryList.getBackingStore();
+ _flowed = new AtomicBoolean(false);
}
protected void setEntryId(long entryId)
@@ -122,17 +146,34 @@ public class QueueEntryImpl implements QueueEntry
public long getSize()
{
- return getMessage().getSize();
+ return _messageSize;
}
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
+ }
+
+ public void setDeliveredToConsumer()
+ {
+ _flags |= DELIVERED_TO_CONSUMER;
}
public boolean expired() throws AMQException
{
- return getMessage().expired();
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > _expiration);
+ }
+
+ return false;
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
}
public boolean isAcquired()
@@ -169,7 +210,7 @@ public class QueueEntryImpl implements QueueEntry
public void setDeliveredToSubscription()
{
- getMessage().setDeliveredToConsumer();
+ _flags |= DELIVERED_TO_CONSUMER;
}
public void release()
@@ -185,7 +226,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
@@ -206,8 +247,8 @@ public class QueueEntryImpl implements QueueEntry
public void setRedelivered(boolean redelivered)
{
_redelivered = redelivered;
- // todo - here we could mark this message as redelivered so we don't have to mark
- // all messages on recover as redelivered.
+ // todo - here we could record this message as redelivered on this queue in the transactionLog
+ // so we don't have to mark all messages on recover as redelivered.
}
public Subscription getDeliveredSubscription()
@@ -281,6 +322,8 @@ public class QueueEntryImpl implements QueueEntry
s.restoreCredit(this);
}
+ _queueEntryList.dequeued(this);
+
getQueue().dequeue(storeContext, this);
if (_stateChangeListeners != null)
@@ -337,6 +380,34 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
+ public void flow() throws UnableToFlowMessageException
+ {
+ if (_message != null && _backingStore != null)
+ {
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Flowing message:" + _message.debugIdentity());
+ }
+ _backingStore.flow(_message);
+ _message = null;
+ _flowed.getAndSet(true);
+ }
+ }
+
+ public void recover()
+ {
+ if (_messageId != null && _backingStore != null)
+ {
+ _message = _backingStore.recover(_messageId);
+ _flowed.getAndSet(false);
+ }
+ }
+
+ public boolean isFlowed()
+ {
+ return _flowed.get();
+ }
+
public int compareTo(final QueueEntry o)
{
@@ -382,7 +453,11 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
+ if (_backingStore != null)
+ {
+ _backingStore.delete(_messageId);
+ }
return true;
}
else
@@ -395,4 +470,6 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 313e076f61..72783e3f78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryList
+public interface QueueEntryList extends FlowableQueueEntryList
{
AMQQueue getQueue();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a4945bc11a..7f46a6063a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -72,12 +72,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
- private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
-
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
@@ -106,11 +100,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/** the minimum interval between sending out consecutive alerts of the same type */
public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
- /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
- private long _memoryUsageMaximum = 0;
-
- /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
- private long _memoryUsageMinimum = 0;
private static final int MAX_ASYNC_DELIVERIES = 10;
@@ -120,8 +109,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- /** Control to determin if this queue is flowed or not. */
- protected AtomicBoolean _flowed = new AtomicBoolean(false);
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -168,13 +155,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
resetNotifications();
- resetFlowToDisk();
- }
-
- public void resetFlowToDisk()
- {
- setMemoryUsageMaximum(_memoryUsageMaximum);
- setMemoryUsageMinimum(_memoryUsageMinimum);
}
public void resetNotifications()
@@ -205,7 +185,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean isFlowed()
{
- return _flowed.get();
+ return _entries.isFlowed();
}
public AMQShortString getOwner()
@@ -341,10 +321,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
-
- incrementQueueCount();
- incrementQueueSize(message);
-
_totalMessagesReceived.incrementAndGet();
QueueEntry entry;
@@ -485,17 +461,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Simple Queues don't :-)
}
- private void incrementQueueSize(final AMQMessage message)
- {
- getAtomicQueueSize().addAndGet(message.getSize());
- getAtomicQueueInMemory().addAndGet(message.getSize());
- }
-
- private void incrementQueueCount()
- {
- getAtomicQueueCount().incrementAndGet();
- }
-
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
{
@@ -594,8 +559,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
*/
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
- decrementQueueCount();
- decrementQueueSize(entry);
if (entry.acquiredBySubscription())
{
_deliveredMessages.decrementAndGet();
@@ -625,16 +588,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- private void decrementQueueSize(final QueueEntry entry)
- {
- getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
- getAtomicQueueInMemory().addAndGet(-entry.getMessage().getSize());
- }
-
- void decrementQueueCount()
- {
- getAtomicQueueCount().decrementAndGet();
- }
public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
{
@@ -682,17 +635,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public long getMemoryUsageCurrent()
{
- return getAtomicQueueInMemory().get();
+ return getQueueInMemory();
}
public int getMessageCount()
{
- return getAtomicQueueCount().get();
+ return getQueueCount();
}
public long getQueueDepth()
{
- return getAtomicQueueSize().get();
+ return getQueueSize();
}
public int getUndeliveredMessageCount()
@@ -768,21 +721,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _name.compareTo(o.getName());
}
- public AtomicInteger getAtomicQueueCount()
+ public int getQueueCount()
{
- return _atomicQueueCount;
+ return _entries.size();
}
- public AtomicLong getAtomicQueueSize()
+ public long getQueueSize()
{
- return _atomicQueueSize;
+ return _entries.dataSize();
}
- public AtomicLong getAtomicQueueInMemory()
+ public long getQueueInMemory()
{
- return _atomicQueueInMemory;
- }
-
+ return _entries.memoryUsed();
+ }
private boolean isExclusiveSubscriber()
{
@@ -1493,46 +1445,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public long getMemoryUsageMaximum()
{
- return _memoryUsageMaximum;
+ return _entries.getMemoryUsageMaximum();
}
public void setMemoryUsageMaximum(long maximumMemoryUsage)
{
- _memoryUsageMaximum = maximumMemoryUsage;
-
- // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
- if (_memoryUsageMaximum > 0)
- {
- // If we've increased the max memory above what we have in memory then we can inhale more
- if (_memoryUsageMaximum > _atomicQueueInMemory.get())
- {
- //TODO start inhaler
- }
- else // if we have now have to much memory in use we need to purge.
- {
- //TODO start purger
- }
- }
+ _entries.setMemoryUsageMaximum(maximumMemoryUsage);
}
public long getMemoryUsageMinimum()
{
- return _memoryUsageMinimum;
+ return _entries.getMemoryUsageMinimum();
}
public void setMemoryUsageMinimum(long minimumMemoryUsage)
{
- _memoryUsageMinimum = minimumMemoryUsage;
-
- // Don't attempt to start the inhaler unless we have a minimum value specified.
- if (_memoryUsageMinimum > 0)
- {
- // If we've increased the minimum memory above what we have in memory then we need to inhale more
- if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
- {
- //TODO start inhaler
- }
- }
+ _entries.setMemoryUsageMinimum(minimumMemoryUsage);
}
public long getMinimumAlertRepeatGap()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index a46c5ae2e8..10abdd8318 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,6 +1,9 @@
package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/*
*
@@ -22,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* under the License.
*
*/
-public class SimpleQueueEntryList implements QueueEntryList
+public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
+
private final QueueEntryImpl _head;
private volatile QueueEntryImpl _tail;
@@ -41,12 +45,9 @@ public class SimpleQueueEntryList implements QueueEntryList
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
-
-
-
public SimpleQueueEntryList(AMQQueue queue)
{
+ super(queue);
_queue = queue;
_head = new QueueEntryImpl(this);
_tail = _head;
@@ -77,6 +78,9 @@ public class SimpleQueueEntryList implements QueueEntryList
public QueueEntry add(AMQMessage message)
{
QueueEntryImpl node = new QueueEntryImpl(this, message);
+
+ incrementCounters(node);
+
for (;;)
{
QueueEntryImpl tail = _tail;
@@ -101,12 +105,12 @@ public class SimpleQueueEntryList implements QueueEntryList
}
}
+
public QueueEntry next(QueueEntry node)
{
return ((QueueEntryImpl)node).getNext();
}
-
public class QueueEntryIteratorImpl implements QueueEntryIterator
{
@@ -172,7 +176,9 @@ public class SimpleQueueEntryList implements QueueEntryList
{
return new SimpleQueueEntryList(queue);
}
+
}
-
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index 0334a54fab..4c9fe81439 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -54,23 +54,11 @@ public class TransientAMQMessage implements AMQMessage
protected final Long _messageId;
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- private static final byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- private static final byte DELIVERED_TO_CONSUMER = 0x02;
private byte _flags = 0;
- private long _expiration;
-
private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+ private long _expiration;
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -165,11 +153,16 @@ public class TransientAMQMessage implements AMQMessage
return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() +")";
}
- public void setExpiration(final long expiration)
+ public void setExpiration(long expiration)
{
_expiration = expiration;
}
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
return new BodyFrameIterator(protocolSession, channel);
@@ -190,57 +183,6 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /**
- * Called selectors to determin if the message has already been sent
- *
- * @return _deliveredToConsumer
- */
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
- /**
- * Called to enforce the 'immediate' flag.
- *
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
- */
- public boolean immediateAndNotDelivered()
- {
-
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
- }
-
- /**
- * Checks to see if the message has expired. If it has the message is dequeued.
- *
- * @return true if the message has expire
- *
- * @throws AMQException
- */
- public boolean expired() throws AMQException
- {
-
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
-
- return false;
- }
-
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- */
- public void setDeliveredToConsumer()
- {
- _flags |= DELIVERED_TO_CONSUMER;
- }
public long getSize()
{
@@ -315,6 +257,11 @@ public class TransientAMQMessage implements AMQMessage
return false;
}
+ public boolean isImmediate()
+ {
+ return _messagePublishInfo.isImmediate();
+ }
+
/**
* This is called when all the content has been received.
*
@@ -366,11 +313,6 @@ public class TransientAMQMessage implements AMQMessage
{
_contentBodies = Collections.EMPTY_LIST;
}
-
- if (_messagePublishInfo.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
}
public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index b4b392c91d..c5b6eeca3e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -48,6 +48,8 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.QueueBackingStore;
+import org.apache.qpid.server.queue.FileQueueBackingStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.security.access.ACLManager;
@@ -86,6 +88,7 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
+ private QueueBackingStore _queueBackingStore;
public void setAccessableName(String name)
{
@@ -113,6 +116,11 @@ public class VirtualHost implements Accessable
return _configuration ;
}
+ public QueueBackingStore getQueueBackingStore()
+ {
+ return _queueBackingStore;
+ }
+
/**
* Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
* implementaion of an Exchange MBean should extend this class.
@@ -186,6 +194,9 @@ public class VirtualHost implements Accessable
initialiseRoutingTable(hostConfig);
}
+ _queueBackingStore = new FileQueueBackingStore();
+ _queueBackingStore.configure(this,hostConfig);
+
_exchangeFactory.initialise(hostConfig);
_exchangeRegistry.initialise();
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
index d46ba85069..49afcb1340 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
@@ -352,7 +352,7 @@ public class Show extends AbstractCommand
isredelivered.add(entry.isRedelivered() ? "true" : "false");
- isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false");
+ isdelivered.add(entry.getDeliveredToConsumer() ? "true" : "false");
BasicContentHeaderProperties headers = null;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 78cf610f28..6021f100f5 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.MessageCleanupException;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.queue.UnableToFlowMessageException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -229,6 +230,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return false; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void setExpiration(long expiration)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean isAcquired()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
@@ -264,6 +270,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void setDeliveredToConsumer()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void release()
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -314,32 +325,38 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+
+ public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public void restoreCredit()
+ public boolean isQueueDeleted()
{
- //To change body of implemented methods use File | Settings | File Templates.
+ return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void dequeueAndDelete(StoreContext storeContext) throws FailedDequeueException
+ public void addStateChangeListener(StateChangeListener listener)
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public boolean isQueueDeleted()
+ public boolean removeStateChangeListener(StateChangeListener listener)
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void flow() throws UnableToFlowMessageException
{
//To change body of implemented methods use File | Settings | File Templates.
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public void recover()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public boolean isFlowed()
{
return false; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index ba02e6f6bd..a11e60d7de 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -102,4 +102,11 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
return message;
}
+
+ @Override
+ public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+ {
+ //Disable this test pending completion of QPID-1637
+ }
+
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index cc6c486e11..b38da53406 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
public class MockAMQMessage extends TransientAMQMessage
{
@@ -29,6 +30,7 @@ public class MockAMQMessage extends TransientAMQMessage
throws AMQException
{
super(messageId);
+ _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
index f7cd860c22..9e12e1bef7 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTest.java
@@ -21,16 +21,25 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
public class QueueEntryImplTest extends TestCase
{
- /**
- * Test the Redelivered state of a QueueEntryImpl
- */
+ /** Test the Redelivered state of a QueueEntryImpl */
public void testRedelivered()
{
- QueueEntry entry = new QueueEntryImpl(null, null);
+ QueueEntry entry = new MockQueueEntry(null);
assertFalse("New message should not be redelivered", entry.isRedelivered());
@@ -45,5 +54,187 @@ public class QueueEntryImplTest extends TestCase
}
+ public void testImmediateAndNotDelivered()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertTrue("Undelivered Immediate message should still be marked as so", queueEntry.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ queueEntry.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ assertFalse("Delivered Immediate message now be marked as so", queueEntry.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNotImmediateAndNotDelivered()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ try
+ {
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("Undelivered Non-Immediate message should not result in true.", queueEntry.immediateAndNotDelivered());
+
+ assertFalse("Undelivered Message should not say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ queueEntry.setDeliveredToConsumer();
+
+ assertTrue("Delivered Message should say it is delivered.", queueEntry.getDeliveredToConsumer());
+
+ assertFalse("Delivered Non-Immediate message not change this return", queueEntry.immediateAndNotDelivered());
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testExpiry()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+ message.setExpiration(System.currentTimeMillis() + 10L);
+
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("New messages should not be expired.", queueEntry.expired());
+
+ final long MILLIS = 1000000L;
+ long waitTime = 20 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertTrue("After a sleep messages should now be expired.", queueEntry.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testNoExpiry()
+ {
+ AMQMessage message = MessageFactory.getInstance().createMessage(null, false);
+
+ MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
+ int bodySize = 0;
+
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ props.setAppId("HandleTest");
+
+ ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
+
+ ReentrantLock waitLock = new ReentrantLock();
+ Condition wait = waitLock.newCondition();
+ try
+ {
+
+ message.setPublishAndContentHeaderBody(null, mpi, chb);
+
+ QueueEntry queueEntry = new MockQueueEntry(message);
+
+ assertFalse("New messages should not be expired.", queueEntry.expired());
+
+ final long MILLIS = 1000000L;
+ long waitTime = 10 * MILLIS;
+
+ while (waitTime > 0)
+ {
+ try
+ {
+ waitLock.lock();
+
+ waitTime = wait.awaitNanos(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ //Stop if we are interrupted
+ fail(e.getMessage());
+ }
+ finally
+ {
+ waitLock.unlock();
+ }
+
+ }
+
+ assertFalse("After a sleep messages without an expiry should not expire.", queueEntry.expired());
+
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ }
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 7a97837208..9a5f7f20c6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,8 +21,6 @@ package org.apache.qpid.server.queue;
*/
import junit.framework.TestCase;
-
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
@@ -35,13 +33,13 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -51,7 +49,7 @@ public class SimpleAMQQueueTest extends TestCase
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore();
+ protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -70,7 +68,7 @@ public class SimpleAMQQueueTest extends TestCase
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
PropertiesConfiguration env = new PropertiesConfiguration();
- _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _store);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments);
@@ -320,8 +318,8 @@ public class SimpleAMQQueueTest extends TestCase
public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException
{
// Create IncomingMessage and nondurable queue
- NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
- IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_store), _store);
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.properties = new BasicContentHeaderProperties();
@@ -335,18 +333,18 @@ public class SimpleAMQQueueTest extends TestCase
// Send persistent message
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_store);
+ msg.routingComplete(_transactionLog);
- _store.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
+ _transactionLog.storeMessageMetaData(null, messageId, new MessageMetaData(info, contentHeaderBody, 1));
// Check that it is enqueued
- List<AMQQueue> data = _store.getMessageReferenceMap(messageId);
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
assertNotNull(data);
// Dequeue message
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
- AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _store);
+ AMQMessage message = new MockPersistentAMQMessage(msg.getMessageId(), _transactionLog);
message.setPublishAndContentHeaderBody(new StoreContext(), info, header);
MockQueueEntry entry = new MockQueueEntry(message, _queue);
@@ -355,10 +353,97 @@ public class SimpleAMQQueueTest extends TestCase
entry.dequeue(null);
// Check that it is dequeued
- data = _store.getMessageReferenceMap(messageId);
+ data = _transactionLog.getMessageReferenceMap(messageId);
assertNull(data);
}
+ public void testMessagesFlowToDisk() throws AMQException, InterruptedException
+ {
+ // Create IncomingMessage and nondurable queue
+ NonTransactionalContext txnContext = new NonTransactionalContext(_transactionLog, null, null, null);
+
+ //Set the Memory Usage to be very low
+ _queue.setMemoryUsageMaximum(10);
+
+ for (int msgCount = 0; msgCount < 10; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+
+ //Check that we can hold 10 messages without flowing
+ assertEquals(10, _queue.getMessageCount());
+ assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is flowed.", !_queue.isFlowed());
+
+ // Send anothe and ensure we are flowed
+ sendMessage(txnContext);
+ assertEquals(11, _queue.getMessageCount());
+ assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ //send another 9 so there are 20msgs in total on the queue
+ for (int msgCount = 0; msgCount < 9; msgCount++)
+ {
+ sendMessage(txnContext);
+ }
+ assertEquals(20, _queue.getMessageCount());
+ assertEquals(10, _queue.getMemoryUsageCurrent());
+ assertTrue("Queue is not flowed.", _queue.isFlowed());
+
+ _queue.registerSubscription(_subscription, false);
+
+ Thread.sleep(200);
+
+ //Ensure the messages are retreived
+ assertEquals("Not all messages were received.", 20, _subscription.getMessages().size());
+
+ //Ensure we got the content
+ for (int index = 0; index < 10; index++)
+ {
+ QueueEntry entry = _subscription.getMessages().get(index);
+ assertNotNull("Message:" + index + " was null.", entry.getMessage());
+ assertTrue(!entry.isFlowed());
+ }
+
+ //ensure we were received 10 flowed messages
+ for (int index = 10; index < 20; index++)
+ {
+ QueueEntry entry = _subscription.getMessages().get(index);
+ assertNull("Message:" + index + " was not null.", entry.getMessage());
+ assertTrue(entry.isFlowed());
+ }
+ }
+
+ private void sendMessage(TransactionalContext txnContext) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, txnContext, new MockProtocolSession(_transactionLog), _transactionLog);
+
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.bodySize = 1;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ msg.setContentHeaderBody(contentHeaderBody);
+
+ long messageId = msg.getMessageId();
+
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+
+ // Send persistent 10 messages
+
+ qs.add(_queue);
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ msg.addContentBodyFrame(new MockContentChunk(1));
+
+ msg.deliverToQueues();
+
+ //Check message was correctly enqueued
+ List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(data);
+ }
+
// FIXME: move this to somewhere useful
private static AMQMessage createMessage(final MessagePublishInfo publishBody)
{
@@ -384,7 +469,7 @@ public class SimpleAMQQueueTest extends TestCase
public AMQMessage createMessage() throws AMQException
{
- AMQMessage message = new TestMessage(info, _store);
+ AMQMessage message = new TestMessage(info, _transactionLog);
ContentHeaderBody header = new ContentHeaderBody();
header.bodySize = MESSAGE_SIZE;
@@ -410,7 +495,6 @@ public class SimpleAMQQueueTest extends TestCase
_transactionLog = transactionLog;
}
-
void assertCountEquals(int expected)
{
assertEquals("Wrong count for message with tag " + _tag, expected,
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
index 16d1ab60f3..6fd153f398 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/TransientMessageTest.java
@@ -287,180 +287,5 @@ public class TransientMessageTest extends TestCase
assertFalse(_message.isPersistent());
}
- public void testImmediateAndNotDelivered()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, true, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertTrue("Undelivered Immediate message should still be marked as so", _message.immediateAndNotDelivered());
-
- assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
- _message.setDeliveredToConsumer();
-
- assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
- assertFalse("Delivered Immediate message now be marked as so", _message.immediateAndNotDelivered());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testNotImmediateAndNotDelivered()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- try
- {
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("Undelivered Non-Immediate message should not result in true.", _message.immediateAndNotDelivered());
-
- assertFalse("Undelivered Message should not say it is delivered.", _message.getDeliveredToConsumer());
-
- _message.setDeliveredToConsumer();
-
- assertTrue("Delivered Message should say it is delivered.", _message.getDeliveredToConsumer());
-
- assertFalse("Delivered Non-Immediate message not change this return", _message.immediateAndNotDelivered());
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
- public void testExpiry()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- try
- {
- _message.setExpiration(System.currentTimeMillis() + 10L);
-
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("New messages should not be expired.", _message.expired());
-
- final long MILLIS =1000000L;
- long waitTime = 20 * MILLIS;
-
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- assertTrue("After a sleep messages should now be expired.", _message.expired());
-
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
-
- public void testNoExpiry()
- {
- _message = newMessage();
-
- MessagePublishInfo mpi = new MessagePublishInfoImpl(null, false, false, null);
- int bodySize = 0;
-
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
-
- props.setAppId("HandleTest");
-
- ContentHeaderBody chb = new ContentHeaderBody(0, 0, props, bodySize);
-
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- try
- {
-
- _message.setPublishAndContentHeaderBody(_storeContext, mpi, chb);
-
- assertFalse("New messages should not be expired.", _message.expired());
-
- final long MILLIS =1000000L;
- long waitTime = 10 * MILLIS;
-
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- assertFalse("After a sleep messages without an expiry should not expire.", _message.expired());
-
- }
- catch (AMQException e)
- {
- fail(e.getMessage());
- }
- }
-
+
}