summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-05-07 22:40:52 +0000
commit9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch)
treeae86cedd9fdcea4f49993e5a82954ccda53a1ed3
parent1427de0275b5db2c8619db9211435897123259d8 (diff)
downloadqpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java142
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java41
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java111
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java212
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java61
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java85
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java28
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java91
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java9
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java27
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java347
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes2
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes2
26 files changed, 1166 insertions, 154 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index fb1d7c5265..c60e9d14f2 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
@@ -58,6 +59,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
@@ -74,7 +76,6 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
import org.apache.qpid.server.store.berkeleydb.entry.Xid;
@@ -97,6 +98,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public static final int VERSION = 6;
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+ public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
+ public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
private Environment _environment;
@@ -152,7 +155,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
private ConfigurationRecoveryHandler _configRecoveryHandler;
-
+
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private final EventManager _eventManager = new EventManager();
private String _storeLocation;
@@ -163,6 +171,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_stateManager = new StateManager(_eventManager);
}
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
Configuration storeConfiguration) throws Exception
@@ -171,7 +185,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
_configRecoveryHandler = recoveryHandler;
- configure(name,storeConfiguration);
+ configure(name, storeConfiguration);
@@ -218,6 +232,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore
final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY,
System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name);
+ _persistentSizeHighThreshold = storeConfig.getLong(OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE);
+ _persistentSizeLowThreshold = storeConfig.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeLowThreshold;
+ }
+
File environmentPath = new File(storeLocation);
if (!environmentPath.exists())
{
@@ -235,6 +256,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
setupStore(environmentPath, name);
}
+ @Override
+ public String getStoreLocation()
+ {
+ return _storeLocation;
+ }
+
/**
* Move the store state from INITIAL to ACTIVE without actually recovering.
*
@@ -257,6 +284,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore
new Upgrader(_environment, name).upgradeIfNecessary();
openDatabases();
+
+ _totalStoreSize = getSizeOnDisk();
}
protected Environment createEnvironment(File environmentPath) throws DatabaseException
@@ -553,6 +582,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
StorableMessageMetaData metaData = valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
+
mrh.message(message);
maxId = Math.max(maxId, messageId);
@@ -953,7 +983,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key);
DatabaseEntry value = new DatabaseEntry();
- LongBinding.longToEntry(link.getCreateTime(),value);
+ LongBinding.longToEntry(link.getCreateTime(), value);
StringMapBinding.getInstance().objectToEntry(link.getArguments(), value);
try
@@ -1247,7 +1277,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
/**
* Primarily for testing purposes.
*
- * @param queueName
+ * @param queueId
*
* @return a list of message ids for messages enqueued for a particular queue
*/
@@ -1698,7 +1728,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
synchronized void store(com.sleepycat.je.Transaction txn)
{
- if(_metaData != null)
+ if(unstored())
{
try
{
@@ -1728,14 +1758,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
+ private boolean unstored()
+ {
+ return _metaData != null;
+ }
+
public synchronized StoreFuture flushToStore()
{
- if(_metaData != null)
+ if(unstored())
{
com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
store(txn);
AbstractBDBMessageStore.this.commit(txn,true);
-
+ storedSizeChange(getMetaData().getContentSize());
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@@ -1744,7 +1779,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
try
{
+ int delta = getMetaData().getContentSize();
AbstractBDBMessageStore.this.removeMessage(_messageId, false);
+ storedSizeChange(-delta);
+
}
catch (AMQStoreException e)
{
@@ -1756,6 +1794,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
private class BDBTransaction implements org.apache.qpid.server.store.Transaction
{
private com.sleepycat.je.Transaction _txn;
+ private int _storeSizeIncrease;
private BDBTransaction()
{
@@ -1773,7 +1812,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore
{
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
- ((StoredBDBMessage)message.getStoredMessage()).store(_txn);
+ final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
+ storedMessage.store(_txn);
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
}
AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber());
@@ -1787,10 +1828,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public void commitTran() throws AMQStoreException
{
AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+ AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
}
public StoreFuture commitTranAsync() throws AMQStoreException
{
+ AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease);
return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
}
@@ -1811,15 +1854,84 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
+ private void storedSizeChange(final int delta)
{
- _eventManager.addEventListener(eventListener, events);
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized (this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 2*delta;
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ _totalStoreSize = getSizeOnDisk();
+
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ _totalStoreSize = getSizeOnDisk();
+
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk();
+
+ _totalStoreSize = getSizeOnDisk();
+
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ }
}
- @Override
- public String getStoreLocation()
+ private void reduceSizeOnDisk()
{
- return _storeLocation;
+ _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false");
+ boolean cleaned = false;
+ while (_environment.cleanLog() > 0)
+ {
+ cleaned = true;
+ }
+ if (cleaned)
+ {
+ CheckpointConfig force = new CheckpointConfig();
+ force.setForce(true);
+ _environment.checkpoint(force);
+ }
+
+
+ _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
+ }
+
+ private long getSizeOnDisk()
+ {
+ return _environment.getStats(null).getTotalLogSize();
+ }
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index 9f7eb4bfd9..b414441b92 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -21,12 +21,19 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -49,6 +56,8 @@ public class BDBMessageStore extends AbstractBDBMessageStore
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
+ private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>();
+
@Override
protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException
{
@@ -64,6 +73,17 @@ public class BDBMessageStore extends AbstractBDBMessageStore
// This is what allows the creation of the store if it does not already exist.
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
+
+ Properties props = System.getProperties();
+
+ for(String propName : props.stringPropertyNames())
+ {
+ if(propName.startsWith("qpid.bdb.envconfig.je."))
+ {
+ envConfig.setConfigParam(propName.substring(19), props.getProperty(propName));
+ }
+ }
+
envConfig.setConfigParam("je.lock.nLockTables", "7");
// Added to help diagnosis of Deadlock issue
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 8198cec821..4fd4e02220 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -32,10 +34,8 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -89,7 +89,6 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -157,7 +156,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
@@ -1357,9 +1356,34 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
return _actor;
}
- public void block(AMQQueue queue)
+ public synchronized void block()
+ {
+ if(_blockingEntities.add(this))
+ {
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+ flow(false);
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blockingEntities.remove(this))
+ {
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+
+ flow(true);
+ }
+ }
+ }
+
+ public synchronized void block(AMQQueue queue)
{
- if(_blockingQueues.add(queue))
+ if(_blockingEntities.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -1370,11 +1394,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
- public void unblock(AMQQueue queue)
+ public synchronized void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue))
+ if(_blockingEntities.remove(queue))
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
_actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
index 10e40151b0..aaa1766489 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
@@ -61,7 +61,6 @@ public class TopicConfig extends ConfigurationPlugin
throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element.");
}
- System.err.println("********* Created TC:"+this);
}
@@ -75,5 +74,5 @@ public class TopicConfig extends ConfigurationPlugin
}
return response;
- }
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 4a58314f51..09dc5a2473 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,6 +38,8 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
+ private final Collection<RegistryChangeListener> _listeners =
+ new ArrayList<RegistryChangeListener>();
public void initialise()
{
@@ -80,16 +83,48 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
public void registerConnection(AMQConnectionModel connnection)
{
- _registry.add(connnection);
+ synchronized (this)
+ {
+ _registry.add(connnection);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionRegistered(connnection);
+ }
+ }
+ }
}
public void deregisterConnection(AMQConnectionModel connnection)
{
- _registry.remove(connnection);
+ synchronized (this)
+ {
+ _registry.remove(connnection);
+
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionUnregistered(connnection);
+ }
+ }
+ }
+ }
+
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ synchronized (_listeners)
+ {
+ _listeners.add(listener);
+ }
}
public List<AMQConnectionModel> getConnections()
{
- return new ArrayList<AMQConnectionModel>(_registry);
+ synchronized (this)
+ {
+ return new ArrayList<AMQConnectionModel>(_registry);
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index 954c448b72..76d97e3ad1 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -44,4 +44,13 @@ public interface IConnectionRegistry
public void registerConnection(AMQConnectionModel connnection);
public void deregisterConnection(AMQConnectionModel connnection);
+
+ void addRegistryChangeListener(RegistryChangeListener listener);
+
+ interface RegistryChangeListener
+ {
+ void connectionRegistered(AMQConnectionModel connection);
+ void connectionUnregistered(AMQConnectionModel connection);
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
index 081f2bbca3..d3823a71a0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
@@ -26,3 +26,5 @@ RECOVERY_START = MST-1004 : Recovery Start
RECOVERED = MST-1005 : Recovered {0,number} messages
RECOVERY_COMPLETE = MST-1006 : Recovery Complete
PASSIVATE = MST-1007 : Store Passivated
+OVERFULL = MST-1008 : Store overfull, flow control will be enforced
+UNDERFULL = MST-1009 : Store overfull condition cleared
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 5af3899890..b7fd2387a5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -32,23 +32,27 @@ public interface AMQConnectionModel extends StatisticsGatherer
{
/**
* get a unique id for this connection.
- *
+ *
* @return a {@link UUID} representing the connection
*/
public UUID getId();
-
+
/**
* Close the underlying Connection
- *
+ *
* @param cause
* @param message
* @throws org.apache.qpid.AMQException
*/
public void close(AMQConstant cause, String message) throws AMQException;
+ public void block();
+
+ public void unblock();
+
/**
* Close the given requested Session
- *
+ *
* @param session
* @param cause
* @param message
@@ -57,10 +61,10 @@ public interface AMQConnectionModel extends StatisticsGatherer
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
public long getConnectionId();
-
+
/**
* Get a list of all sessions using this connection.
- *
+ *
* @return a list of {@link AMQSessionModel}s
*/
public List<AMQSessionModel> getSessionModels();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index b750b29952..ae5ede5e82 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,14 +20,46 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.JMException;
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -66,24 +98,6 @@ import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -160,6 +174,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
private volatile boolean _deferFlush;
private long _lastReceivedTime;
+ private boolean _blocking;
public ManagedObject getManagedObject()
{
@@ -633,7 +648,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
}
else
{
- _channelMap.put(channel.getChannelId(), channel);
+ synchronized (_channelMap)
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+ }
}
if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
@@ -641,6 +659,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
_cachedChannels[channelId] = channel;
}
+ if(_blocking)
+ {
+ channel.block();
+ }
+
checkForNotification();
}
@@ -735,10 +758,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
*/
public void removeChannel(int channelId)
{
- _channelMap.remove(channelId);
- if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ synchronized (_channelMap)
{
- _cachedChannels[channelId] = null;
+ _channelMap.remove(channelId);
+
+ if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
}
}
@@ -767,8 +794,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
{
channel.close();
}
-
- _channelMap.clear();
+ synchronized (_channelMap)
+ {
+ _channelMap.clear();
+ }
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
_cachedChannels[i] = null;
@@ -1337,6 +1366,36 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr
(Throwable) null));
}
+ public void block()
+ {
+ synchronized (_channelMap)
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.block();
+ }
+ }
+ }
+ }
+
+ public void unblock()
+ {
+ synchronized (_channelMap)
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.unblock();
+ }
+ }
+ }
+ }
+
public boolean isClosed()
{
return _closed;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index fa171815ca..0896499cda 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -42,21 +42,21 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
public AMQConnectionModel getConnectionModel();
public String getClientID();
-
+
public void close() throws AMQException;
public LogSubject getLogSubject();
-
+
/**
* This method is called from the housekeeping thread to check the status of
* transactions on this session and react appropriately.
- *
+ *
* If a transaction is open for too long or idle for too long then a warning
* is logged or the connection is closed, depending on the configuration. An open
* transaction is one that has recent activity. The transaction age is counted
- * from the time the transaction was started. An idle transaction is one that
+ * from the time the transaction was started. An idle transaction is one that
* has had no activity, such as publishing or acknowledgeing messages.
- *
+ *
* @param openWarn time in milliseconds before alerting on open transaction
* @param openClose time in milliseconds before closing connection with open transaction
* @param idleWarn time in milliseconds before alerting on idle transaction
@@ -68,6 +68,10 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
void unblock(AMQQueue queue);
+ void block();
+
+ void unblock();
+
boolean onSameConnection(InboundMessage inbound);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
index bbde11ab4c..9b5ceef35f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
@@ -28,5 +28,7 @@ public enum Event
BEFORE_PASSIVATE,
AFTER_PASSIVATE,
BEFORE_CLOSE,
- AFTER_CLOSE
+ AFTER_CLOSE,
+ PERSISTENT_MESSAGE_SIZE_OVERFULL,
+ PERSISTENT_MESSAGE_SIZE_UNDERFULL,
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
index caff17daa5..4ab1a3ab05 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
@@ -33,7 +33,14 @@ public class OperationalLoggingListener implements EventListener
private OperationalLoggingListener(final MessageStore store, LogSubject logSubject)
{
_logSubject = logSubject;
- store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE);
+ store.addEventListener(this,
+ Event.BEFORE_INIT,
+ Event.AFTER_INIT,
+ Event.BEFORE_ACTIVATE,
+ Event.AFTER_ACTIVATE,
+ Event.AFTER_CLOSE,
+ Event.PERSISTENT_MESSAGE_SIZE_OVERFULL,
+ Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
_store = store;
}
@@ -62,7 +69,13 @@ public class OperationalLoggingListener implements EventListener
case AFTER_CLOSE:
CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
break;
-
+ case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL());
+ break;
+ case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL());
+ break;
+
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 0371cdcfcb..36351cc426 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -31,6 +31,7 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Blob;
+import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
@@ -89,6 +90,9 @@ public class DerbyMessageStore implements MessageStore
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
+ public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
+ public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
+
private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
@@ -234,6 +238,11 @@ public class DerbyMessageStore implements MessageStore
private final EventManager _eventManager = new EventManager();
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -308,7 +317,24 @@ public class DerbyMessageStore implements MessageStore
_storeLocation = databasePath;
+ _persistentSizeHighThreshold = storeConfiguration.getLong(OVERFULL_SIZE_PROPERTY, -1l);
+ _persistentSizeLowThreshold = storeConfiguration.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
createOrOpenDatabase(name, databasePath);
+
+ Connection conn = newAutoCommitConnection();;
+ try
+ {
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+ finally
+ {
+ conn.close();
+ }
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -1921,6 +1947,7 @@ public class DerbyMessageStore implements MessageStore
private class DerbyTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
+ private int _storeSizeIncrease;
private DerbyTransaction()
@@ -1938,18 +1965,19 @@ public class DerbyMessageStore implements MessageStore
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- if(message.getStoredMessage() instanceof StoredDerbyMessage)
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if(storedMessage instanceof StoredDerbyMessage)
{
try
{
- ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+ ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
}
catch (SQLException e)
{
throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
}
}
-
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1964,12 +1992,15 @@ public class DerbyMessageStore implements MessageStore
public void commitTran() throws AMQStoreException
{
DerbyMessageStore.this.commitTran(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ return storeFuture;
}
@Override
@@ -2111,6 +2142,7 @@ public class DerbyMessageStore implements MessageStore
conn.commit();
conn.close();
+ storedSizeChange(getMetaData().getContentSize());
}
}
catch (SQLException e)
@@ -2150,7 +2182,9 @@ public class DerbyMessageStore implements MessageStore
@Override
public void remove()
{
+ int delta = getMetaData().getContentSize();
DerbyMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
}
}
@@ -2446,4 +2480,174 @@ public class DerbyMessageStore implements MessageStore
}
return results;
}
+
+ private synchronized void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized(this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 3*delta;
+
+ Connection conn = null;
+ try
+ {
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk(conn);
+
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Exception will processing store size change", e);
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk(Connection conn)
+ {
+ CallableStatement cs = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ String tableQuery =
+ "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+ stmt = conn.prepareStatement(tableQuery);
+ ResultSet rs = null;
+
+ List<String> schemas = new ArrayList<String>();
+ List<String> tables = new ArrayList<String>();
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ schemas.add(rs.getString(1));
+ tables.add(rs.getString(2));
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+
+ cs = conn.prepareCall
+ ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+ for(int i = 0; i < schemas.size(); i++)
+ {
+ cs.setString(1, schemas.get(i));
+ cs.setString(2, tables.get(i));
+ cs.setShort(3, (short) 0);
+ cs.execute();
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error reducing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closePreparedStatement(cs);
+ }
+
+ }
+
+ private long getSizeOnDisk(Connection conn)
+ {
+ PreparedStatement stmt = null;
+ try
+ {
+ String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+ " FROM " +
+ " SYS.SYSTABLES systabs," +
+ " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+ " WHERE systabs.tabletype = 'T'";
+
+ stmt = conn.prepareStatement(sizeQuery);
+
+ ResultSet rs = null;
+ long size = 0l;
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ size = rs.getLong(1);
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+ return size;
+
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error establishing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ }
+
+ }
+
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index c38f3d0761..77d07e49f3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -74,7 +74,8 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
private ServerConnectionMBean _mBean;
private VirtualHost _virtualHost;
private AtomicLong _lastIoTime = new AtomicLong();
-
+ private boolean _blocking;
+
public ServerConnection(final long connectionId)
{
_connectionId = connectionId;
@@ -100,12 +101,12 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
protected void setState(State state)
{
super.setState(state);
-
+
if (state == State.OPEN)
{
if (_onOpenTask != null)
{
- _onOpenTask.run();
+ _onOpenTask.run();
}
_actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true));
@@ -193,7 +194,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
((ServerSession)session).close();
}
-
+
public LogSubject getLogSubject()
{
return (LogSubject) this;
@@ -286,6 +287,46 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
close(replyCode, message);
}
+ public synchronized void block()
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.block();
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.unblock();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void registerSession(final Session ssn)
+ {
+ super.registerSession(ssn);
+ if(_blocking)
+ {
+ ((ServerSession)ssn).block();
+ }
+ }
+
+ @Override
+ public synchronized void removeSession(final Session ssn)
+ {
+ super.removeSession(ssn);
+ }
+
public List<AMQSessionModel> getSessionModels()
{
List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
@@ -315,27 +356,27 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
}
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
@@ -348,7 +389,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
_virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
_dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
_messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 5460c89eab..0d8036ec3a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -70,7 +70,7 @@ public class ServerConnectionDelegate extends ServerDelegate
String localFQDN)
{
super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
-
+
_appRegistry = appRegistry;
_localFQDN = localFQDN;
_maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
@@ -118,8 +118,8 @@ public class ServerConnectionDelegate extends ServerDelegate
{
final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
final ServerConnection sconn = (ServerConnection) conn;
-
-
+
+
if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
{
tuneAuthorizedConnection(sconn);
@@ -168,7 +168,7 @@ public class ServerConnectionDelegate extends ServerDelegate
vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
-
+
if(vhost != null)
{
sconn.setVirtualHost(vhost);
@@ -194,7 +194,7 @@ public class ServerConnectionDelegate extends ServerDelegate
sconn.setState(Connection.State.CLOSING);
sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
}
-
+
}
@Override
@@ -216,7 +216,7 @@ public class ServerConnectionDelegate extends ServerDelegate
setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
-
+
@Override
protected int getHeartbeatMax()
{
@@ -265,7 +265,9 @@ public class ServerConnectionDelegate extends ServerDelegate
if(isSessionNameUnique(atc.getName(), conn))
{
super.sessionAttach(conn, atc);
- ((ServerConnection)conn).checkForNotification();
+ final ServerConnection serverConnection = (ServerConnection) conn;
+
+ serverConnection.checkForNotification();
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 6f979e035e..d4631ae675 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.transport;
+import java.util.Collections;
+import java.util.HashSet;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData_0_10;
@@ -40,7 +42,6 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -90,12 +91,12 @@ import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session
- implements AuthorizationHolder, SessionConfig,
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
+
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
@@ -105,7 +106,7 @@ public class ServerSession extends Session
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
@@ -167,9 +168,19 @@ public class ServerSession extends Session
if (state == State.OPEN)
{
_actor.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ private void invokeBlock()
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
@@ -455,7 +466,7 @@ public class ServerSession extends Session
{
return _transaction.isTransactional();
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -630,7 +641,7 @@ public class ServerSession extends Session
{
return getConnection().getAuthorizedPrincipal();
}
-
+
public Subject getAuthorizedSubject()
{
return getConnection().getAuthorizedSubject();
@@ -781,37 +792,65 @@ public class ServerSession extends Session
public void block(AMQQueue queue)
{
+ block(queue, queue.getName());
+ }
- if(_blockingQueues.add(queue))
- {
+ public void block()
+ {
+ block(this, "** All Queues **");
+ }
- if(_blocking.compareAndSet(false,true))
+
+ private void block(Object queue, String name)
+ {
+ synchronized (_blockingEntities)
+ {
+ if(_blockingEntities.add(queue))
{
- invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
- invoke(new MessageStop(""));
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
- }
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ if(getState() == State.OPEN)
+ {
+ invokeBlock();
+ }
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ }
+ }
}
}
public void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ unblock((Object)queue);
+ }
+
+ public void unblock()
+ {
+ unblock(this);
+ }
+
+ private void unblock(Object queue)
+ {
+ synchronized(_blockingEntities)
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
+ if(_blocking.compareAndSet(true,false) && !isClosing())
+ {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 79a8bc0e4c..85ea97c107 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -99,9 +99,9 @@ public class ServerSessionDelegate extends SessionDelegate
Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
if(newOutstanding == null || newOutstanding == asyncCommandMark)
{
- session.processed(method);
+ session.processed(method);
}
-
+
if(newOutstanding != null)
{
((ServerSession)session).completeAsyncCommands();
@@ -240,13 +240,13 @@ public class ServerSessionDelegate extends SessionDelegate
}
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
-
+
FilterManager filterManager = null;
- try
+ try
{
filterManager = FilterManagerFactory.createManager(method.getArguments());
- }
- catch (AMQException amqe)
+ }
+ catch (AMQException amqe)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
return;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate extends SessionDelegate
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
- creditManager,
+ creditManager,
filterManager,
method.getArguments());
@@ -297,13 +297,13 @@ public class ServerSessionDelegate extends SessionDelegate
final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
-
+
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
exception(ssn, xfr, errorCode, description);
-
+
return;
}
@@ -807,7 +807,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
- // TODO decouple AMQException and AMQConstant error codes
+ // TODO decouple AMQException and AMQConstant error codes
private void exception(Session session, Method method, AMQException exception, String message)
{
ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
@@ -823,7 +823,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
}
String description = message + "': " + exception.getMessage();
-
+
exception(session, method, errorCode, description);
}
@@ -1349,9 +1349,9 @@ public class ServerSessionDelegate extends SessionDelegate
+ " as exclusive queue with same name "
+ "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
-
+
exception(session, method, errorCode, description);
-
+
return;
}
}
@@ -1436,7 +1436,7 @@ public class ServerSessionDelegate extends SessionDelegate
else
{
VirtualHost virtualHost = getVirtualHost(session);
-
+
try
{
queue.delete();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 489b985222..c59016173a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -74,7 +74,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
int getHouseKeepingPoolSize();
- void setHouseKeepingPoolSize(int newSize);
+ void setHouseKeepingPoolSize(int newSize);
int getHouseKeepingActiveCount();
@@ -102,4 +102,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
State getState();
+
+ public void block();
+
+ public void unblock();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 9b113525d4..b05025467d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -80,7 +80,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public class VirtualHostImpl implements VirtualHost
+public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
@@ -129,6 +129,7 @@ public class VirtualHostImpl implements VirtualHost
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+ private boolean _blocked;
public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
@@ -157,6 +158,7 @@ public class VirtualHostImpl implements VirtualHost
_securityManager.configureHostPlugins(_vhostConfig);
_connectionRegistry = new ConnectionRegistry();
+ _connectionRegistry.addRegistryChangeListener(this);
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
@@ -178,6 +180,9 @@ public class VirtualHostImpl implements VirtualHost
activateNonHAMessageStore();
initialiseStatistics();
+
+ _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
public IConnectionRegistry getConnectionRegistry()
@@ -558,7 +563,7 @@ public class VirtualHostImpl implements VirtualHost
{
return _bindingFactory;
}
-
+
public void registerMessageDelivered(long messageSize)
{
if (isStatisticsEnabled())
@@ -568,7 +573,7 @@ public class VirtualHostImpl implements VirtualHost
}
_appRegistry.registerMessageDelivered(messageSize);
}
-
+
public void registerMessageReceived(long messageSize, long timestamp)
{
if (isStatisticsEnabled())
@@ -578,34 +583,34 @@ public class VirtualHostImpl implements VirtualHost
}
_appRegistry.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
-
+
for (AMQConnectionModel connection : _connectionRegistry.getConnections())
{
connection.resetStatistics();
@@ -616,7 +621,7 @@ public class VirtualHostImpl implements VirtualHost
{
setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
_appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-
+
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
_dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
_messagesReceived = new StatisticsCounter("messages-received-" + getName());
@@ -699,18 +704,72 @@ public class VirtualHostImpl implements VirtualHost
return _dtxRegistry;
}
- @Override
public String toString()
{
return _name;
}
- @Override
public State getState()
{
return _state;
}
+ public void block()
+ {
+ synchronized (_connectionRegistry)
+ {
+ if(!_blocked)
+ {
+ _blocked = true;
+ for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+ {
+ conn.block();
+ }
+ }
+ }
+ }
+
+
+ public void unblock()
+ {
+ synchronized (_connectionRegistry)
+ {
+ if(_blocked)
+ {
+ _blocked = false;
+ for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+ {
+ conn.unblock();
+ }
+ }
+ }
+ }
+
+ public void connectionRegistered(final AMQConnectionModel connection)
+ {
+ if(_blocked)
+ {
+ connection.block();
+ }
+ }
+
+ public void connectionUnregistered(final AMQConnectionModel connection)
+ {
+ }
+
+ public void event(final Event event)
+ {
+ switch(event)
+ {
+ case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+ block();
+ break;
+ case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+ unblock();
+ break;
+ }
+ }
+
/**
* Virtual host JMX MBean class.
@@ -750,7 +809,8 @@ public class VirtualHostImpl implements VirtualHost
{
_exchangeRegistry.initialise();
initialiseModel(_vhostConfig);
- } catch (Exception e)
+ }
+ catch (Exception e)
{
throw new RuntimeException("Failed to initialise virtual host after state change", e);
}
@@ -766,7 +826,8 @@ public class VirtualHostImpl implements VirtualHost
try
{
_brokerMBean.register();
- } catch (JMException e)
+ }
+ catch (JMException e)
{
throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
}
@@ -777,8 +838,6 @@ public class VirtualHostImpl implements VirtualHost
public class BeforePassivationListener implements EventListener
{
-
- @Override
public void event(Event event)
{
_connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 91174c5d10..58c7625ad6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -282,9 +282,16 @@ public class MockVirtualHost implements VirtualHost
}
- @Override
public State getState()
{
return State.ACTIVE;
}
+
+ public void block()
+ {
+ }
+
+ public void unblock()
+ {
+ }
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 55d3ccb6e7..c09b438424 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3130,6 +3130,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
_ticket = ticket;
}
+ public boolean isBrokerFlowControlled()
+ {
+ synchronized (_flowControl)
+ {
+ return _flowControl.getFlowControl();
+ }
+ }
+
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 3902c726f3..06ee651a1e 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1408,5 +1408,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
+ public boolean isBrokerFlowControlled()
+ {
+ return _qpidSession.isFlowControlled();
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 110c73f718..06b606f2d3 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -210,6 +210,17 @@ public class Session extends SessionInvoker
}
}
+ protected State getState()
+ {
+ return this.state;
+ }
+
+ public boolean isFlowControlled()
+ {
+ return flowControl;
+ }
+
+
void setFlowControl(boolean value)
{
flowControl = value;
@@ -307,7 +318,7 @@ public class Session extends SessionInvoker
xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(),
header.getNonStandardProperties()));
}
-
+
}
else
{
@@ -616,7 +627,7 @@ public class Session extends SessionInvoker
{
acquireCredit();
}
-
+
synchronized (commandsLock)
{
if (state == DETACHED && m.isUnreliable())
@@ -732,11 +743,11 @@ public class Session extends SessionInvoker
{
sessionCommandPoint(0, 0);
}
-
+
boolean replayTransfer = !closing && !transacted &&
m instanceof MessageTransfer &&
! m.isUnreliable();
-
+
if ((replayTransfer) || m.hasCompletionListener())
{
setCommand(next, m);
@@ -833,7 +844,7 @@ public class Session extends SessionInvoker
Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
{
- checkFailoverRequired("Session sync was interrupted by failover.");
+ checkFailoverRequired("Session sync was interrupted by failover.");
if(log.isDebugEnabled())
{
log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands);
@@ -871,7 +882,7 @@ public class Session extends SessionInvoker
{
future = results.remove(command);
}
-
+
if (future != null)
{
future.set(result);
@@ -1039,7 +1050,7 @@ public class Session extends SessionInvoker
}
}
- protected void awaitClose()
+ protected void awaitClose()
{
Waiter w = new Waiter(commandsLock, timeout);
while (w.hasTime() && state != CLOSED)
@@ -1096,7 +1107,7 @@ public class Session extends SessionInvoker
if(state == CLOSED)
{
- connection.removeSession(this);
+ connection.removeSession(this);
listener.closed(this);
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
new file mode 100644
index 0000000000..283fb4ed4c
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
@@ -0,0 +1,347 @@
+package org.apache.qpid.server.store;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public class StoreOverfullTest extends QpidBrokerTestCase
+{
+ private static final int TIMEOUT = 10000;
+ public static final int TEST_SIZE = 150;
+
+ private Connection _producerConnection;
+ private Connection _consumerConnection;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageProducer _producer;
+ private MessageConsumer _consumer;
+ private Queue _queue;
+
+ //private final AtomicInteger sentMessages = new AtomicInteger(0);
+
+ private static final int OVERFULL_SIZE = 4000000;
+ private static final int UNDERFULL_SIZE = 3500000;
+
+ public void setUp() throws Exception
+ {
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size",
+ String.valueOf(OVERFULL_SIZE));
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size",
+ String.valueOf(UNDERFULL_SIZE));
+ setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000");
+ super.setUp();
+
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producerConnection.start();
+
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ _producerConnection.close();
+ _consumerConnection.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ /*
+ * Test:
+ *
+ * Send > threshold amount of data : Sender is blocked
+ * Remove 90% of data : Sender is unblocked
+ *
+ */
+ public void testCapacityExceededCausesBlock() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ _queue = getTestQueue();
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+
+ while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ int mostMessages = (int) (0.9 * sentCount);
+ for(int i = 0; i < mostMessages; i++)
+ {
+ if(_consumer.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ long targetTime = System.currentTimeMillis() + 5000l;
+ while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount);
+
+ for(int i = mostMessages; i < TEST_SIZE; i++)
+ {
+ if(_consumer.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE);
+
+ }
+
+ /* Two producers on different queues
+ */
+
+ public void testCapacityExceededCausesBlockTwoConnections() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+ _queue = getTestQueue();
+ AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2");
+
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ Connection secondProducerConnection = getConnection();
+ Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondProducerSession.createProducer(queue2);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+ sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
+
+ while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+
+ while(!((AMQSession)secondProducerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount2 = sentMessages2.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
+
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(queue2);
+ _consumerConnection.start();
+
+
+ for(int i = 0; i < 2*TEST_SIZE; i++)
+ {
+ if(_consumer.receive(1000l) == null
+ && consumer2.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
+ assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
+ }
+
+ /*
+ * New producers are blocked
+ */
+
+ public void testCapacityExceededCausesBlockNewConnection() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+ _queue = getTestQueue();
+
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ Connection secondProducerConnection = getConnection();
+ Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondProducerSession.createProducer(_queue);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+
+ while(!((AMQSession)_producerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+ sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
+
+ while(!((AMQSession)secondProducerSession).isBrokerFlowControlled())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount2 = sentMessages2.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
+
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+
+ for(int i = 0; i < 2*TEST_SIZE; i++)
+ {
+ if(_consumer.receive(2000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
+ assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
+
+ }
+
+
+
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+ final Session producerSession,
+ final int numMessages,
+ long sleepPeriod,
+ AtomicInteger sentMessages)
+ {
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages);
+ new Thread(sender).start();
+ return sender;
+ }
+
+ private class MessageSender implements Runnable
+ {
+ private final MessageProducer _senderProducer;
+ private final Session _senderSession;
+ private final int _numMessages;
+ private volatile JMSException _exception;
+ private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+ private long _sleepPeriod;
+ private final AtomicInteger _sentMessages;
+
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+ {
+ _senderProducer = producer;
+ _senderSession = producerSession;
+ _numMessages = numMessages;
+ _sleepPeriod = sleepPeriod;
+ _sentMessages = sentMessages;
+ }
+
+ public void run()
+ {
+ try
+ {
+ sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages);
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ _exceptionThrownLatch.countDown();
+ }
+ }
+
+ public Exception awaitSenderException(long timeout) throws InterruptedException
+ {
+ _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS);
+ return _exception;
+ }
+ }
+
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ sentMessages.incrementAndGet();
+
+
+ try
+ {
+ ((AMQSession<?,?>)producerSession).sync();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static final byte[] BYTE_32K = new byte[32*1024];
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_32K);
+ send.setIntProperty("msg", msg);
+
+ return send;
+ }
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 1943049a7b..a8315cbb59 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -121,6 +121,8 @@ org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
org.apache.qpid.server.queue.ProducerFlowControlTest#*
+//QPID-3986 : Flow control invoked on total store disk usage
+org.apache.qpid.server.store.StoreOverfullTest#*
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#*
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index eb4c1f814d..28d39278b9 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -30,7 +30,6 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash
org.apache.qpid.test.unit.xa.TopicTest#testRecover
-
org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence
org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval
org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence
@@ -39,6 +38,7 @@ org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence
org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval
org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence
org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval
+org.apache.qpid.server.store.StoreOverfullTest#*
org.apache.qpid.server.store.berkeleydb.*