summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java35
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java185
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java (renamed from java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java)32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java47
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java97
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java102
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java20
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java82
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java11
12 files changed, 446 insertions, 213 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 5bde27dba5..8dac12fe24 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -60,30 +60,6 @@ public interface AMQMessage
//Check the status of this message
- /**
- * Called selectors to determin if the message has already been sent
- *
- * @return _deliveredToConsumer
- */
- boolean getDeliveredToConsumer();
-
- /**
- * Called to enforce the 'immediate' flag.
- *
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
- */
- boolean immediateAndNotDelivered();
-
- /**
- * Checks to see if the message has expired. If it has the message is dequeued.
- *
- * @return true if the message has expire
- *
- * @throws org.apache.qpid.AMQException
- */
- boolean expired() throws AMQException;
-
/** Is this a persistent message
*
* @return true if the message is persistent
@@ -91,13 +67,8 @@ public interface AMQMessage
boolean isPersistent();
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- */
- void setDeliveredToConsumer();
+ boolean isImmediate();
- void setExpiration(long expiration);
void setClientIdentifier(AMQProtocolSession.ProtocolSessionIdentifier sessionIdentifier);
@@ -121,4 +92,8 @@ public interface AMQMessage
String toString();
String debugIdentity();
+
+ void setExpiration(long expiration);
+
+ long getExpiration();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index 34a70c6969..ade780d0bb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -36,12 +36,12 @@ public class AMQPriorityQueue extends SimpleAMQQueue
int priorities)
throws AMQException
{
- super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
+ super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueEntryList.Factory(priorities));
}
public int getPriorities()
{
- return ((PriorityQueueList) _entries).getPriorities();
+ return ((PriorityQueueEntryList) _entries).getPriorities();
}
@Override
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
new file mode 100644
index 0000000000..72ea5f2667
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is an abstract base class to handle
+ */
+public abstract class FlowableBaseQueueEntryList implements FlowableQueueEntryList
+{
+ private static final Logger _log = Logger.getLogger(FlowableBaseQueueEntryList.class);
+
+ private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
+ private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
+ private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
+ /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
+
+ private long _memoryUsageMaximum = 0;
+
+ /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
+ private long _memoryUsageMinimum = 0;
+ private AtomicBoolean _flowed;
+ private QueueBackingStore _backingStore;
+ protected AMQQueue _queue;
+
+ FlowableBaseQueueEntryList(AMQQueue queue)
+ {
+ _queue = queue;
+ _flowed = new AtomicBoolean(false);
+ VirtualHost vhost = queue.getVirtualHost();
+ if (vhost != null)
+ {
+ _backingStore = vhost.getQueueBackingStore();
+ }
+ }
+
+ public void setFlowed(boolean flowed)
+ {
+ if (_flowed.get() != flowed)
+ {
+ _log.info("Marking Queue(" + _queue.getName() + ") as flowed (" + flowed + ")");
+ _flowed.set(flowed);
+ }
+ }
+
+ public boolean isFlowed()
+ {
+ return _flowed.get();
+ }
+
+ public int size()
+ {
+ return _atomicQueueCount.get();
+ }
+
+ public long dataSize()
+ {
+ return _atomicQueueSize.get();
+ }
+
+ public long memoryUsed()
+ {
+ return _atomicQueueInMemory.get();
+ }
+
+ public void setMemoryUsageMaximum(long maximumMemoryUsage)
+ {
+ _memoryUsageMaximum = maximumMemoryUsage;
+
+ // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
+ if (_memoryUsageMaximum > 0)
+ {
+ // If we've increased the max memory above what we have in memory then we can inhale more
+ if (_memoryUsageMaximum > _atomicQueueInMemory.get())
+ {
+ //TODO start inhaler
+ }
+ else // if we have now have to much memory in use we need to purge.
+ {
+ //TODO start purger
+ }
+ }
+ }
+
+ public long getMemoryUsageMaximum()
+ {
+ return _memoryUsageMaximum;
+ }
+
+ public void setMemoryUsageMinimum(long minimumMemoryUsage)
+ {
+ _memoryUsageMinimum = minimumMemoryUsage;
+
+ // Don't attempt to start the inhaler unless we have a minimum value specified.
+ if (_memoryUsageMinimum > 0)
+ {
+ // If we've increased the minimum memory above what we have in memory then we need to inhale more
+ if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
+ {
+ //TODO start inhaler
+ }
+ }
+ }
+
+ public long getMemoryUsageMinimum()
+ {
+ return _memoryUsageMinimum;
+ }
+
+ protected boolean willCauseFlowToDisk(QueueEntryImpl queueEntry)
+ {
+ return _memoryUsageMaximum != 0 && memoryUsed() + queueEntry.getSize() > _memoryUsageMaximum;
+ }
+
+ protected void incrementCounters(final QueueEntryImpl queueEntry)
+ {
+ _atomicQueueCount.incrementAndGet();
+ _atomicQueueSize.addAndGet(queueEntry.getSize());
+ if (!willCauseFlowToDisk(queueEntry))
+ {
+ _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ }
+ else
+ {
+ setFlowed(true);
+ flowingToDisk(queueEntry);
+ }
+ }
+
+ /**
+ * Called when we are now flowing to disk
+ * @param queueEntry the entry that is being flowed to disk
+ */
+ protected void flowingToDisk(QueueEntryImpl queueEntry)
+ {
+ try
+ {
+ queueEntry.flow();
+ }
+ catch (UnableToFlowMessageException e)
+ {
+ _atomicQueueInMemory.addAndGet(queueEntry.getSize());
+ }
+ }
+
+ protected void dequeued(QueueEntryImpl queueEntry)
+ {
+ _atomicQueueCount.decrementAndGet();
+ _atomicQueueSize.addAndGet(-queueEntry.getSize());
+ if (!queueEntry.isFlowed())
+ {
+ _atomicQueueInMemory.addAndGet(-queueEntry.getSize());
+ }
+ }
+
+ public QueueBackingStore getBackingStore()
+ {
+ return _backingStore;
+ }
+
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
new file mode 100644
index 0000000000..4e95978bf8
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableQueueEntryList.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface FlowableQueueEntryList
+{
+ void setFlowed(boolean flowed);
+
+ boolean isFlowed();
+
+ int size();
+
+ long dataSize();
+
+ long memoryUsed();
+
+ void setMemoryUsageMaximum(long maximumMemoryUsage);
+
+ long getMemoryUsageMaximum();
+
+ void setMemoryUsageMinimum(long minimumMemoryUsage);
+
+ long getMemoryUsageMinimum();
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
index 7be2827e0f..d812b8ceca 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueEntryList.java
@@ -21,17 +21,17 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.framing.CommonContentHeaderProperties;
-import org.apache.qpid.AMQException;
-public class PriorityQueueList implements QueueEntryList
+public class PriorityQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
private final AMQQueue _queue;
private final QueueEntryList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
- public PriorityQueueList(AMQQueue queue, int priorities)
+ public PriorityQueueEntryList(AMQQueue queue, int priorities)
{
+ super(queue);
_queue = queue;
_priorityLists = new QueueEntryList[priorities];
_priorities = priorities;
@@ -53,7 +53,7 @@ public class PriorityQueueList implements QueueEntryList
}
public QueueEntry add(AMQMessage message)
- {
+ {
int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
if(index >= _priorities)
{
@@ -154,7 +154,29 @@ public class PriorityQueueList implements QueueEntryList
public QueueEntryList createQueueEntryList(AMQQueue queue)
{
- return new PriorityQueueList(queue, _priorities);
+ return new PriorityQueueEntryList(queue, _priorities);
+ }
+ }
+
+ @Override
+ public int size()
+ {
+ int size=0;
+ for (QueueEntryList queueEntryList : _priorityLists)
+ {
+ size += queueEntryList.size();
}
+
+ return size;
+ }
+
+
+ @Override
+ protected void flowingToDisk(QueueEntryImpl queueEntry)
+ {
+ //TODO this disables FTD for priority queues
+ // As the incomming message isn't always the one to purge.
+ // More logic is required up in the add() method here to determine if the
+ // incomming message is at the 'front' or not.
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 09600b9d28..25d41c8203 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -119,16 +119,42 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
final static EntryState EXPIRED_STATE = new ExpiredState();
final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
+ /** Flag to indicate that this message requires 'immediate' delivery. */
+
+ final static byte IMMEDIATE = 0x01;
+
+ /**
+ * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
+ */
+
+ final static byte DELIVERED_TO_CONSUMER = 0x02;
+
+
AMQQueue getQueue();
AMQMessage getMessage();
long getSize();
+ /**
+ * Called selectors to determin if the message has already been sent
+ *
+ * @return _deliveredToConsumer
+ */
boolean getDeliveredToConsumer();
+ /**
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @return true if the message has expire
+ *
+ * @throws org.apache.qpid.AMQException
+ */
boolean expired() throws AMQException;
+ public void setExpiration(final long expiration);
+
boolean isAcquired();
boolean acquire();
@@ -143,10 +169,22 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void setDeliveredToSubscription();
+ /**
+ * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
+ public void setDeliveredToConsumer();
+
void release();
String debugIdentity();
+ /**
+ * Called to enforce the 'immediate' flag.
+ *
+ * @returns true if the message is marked for immediate delivery but has not been marked as delivered
+ * to a consumer
+ */
boolean immediateAndNotDelivered();
void setRedelivered(boolean b);
@@ -180,4 +218,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
-}
+
+ void flow() throws UnableToFlowMessageException;
+
+ void recover();
+
+ boolean isFlowed();
+
+} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 911ed8321b..3d464d01d3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -78,6 +79,17 @@ public class QueueEntryImpl implements QueueEntry
volatile QueueEntryImpl _next;
+ private long _messageSize;
+ private QueueBackingStore _backingStore;
+ private AtomicBoolean _flowed;
+ private Long _messageId;
+
+ private byte _flags = 0;
+
+ private long _expiration;
+
+ private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
@@ -88,8 +100,7 @@ public class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
{
- _queueEntryList = queueEntryList;
- _message = message;
+ this(queueEntryList,message);
_entryIdUpdater.set(this, entryId);
}
@@ -98,6 +109,19 @@ public class QueueEntryImpl implements QueueEntry
{
_queueEntryList = queueEntryList;
_message = message;
+ if (message != null)
+ {
+ _messageId = message.getMessageId();
+ _messageSize = message.getSize();
+
+ if(message.isImmediate())
+ {
+ _flags |= IMMEDIATE;
+ }
+ _expiration = message.getExpiration();
+ }
+ _backingStore = queueEntryList.getBackingStore();
+ _flowed = new AtomicBoolean(false);
}
protected void setEntryId(long entryId)
@@ -122,17 +146,34 @@ public class QueueEntryImpl implements QueueEntry
public long getSize()
{
- return getMessage().getSize();
+ return _messageSize;
}
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return (_flags & DELIVERED_TO_CONSUMER) != 0;
+ }
+
+ public void setDeliveredToConsumer()
+ {
+ _flags |= DELIVERED_TO_CONSUMER;
}
public boolean expired() throws AMQException
{
- return getMessage().expired();
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > _expiration);
+ }
+
+ return false;
+ }
+
+ public void setExpiration(final long expiration)
+ {
+ _expiration = expiration;
}
public boolean isAcquired()
@@ -169,7 +210,7 @@ public class QueueEntryImpl implements QueueEntry
public void setDeliveredToSubscription()
{
- getMessage().setDeliveredToConsumer();
+ _flags |= DELIVERED_TO_CONSUMER;
}
public void release()
@@ -185,7 +226,7 @@ public class QueueEntryImpl implements QueueEntry
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
}
public ContentHeaderBody getContentHeaderBody() throws AMQException
@@ -206,8 +247,8 @@ public class QueueEntryImpl implements QueueEntry
public void setRedelivered(boolean redelivered)
{
_redelivered = redelivered;
- // todo - here we could mark this message as redelivered so we don't have to mark
- // all messages on recover as redelivered.
+ // todo - here we could record this message as redelivered on this queue in the transactionLog
+ // so we don't have to mark all messages on recover as redelivered.
}
public Subscription getDeliveredSubscription()
@@ -281,6 +322,8 @@ public class QueueEntryImpl implements QueueEntry
s.restoreCredit(this);
}
+ _queueEntryList.dequeued(this);
+
getQueue().dequeue(storeContext, this);
if (_stateChangeListeners != null)
@@ -337,6 +380,34 @@ public class QueueEntryImpl implements QueueEntry
return false;
}
+ public void flow() throws UnableToFlowMessageException
+ {
+ if (_message != null && _backingStore != null)
+ {
+ if(_log.isDebugEnabled())
+ {
+ _log.debug("Flowing message:" + _message.debugIdentity());
+ }
+ _backingStore.flow(_message);
+ _message = null;
+ _flowed.getAndSet(true);
+ }
+ }
+
+ public void recover()
+ {
+ if (_messageId != null && _backingStore != null)
+ {
+ _message = _backingStore.recover(_messageId);
+ _flowed.getAndSet(false);
+ }
+ }
+
+ public boolean isFlowed()
+ {
+ return _flowed.get();
+ }
+
public int compareTo(final QueueEntry o)
{
@@ -382,7 +453,11 @@ public class QueueEntryImpl implements QueueEntry
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
+ if (_backingStore != null)
+ {
+ _backingStore.delete(_messageId);
+ }
return true;
}
else
@@ -395,4 +470,6 @@ public class QueueEntryImpl implements QueueEntry
{
return _queueEntryList;
}
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 313e076f61..72783e3f78 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryList
+public interface QueueEntryList extends FlowableQueueEntryList
{
AMQQueue getQueue();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a4945bc11a..7f46a6063a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -72,12 +72,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
- private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
- private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
- private final AtomicLong _atomicQueueInMemory = new AtomicLong(0L);
-
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
@@ -106,11 +100,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/** the minimum interval between sending out consecutive alerts of the same type */
public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
- /** The maximum amount of memory that is allocated to this queue. Beyond this the queue will flow to disk. */
- private long _memoryUsageMaximum = 0;
-
- /** The minimum amount of memory that is allocated to this queue. If the queueDepth hits this level then more flowed data can be read in. */
- private long _memoryUsageMinimum = 0;
private static final int MAX_ASYNC_DELIVERIES = 10;
@@ -120,8 +109,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
- /** Control to determin if this queue is flowed or not. */
- protected AtomicBoolean _flowed = new AtomicBoolean(false);
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -168,13 +155,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
resetNotifications();
- resetFlowToDisk();
- }
-
- public void resetFlowToDisk()
- {
- setMemoryUsageMaximum(_memoryUsageMaximum);
- setMemoryUsageMinimum(_memoryUsageMinimum);
}
public void resetNotifications()
@@ -205,7 +185,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean isFlowed()
{
- return _flowed.get();
+ return _entries.isFlowed();
}
public AMQShortString getOwner()
@@ -341,10 +321,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
-
- incrementQueueCount();
- incrementQueueSize(message);
-
_totalMessagesReceived.incrementAndGet();
QueueEntry entry;
@@ -485,17 +461,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Simple Queues don't :-)
}
- private void incrementQueueSize(final AMQMessage message)
- {
- getAtomicQueueSize().addAndGet(message.getSize());
- getAtomicQueueInMemory().addAndGet(message.getSize());
- }
-
- private void incrementQueueCount()
- {
- getAtomicQueueCount().incrementAndGet();
- }
-
private void deliverMessage(final Subscription sub, final QueueEntry entry)
throws AMQException
{
@@ -594,8 +559,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
*/
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
- decrementQueueCount();
- decrementQueueSize(entry);
if (entry.acquiredBySubscription())
{
_deliveredMessages.decrementAndGet();
@@ -625,16 +588,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- private void decrementQueueSize(final QueueEntry entry)
- {
- getAtomicQueueSize().addAndGet(-entry.getMessage().getSize());
- getAtomicQueueInMemory().addAndGet(-entry.getMessage().getSize());
- }
-
- void decrementQueueCount()
- {
- getAtomicQueueCount().decrementAndGet();
- }
public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
{
@@ -682,17 +635,17 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public long getMemoryUsageCurrent()
{
- return getAtomicQueueInMemory().get();
+ return getQueueInMemory();
}
public int getMessageCount()
{
- return getAtomicQueueCount().get();
+ return getQueueCount();
}
public long getQueueDepth()
{
- return getAtomicQueueSize().get();
+ return getQueueSize();
}
public int getUndeliveredMessageCount()
@@ -768,21 +721,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return _name.compareTo(o.getName());
}
- public AtomicInteger getAtomicQueueCount()
+ public int getQueueCount()
{
- return _atomicQueueCount;
+ return _entries.size();
}
- public AtomicLong getAtomicQueueSize()
+ public long getQueueSize()
{
- return _atomicQueueSize;
+ return _entries.dataSize();
}
- public AtomicLong getAtomicQueueInMemory()
+ public long getQueueInMemory()
{
- return _atomicQueueInMemory;
- }
-
+ return _entries.memoryUsed();
+ }
private boolean isExclusiveSubscriber()
{
@@ -1493,46 +1445,22 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public long getMemoryUsageMaximum()
{
- return _memoryUsageMaximum;
+ return _entries.getMemoryUsageMaximum();
}
public void setMemoryUsageMaximum(long maximumMemoryUsage)
{
- _memoryUsageMaximum = maximumMemoryUsage;
-
- // Don't attempt to start the inhaler/purger unless we have a minimum value specified.
- if (_memoryUsageMaximum > 0)
- {
- // If we've increased the max memory above what we have in memory then we can inhale more
- if (_memoryUsageMaximum > _atomicQueueInMemory.get())
- {
- //TODO start inhaler
- }
- else // if we have now have to much memory in use we need to purge.
- {
- //TODO start purger
- }
- }
+ _entries.setMemoryUsageMaximum(maximumMemoryUsage);
}
public long getMemoryUsageMinimum()
{
- return _memoryUsageMinimum;
+ return _entries.getMemoryUsageMinimum();
}
public void setMemoryUsageMinimum(long minimumMemoryUsage)
{
- _memoryUsageMinimum = minimumMemoryUsage;
-
- // Don't attempt to start the inhaler unless we have a minimum value specified.
- if (_memoryUsageMinimum > 0)
- {
- // If we've increased the minimum memory above what we have in memory then we need to inhale more
- if (_memoryUsageMinimum >= _atomicQueueInMemory.get())
- {
- //TODO start inhaler
- }
- }
+ _entries.setMemoryUsageMinimum(minimumMemoryUsage);
}
public long getMinimumAlertRepeatGap()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index a46c5ae2e8..10abdd8318 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -1,6 +1,9 @@
package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
/*
*
@@ -22,8 +25,9 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
* under the License.
*
*/
-public class SimpleQueueEntryList implements QueueEntryList
+public class SimpleQueueEntryList extends FlowableBaseQueueEntryList implements QueueEntryList
{
+
private final QueueEntryImpl _head;
private volatile QueueEntryImpl _tail;
@@ -41,12 +45,9 @@ public class SimpleQueueEntryList implements QueueEntryList
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
-
-
-
public SimpleQueueEntryList(AMQQueue queue)
{
+ super(queue);
_queue = queue;
_head = new QueueEntryImpl(this);
_tail = _head;
@@ -77,6 +78,9 @@ public class SimpleQueueEntryList implements QueueEntryList
public QueueEntry add(AMQMessage message)
{
QueueEntryImpl node = new QueueEntryImpl(this, message);
+
+ incrementCounters(node);
+
for (;;)
{
QueueEntryImpl tail = _tail;
@@ -101,12 +105,12 @@ public class SimpleQueueEntryList implements QueueEntryList
}
}
+
public QueueEntry next(QueueEntry node)
{
return ((QueueEntryImpl)node).getNext();
}
-
public class QueueEntryIteratorImpl implements QueueEntryIterator
{
@@ -172,7 +176,9 @@ public class SimpleQueueEntryList implements QueueEntryList
{
return new SimpleQueueEntryList(queue);
}
+
}
-
+
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
index 0334a54fab..4c9fe81439 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientAMQMessage.java
@@ -54,23 +54,11 @@ public class TransientAMQMessage implements AMQMessage
protected final Long _messageId;
- /** Flag to indicate that this message requires 'immediate' delivery. */
-
- private static final byte IMMEDIATE = 0x01;
-
- /**
- * Flag to indicate whether this message has been delivered to a consumer. Used in implementing return functionality
- * for messages published with the 'immediate' flag.
- */
-
- private static final byte DELIVERED_TO_CONSUMER = 0x02;
private byte _flags = 0;
- private long _expiration;
-
private AMQProtocolSession.ProtocolSessionIdentifier _sessionIdentifier;
- private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+ private long _expiration;
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
@@ -165,11 +153,16 @@ public class TransientAMQMessage implements AMQMessage
return "(HC:" + System.identityHashCode(this) + " ID:" + getMessageId() +")";
}
- public void setExpiration(final long expiration)
+ public void setExpiration(long expiration)
{
_expiration = expiration;
}
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
public Iterator<AMQDataBlock> getBodyFrameIterator(AMQProtocolSession protocolSession, int channel)
{
return new BodyFrameIterator(protocolSession, channel);
@@ -190,57 +183,6 @@ public class TransientAMQMessage implements AMQMessage
return _messageId;
}
- /**
- * Called selectors to determin if the message has already been sent
- *
- * @return _deliveredToConsumer
- */
- public boolean getDeliveredToConsumer()
- {
- return (_flags & DELIVERED_TO_CONSUMER) != 0;
- }
-
- /**
- * Called to enforce the 'immediate' flag.
- *
- * @returns true if the message is marked for immediate delivery but has not been marked as delivered
- * to a consumer
- */
- public boolean immediateAndNotDelivered()
- {
-
- return (_flags & IMMEDIATE_AND_DELIVERED) == IMMEDIATE;
-
- }
-
- /**
- * Checks to see if the message has expired. If it has the message is dequeued.
- *
- * @return true if the message has expire
- *
- * @throws AMQException
- */
- public boolean expired() throws AMQException
- {
-
- if (_expiration != 0L)
- {
- long now = System.currentTimeMillis();
-
- return (now > _expiration);
- }
-
- return false;
- }
-
- /**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
- * And for selector efficiency.
- */
- public void setDeliveredToConsumer()
- {
- _flags |= DELIVERED_TO_CONSUMER;
- }
public long getSize()
{
@@ -315,6 +257,11 @@ public class TransientAMQMessage implements AMQMessage
return false;
}
+ public boolean isImmediate()
+ {
+ return _messagePublishInfo.isImmediate();
+ }
+
/**
* This is called when all the content has been received.
*
@@ -366,11 +313,6 @@ public class TransientAMQMessage implements AMQMessage
{
_contentBodies = Collections.EMPTY_LIST;
}
-
- if (_messagePublishInfo.isImmediate())
- {
- _flags |= IMMEDIATE;
- }
}
public void recoverContentBodyFrame(ContentChunk contentChunk, boolean isLastContentBody) throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index b4b392c91d..c5b6eeca3e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -48,6 +48,8 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.QueueBackingStore;
+import org.apache.qpid.server.queue.FileQueueBackingStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.security.access.ACLManager;
@@ -86,6 +88,7 @@ public class VirtualHost implements Accessable
private final Timer _houseKeepingTimer;
private VirtualHostConfiguration _configuration;
+ private QueueBackingStore _queueBackingStore;
public void setAccessableName(String name)
{
@@ -113,6 +116,11 @@ public class VirtualHost implements Accessable
return _configuration ;
}
+ public QueueBackingStore getQueueBackingStore()
+ {
+ return _queueBackingStore;
+ }
+
/**
* Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any
* implementaion of an Exchange MBean should extend this class.
@@ -186,6 +194,9 @@ public class VirtualHost implements Accessable
initialiseRoutingTable(hostConfig);
}
+ _queueBackingStore = new FileQueueBackingStore();
+ _queueBackingStore.configure(this,hostConfig);
+
_exchangeFactory.initialise(hostConfig);
_exchangeRegistry.initialise();