diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-05-07 22:40:52 +0000 |
commit | 9eab96a9a3569486f6351c94abf4f95ed515e9b1 (patch) | |
tree | ae86cedd9fdcea4f49993e5a82954ccda53a1ed3 | |
parent | 1427de0275b5db2c8619db9211435897123259d8 (diff) | |
download | qpid-python-9eab96a9a3569486f6351c94abf4f95ed515e9b1.tar.gz |
QPID-3986 : [Java Broker] Add producer flow control based on total disk usage
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1335290 13f79535-47bb-0310-9956-ffa450edef68
26 files changed, 1166 insertions, 154 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index fb1d7c5265..c60e9d14f2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import com.sleepycat.bind.tuple.ByteBinding; import com.sleepycat.bind.tuple.LongBinding; +import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; @@ -58,6 +59,7 @@ import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecovery import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; import org.apache.qpid.server.store.ConfiguredObjectHelper; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; @@ -74,7 +76,6 @@ import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogRecoveryHandler; import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction; import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey; import org.apache.qpid.server.store.berkeleydb.entry.Xid; @@ -97,6 +98,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore public static final int VERSION = 6; public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path"; + public static final String OVERFULL_SIZE_PROPERTY = "overfull-size"; + public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size"; private Environment _environment; @@ -152,7 +155,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore private TransactionLogRecoveryHandler _tlogRecoveryHandler; private ConfigurationRecoveryHandler _configRecoveryHandler; - + + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private final EventManager _eventManager = new EventManager(); private String _storeLocation; @@ -163,6 +171,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore _stateManager = new StateManager(_eventManager); } + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration storeConfiguration) throws Exception @@ -171,7 +185,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore _configRecoveryHandler = recoveryHandler; - configure(name,storeConfiguration); + configure(name, storeConfiguration); @@ -218,6 +232,13 @@ public abstract class AbstractBDBMessageStore implements MessageStore final String storeLocation = storeConfig.getString(ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK") + File.separator + "bdbstore" + File.separator + name); + _persistentSizeHighThreshold = storeConfig.getLong(OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); + _persistentSizeLowThreshold = storeConfig.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeLowThreshold; + } + File environmentPath = new File(storeLocation); if (!environmentPath.exists()) { @@ -235,6 +256,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore setupStore(environmentPath, name); } + @Override + public String getStoreLocation() + { + return _storeLocation; + } + /** * Move the store state from INITIAL to ACTIVE without actually recovering. * @@ -257,6 +284,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore new Upgrader(_environment, name).upgradeIfNecessary(); openDatabases(); + + _totalStoreSize = getSizeOnDisk(); } protected Environment createEnvironment(File environmentPath) throws DatabaseException @@ -553,6 +582,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore StorableMessageMetaData metaData = valueBinding.entryToObject(value); StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false); + mrh.message(message); maxId = Math.max(maxId, messageId); @@ -953,7 +983,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore UUIDTupleBinding.getInstance().objectToEntry(link.getId(), key); DatabaseEntry value = new DatabaseEntry(); - LongBinding.longToEntry(link.getCreateTime(),value); + LongBinding.longToEntry(link.getCreateTime(), value); StringMapBinding.getInstance().objectToEntry(link.getArguments(), value); try @@ -1247,7 +1277,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore /** * Primarily for testing purposes. * - * @param queueName + * @param queueId * * @return a list of message ids for messages enqueued for a particular queue */ @@ -1698,7 +1728,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore synchronized void store(com.sleepycat.je.Transaction txn) { - if(_metaData != null) + if(unstored()) { try { @@ -1728,14 +1758,19 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + private boolean unstored() + { + return _metaData != null; + } + public synchronized StoreFuture flushToStore() { - if(_metaData != null) + if(unstored()) { com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null); store(txn); AbstractBDBMessageStore.this.commit(txn,true); - + storedSizeChange(getMetaData().getContentSize()); } return StoreFuture.IMMEDIATE_FUTURE; } @@ -1744,7 +1779,10 @@ public abstract class AbstractBDBMessageStore implements MessageStore { try { + int delta = getMetaData().getContentSize(); AbstractBDBMessageStore.this.removeMessage(_messageId, false); + storedSizeChange(-delta); + } catch (AMQStoreException e) { @@ -1756,6 +1794,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private class BDBTransaction implements org.apache.qpid.server.store.Transaction { private com.sleepycat.je.Transaction _txn; + private int _storeSizeIncrease; private BDBTransaction() { @@ -1773,7 +1812,9 @@ public abstract class AbstractBDBMessageStore implements MessageStore { if(message.getStoredMessage() instanceof StoredBDBMessage) { - ((StoredBDBMessage)message.getStoredMessage()).store(_txn); + final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); + storedMessage.store(_txn); + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); } AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, message.getMessageNumber()); @@ -1787,10 +1828,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void commitTran() throws AMQStoreException { AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); } public StoreFuture commitTranAsync() throws AMQStoreException { + AbstractBDBMessageStore.this.storedSizeChange(_storeSizeIncrease); return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); } @@ -1811,15 +1854,84 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - @Override - public void addEventListener(EventListener eventListener, Event... events) + private void storedSizeChange(final int delta) { - _eventManager.addEventListener(eventListener, events); + if(getPersistentSizeHighThreshold() > 0) + { + synchronized (this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 2*delta; + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + _totalStoreSize = getSizeOnDisk(); + + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + _totalStoreSize = getSizeOnDisk(); + + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(); + + _totalStoreSize = getSizeOnDisk(); + + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + } } - @Override - public String getStoreLocation() + private void reduceSizeOnDisk() { - return _storeLocation; + _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "false"); + boolean cleaned = false; + while (_environment.cleanLog() > 0) + { + cleaned = true; + } + if (cleaned) + { + CheckpointConfig force = new CheckpointConfig(); + force.setForce(true); + _environment.checkpoint(force); + } + + + _environment.getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true"); + } + + private long getSizeOnDisk() + { + return _environment.getStats(null).getTotalLogSize(); + } + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index 9f7eb4bfd9..b414441b92 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -21,12 +21,19 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.store.Event; +import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -49,6 +56,8 @@ public class BDBMessageStore extends AbstractBDBMessageStore private final CommitThread _commitThread = new CommitThread("Commit-Thread"); + private final Map<Event, List<EventListener>> _eventListeners = new HashMap<Event, List<EventListener>>(); + @Override protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException { @@ -64,6 +73,17 @@ public class BDBMessageStore extends AbstractBDBMessageStore // This is what allows the creation of the store if it does not already exist. envConfig.setAllowCreate(true); envConfig.setTransactional(true); + + Properties props = System.getProperties(); + + for(String propName : props.stringPropertyNames()) + { + if(propName.startsWith("qpid.bdb.envconfig.je.")) + { + envConfig.setConfigParam(propName.substring(19), props.getProperty(propName)); + } + } + envConfig.setConfigParam("je.lock.nLockTables", "7"); // Added to help diagnosis of Deadlock issue diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 8198cec821..4fd4e02220 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -23,7 +23,9 @@ package org.apache.qpid.server; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -32,10 +34,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; @@ -89,7 +89,6 @@ import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; -import org.apache.qpid.server.subscription.SubscriptionImpl; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -157,7 +156,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm private final AMQProtocolSession _session; private AtomicBoolean _closing = new AtomicBoolean(false); - private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); + private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); private final AtomicBoolean _blocking = new AtomicBoolean(false); @@ -1357,9 +1356,34 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm return _actor; } - public void block(AMQQueue queue) + public synchronized void block() + { + if(_blockingEntities.add(this)) + { + if(_blocking.compareAndSet(false,true)) + { + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); + flow(false); + } + } + } + + public synchronized void unblock() + { + if(_blockingEntities.remove(this)) + { + if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) + { + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + + flow(true); + } + } + } + + public synchronized void block(AMQQueue queue) { - if(_blockingQueues.add(queue)) + if(_blockingEntities.add(queue)) { if(_blocking.compareAndSet(false,true)) @@ -1370,11 +1394,11 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } - public void unblock(AMQQueue queue) + public synchronized void unblock(AMQQueue queue) { - if(_blockingQueues.remove(queue)) + if(_blockingEntities.remove(queue)) { - if(_blocking.compareAndSet(true,false) && !isClosing()) + if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java index 10e40151b0..aaa1766489 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java @@ -61,7 +61,6 @@ public class TopicConfig extends ConfigurationPlugin throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element."); } - System.err.println("********* Created TC:"+this); } @@ -75,5 +74,5 @@ public class TopicConfig extends ConfigurationPlugin } return response; - } + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index 4a58314f51..09dc5a2473 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.TransportException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -37,6 +38,8 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); private Logger _logger = Logger.getLogger(ConnectionRegistry.class); + private final Collection<RegistryChangeListener> _listeners = + new ArrayList<RegistryChangeListener>(); public void initialise() { @@ -80,16 +83,48 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable public void registerConnection(AMQConnectionModel connnection) { - _registry.add(connnection); + synchronized (this) + { + _registry.add(connnection); + synchronized (_listeners) + { + for(RegistryChangeListener listener : _listeners) + { + listener.connectionRegistered(connnection); + } + } + } } public void deregisterConnection(AMQConnectionModel connnection) { - _registry.remove(connnection); + synchronized (this) + { + _registry.remove(connnection); + + synchronized (_listeners) + { + for(RegistryChangeListener listener : _listeners) + { + listener.connectionUnregistered(connnection); + } + } + } + } + + public void addRegistryChangeListener(RegistryChangeListener listener) + { + synchronized (_listeners) + { + _listeners.add(listener); + } } public List<AMQConnectionModel> getConnections() { - return new ArrayList<AMQConnectionModel>(_registry); + synchronized (this) + { + return new ArrayList<AMQConnectionModel>(_registry); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java index 954c448b72..76d97e3ad1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java @@ -44,4 +44,13 @@ public interface IConnectionRegistry public void registerConnection(AMQConnectionModel connnection); public void deregisterConnection(AMQConnectionModel connnection); + + void addRegistryChangeListener(RegistryChangeListener listener); + + interface RegistryChangeListener + { + void connectionRegistered(AMQConnectionModel connection); + void connectionUnregistered(AMQConnectionModel connection); + + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties index 081f2bbca3..d3823a71a0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties @@ -26,3 +26,5 @@ RECOVERY_START = MST-1004 : Recovery Start RECOVERED = MST-1005 : Recovered {0,number} messages RECOVERY_COMPLETE = MST-1006 : Recovery Complete PASSIVATE = MST-1007 : Store Passivated +OVERFULL = MST-1008 : Store overfull, flow control will be enforced +UNDERFULL = MST-1009 : Store overfull condition cleared diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 5af3899890..b7fd2387a5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -32,23 +32,27 @@ public interface AMQConnectionModel extends StatisticsGatherer { /** * get a unique id for this connection. - * + * * @return a {@link UUID} representing the connection */ public UUID getId(); - + /** * Close the underlying Connection - * + * * @param cause * @param message * @throws org.apache.qpid.AMQException */ public void close(AMQConstant cause, String message) throws AMQException; + public void block(); + + public void unblock(); + /** * Close the given requested Session - * + * * @param session * @param cause * @param message @@ -57,10 +61,10 @@ public interface AMQConnectionModel extends StatisticsGatherer public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException; public long getConnectionId(); - + /** * Get a list of all sessions using this connection. - * + * * @return a list of {@link AMQSessionModel}s */ public List<AMQSessionModel> getSessionModels(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index b750b29952..ae5ede5e82 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -20,14 +20,46 @@ */ package org.apache.qpid.server.protocol; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.security.Principal; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.management.JMException; +import javax.security.auth.Subject; +import javax.security.sasl.SaslServer; import org.apache.log4j.Logger; - import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.codec.AMQCodecFactory; -import org.apache.qpid.framing.*; +import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQProtocolHeaderException; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.ChannelCloseBody; +import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.HeartbeatBody; +import org.apache.qpid.framing.MethodDispatcher; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolInitiation; +import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -66,24 +98,6 @@ import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.util.BytesDataOutput; -import javax.management.JMException; -import javax.security.auth.Subject; -import javax.security.sasl.SaslServer; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.security.Principal; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.atomic.AtomicBoolean; - public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); @@ -160,6 +174,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr private volatile boolean _deferFlush; private long _lastReceivedTime; + private boolean _blocking; public ManagedObject getManagedObject() { @@ -633,7 +648,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr } else { - _channelMap.put(channel.getChannelId(), channel); + synchronized (_channelMap) + { + _channelMap.put(channel.getChannelId(), channel); + } } if (((channelId & CHANNEL_CACHE_SIZE) == channelId)) @@ -641,6 +659,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _cachedChannels[channelId] = channel; } + if(_blocking) + { + channel.block(); + } + checkForNotification(); } @@ -735,10 +758,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ public void removeChannel(int channelId) { - _channelMap.remove(channelId); - if ((channelId & CHANNEL_CACHE_SIZE) == channelId) + synchronized (_channelMap) { - _cachedChannels[channelId] = null; + _channelMap.remove(channelId); + + if ((channelId & CHANNEL_CACHE_SIZE) == channelId) + { + _cachedChannels[channelId] = null; + } } } @@ -767,8 +794,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr { channel.close(); } - - _channelMap.clear(); + synchronized (_channelMap) + { + _channelMap.clear(); + } for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++) { _cachedChannels[i] = null; @@ -1337,6 +1366,36 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr (Throwable) null)); } + public void block() + { + synchronized (_channelMap) + { + if(!_blocking) + { + _blocking = true; + for(AMQChannel channel : _channelMap.values()) + { + channel.block(); + } + } + } + } + + public void unblock() + { + synchronized (_channelMap) + { + if(_blocking) + { + _blocking = false; + for(AMQChannel channel : _channelMap.values()) + { + channel.unblock(); + } + } + } + } + public boolean isClosed() { return _closed; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index fa171815ca..0896499cda 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -42,21 +42,21 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel> public AMQConnectionModel getConnectionModel(); public String getClientID(); - + public void close() throws AMQException; public LogSubject getLogSubject(); - + /** * This method is called from the housekeeping thread to check the status of * transactions on this session and react appropriately. - * + * * If a transaction is open for too long or idle for too long then a warning * is logged or the connection is closed, depending on the configuration. An open * transaction is one that has recent activity. The transaction age is counted - * from the time the transaction was started. An idle transaction is one that + * from the time the transaction was started. An idle transaction is one that * has had no activity, such as publishing or acknowledgeing messages. - * + * * @param openWarn time in milliseconds before alerting on open transaction * @param openClose time in milliseconds before closing connection with open transaction * @param idleWarn time in milliseconds before alerting on idle transaction @@ -68,6 +68,10 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel> void unblock(AMQQueue queue); + void block(); + + void unblock(); + boolean onSameConnection(InboundMessage inbound); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java index bbde11ab4c..9b5ceef35f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/Event.java @@ -28,5 +28,7 @@ public enum Event BEFORE_PASSIVATE, AFTER_PASSIVATE, BEFORE_CLOSE, - AFTER_CLOSE + AFTER_CLOSE, + PERSISTENT_MESSAGE_SIZE_OVERFULL, + PERSISTENT_MESSAGE_SIZE_UNDERFULL, } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java index caff17daa5..4ab1a3ab05 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java @@ -33,7 +33,14 @@ public class OperationalLoggingListener implements EventListener private OperationalLoggingListener(final MessageStore store, LogSubject logSubject) { _logSubject = logSubject; - store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE); + store.addEventListener(this, + Event.BEFORE_INIT, + Event.AFTER_INIT, + Event.BEFORE_ACTIVATE, + Event.AFTER_ACTIVATE, + Event.AFTER_CLOSE, + Event.PERSISTENT_MESSAGE_SIZE_OVERFULL, + Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); _store = store; } @@ -62,7 +69,13 @@ public class OperationalLoggingListener implements EventListener case AFTER_CLOSE: CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED()); break; - + case PERSISTENT_MESSAGE_SIZE_OVERFULL: + CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL()); + break; + case PERSISTENT_MESSAGE_SIZE_UNDERFULL: + CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL()); + break; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 0371cdcfcb..36351cc426 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -31,6 +31,7 @@ import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.Blob; +import java.sql.CallableStatement; import java.sql.Connection; import java.sql.Driver; import java.sql.DriverManager; @@ -89,6 +90,9 @@ public class DerbyMessageStore implements MessageStore private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class); + public static final String OVERFULL_SIZE_PROPERTY = "overfull-size"; + public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size"; + private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver"; private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; @@ -234,6 +238,11 @@ public class DerbyMessageStore implements MessageStore private final EventManager _eventManager = new EventManager(); + private long _totalStoreSize; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + private MessageStoreRecoveryHandler _messageRecoveryHandler; private TransactionLogRecoveryHandler _tlogRecoveryHandler; @@ -308,7 +317,24 @@ public class DerbyMessageStore implements MessageStore _storeLocation = databasePath; + _persistentSizeHighThreshold = storeConfiguration.getLong(OVERFULL_SIZE_PROPERTY, -1l); + _persistentSizeLowThreshold = storeConfiguration.getLong(UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold); + if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + createOrOpenDatabase(name, databasePath); + + Connection conn = newAutoCommitConnection();; + try + { + _totalStoreSize = getSizeOnDisk(conn); + } + finally + { + conn.close(); + } } private static synchronized void initialiseDriver() throws ClassNotFoundException @@ -1921,6 +1947,7 @@ public class DerbyMessageStore implements MessageStore private class DerbyTransaction implements Transaction { private final ConnectionWrapper _connWrapper; + private int _storeSizeIncrease; private DerbyTransaction() @@ -1938,18 +1965,19 @@ public class DerbyMessageStore implements MessageStore @Override public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException { - if(message.getStoredMessage() instanceof StoredDerbyMessage) + final StoredMessage storedMessage = message.getStoredMessage(); + if(storedMessage instanceof StoredDerbyMessage) { try { - ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection()); + ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection()); } catch (SQLException e) { throw new AMQStoreException("Exception on enqueuing message " + _messageId, e); } } - + _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber()); } @@ -1964,12 +1992,15 @@ public class DerbyMessageStore implements MessageStore public void commitTran() throws AMQStoreException { DerbyMessageStore.this.commitTran(_connWrapper); + storedSizeChange(_storeSizeIncrease); } @Override public StoreFuture commitTranAsync() throws AMQStoreException { - return DerbyMessageStore.this.commitTranAsync(_connWrapper); + final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper); + storedSizeChange(_storeSizeIncrease); + return storeFuture; } @Override @@ -2111,6 +2142,7 @@ public class DerbyMessageStore implements MessageStore conn.commit(); conn.close(); + storedSizeChange(getMetaData().getContentSize()); } } catch (SQLException e) @@ -2150,7 +2182,9 @@ public class DerbyMessageStore implements MessageStore @Override public void remove() { + int delta = getMetaData().getContentSize(); DerbyMessageStore.this.removeMessage(_messageId); + storedSizeChange(-delta); } } @@ -2446,4 +2480,174 @@ public class DerbyMessageStore implements MessageStore } return results; } + + private synchronized void storedSizeChange(final int delta) + { + if(getPersistentSizeHighThreshold() > 0) + { + synchronized(this) + { + // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every + // time, so we do so only when there's been enough change that it is worth looking again. We do this by + // assuming the total size will change by less than twice the amount of the message data change. + long newSize = _totalStoreSize += 3*delta; + + Connection conn = null; + try + { + + if(!_limitBusted && newSize > getPersistentSizeHighThreshold()) + { + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(_totalStoreSize > getPersistentSizeHighThreshold()) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + } + else if(_limitBusted && newSize < getPersistentSizeLowThreshold()) + { + long oldSize = _totalStoreSize; + conn = newAutoCommitConnection(); + _totalStoreSize = getSizeOnDisk(conn); + if(oldSize <= _totalStoreSize) + { + + reduceSizeOnDisk(conn); + + _totalStoreSize = getSizeOnDisk(conn); + } + + if(_totalStoreSize < getPersistentSizeLowThreshold()) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Exception will processing store size change", e); + } + } + } + } + + private void reduceSizeOnDisk(Connection conn) + { + CallableStatement cs = null; + PreparedStatement stmt = null; + try + { + String tableQuery = + "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'"; + stmt = conn.prepareStatement(tableQuery); + ResultSet rs = null; + + List<String> schemas = new ArrayList<String>(); + List<String> tables = new ArrayList<String>(); + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + schemas.add(rs.getString(1)); + tables.add(rs.getString(2)); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + + cs = conn.prepareCall + ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)"); + + for(int i = 0; i < schemas.size(); i++) + { + cs.setString(1, schemas.get(i)); + cs.setString(2, tables.get(i)); + cs.setShort(3, (short) 0); + cs.execute(); + } + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error reducing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + closePreparedStatement(cs); + } + + } + + private long getSizeOnDisk(Connection conn) + { + PreparedStatement stmt = null; + try + { + String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" + + " FROM " + + " SYS.SYSTABLES systabs," + + " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" + + " WHERE systabs.tabletype = 'T'"; + + stmt = conn.prepareStatement(sizeQuery); + + ResultSet rs = null; + long size = 0l; + + try + { + rs = stmt.executeQuery(); + while(rs.next()) + { + size = rs.getLong(1); + } + } + finally + { + if(rs != null) + { + rs.close(); + } + } + + return size; + + } + catch (SQLException e) + { + closeConnection(conn); + throw new RuntimeException("Error establishing on disk size", e); + } + finally + { + closePreparedStatement(stmt); + } + + } + + + private long getPersistentSizeLowThreshold() + { + return _persistentSizeLowThreshold; + } + + private long getPersistentSizeHighThreshold() + { + return _persistentSizeHighThreshold; + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index c38f3d0761..77d07e49f3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -74,7 +74,8 @@ public class ServerConnection extends Connection implements Managable, AMQConnec private ServerConnectionMBean _mBean; private VirtualHost _virtualHost; private AtomicLong _lastIoTime = new AtomicLong(); - + private boolean _blocking; + public ServerConnection(final long connectionId) { _connectionId = connectionId; @@ -100,12 +101,12 @@ public class ServerConnection extends Connection implements Managable, AMQConnec protected void setState(State state) { super.setState(state); - + if (state == State.OPEN) { if (_onOpenTask != null) { - _onOpenTask.run(); + _onOpenTask.run(); } _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true)); @@ -193,7 +194,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec ((ServerSession)session).close(); } - + public LogSubject getLogSubject() { return (LogSubject) this; @@ -286,6 +287,46 @@ public class ServerConnection extends Connection implements Managable, AMQConnec close(replyCode, message); } + public synchronized void block() + { + if(!_blocking) + { + _blocking = true; + for(AMQSessionModel ssn : getSessionModels()) + { + ssn.block(); + } + } + } + + public synchronized void unblock() + { + if(_blocking) + { + _blocking = false; + for(AMQSessionModel ssn : getSessionModels()) + { + ssn.unblock(); + } + } + } + + @Override + public synchronized void registerSession(final Session ssn) + { + super.registerSession(ssn); + if(_blocking) + { + ((ServerSession)ssn).block(); + } + } + + @Override + public synchronized void removeSession(final Session ssn) + { + super.removeSession(ssn); + } + public List<AMQSessionModel> getSessionModels() { List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>(); @@ -315,27 +356,27 @@ public class ServerConnection extends Connection implements Managable, AMQConnec } _virtualHost.registerMessageReceived(messageSize, timestamp); } - + public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; } - + public StatisticsCounter getDataReceiptStatistics() { return _dataReceived; } - + public StatisticsCounter getMessageDeliveryStatistics() { return _messagesDelivered; } - + public StatisticsCounter getDataDeliveryStatistics() { return _dataDelivered; } - + public void resetStatistics() { _messagesDelivered.reset(); @@ -348,7 +389,7 @@ public class ServerConnection extends Connection implements Managable, AMQConnec { setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled()); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId()); _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId()); _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId()); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 5460c89eab..0d8036ec3a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -70,7 +70,7 @@ public class ServerConnectionDelegate extends ServerDelegate String localFQDN) { super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales); - + _appRegistry = appRegistry; _localFQDN = localFQDN; _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); @@ -118,8 +118,8 @@ public class ServerConnectionDelegate extends ServerDelegate { final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response); final ServerConnection sconn = (ServerConnection) conn; - - + + if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus())) { tuneAuthorizedConnection(sconn); @@ -168,7 +168,7 @@ public class ServerConnectionDelegate extends ServerDelegate vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName); SecurityManager.setThreadSubject(sconn.getAuthorizedSubject()); - + if(vhost != null) { sconn.setVirtualHost(vhost); @@ -194,7 +194,7 @@ public class ServerConnectionDelegate extends ServerDelegate sconn.setState(Connection.State.CLOSING); sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'")); } - + } @Override @@ -216,7 +216,7 @@ public class ServerConnectionDelegate extends ServerDelegate setConnectionTuneOkChannelMax(sconn, okChannelMax); } - + @Override protected int getHeartbeatMax() { @@ -265,7 +265,9 @@ public class ServerConnectionDelegate extends ServerDelegate if(isSessionNameUnique(atc.getName(), conn)) { super.sessionAttach(conn, atc); - ((ServerConnection)conn).checkForNotification(); + final ServerConnection serverConnection = (ServerConnection) conn; + + serverConnection.checkForNotification(); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 6f979e035e..d4631ae675 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.transport; +import java.util.Collections; +import java.util.HashSet; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageMetaData_0_10; @@ -40,7 +42,6 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -90,12 +91,12 @@ import org.apache.qpid.transport.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServerSession extends Session - implements AuthorizationHolder, SessionConfig, +public class ServerSession extends Session + implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); - + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; @@ -105,7 +106,7 @@ public class ServerSession extends Session private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); + private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; @@ -167,9 +168,19 @@ public class ServerSession extends Session if (state == State.OPEN) { _actor.message(ChannelMessages.CREATE()); + if(_blocking.get()) + { + invokeBlock(); + } } } + private void invokeBlock() + { + invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); + invoke(new MessageStop("")); + } + private ConfigStore getConfigStore() { return getConnectionConfig().getConfigStore(); @@ -455,7 +466,7 @@ public class ServerSession extends Session { return _transaction.isTransactional(); } - + public boolean inTransaction() { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; @@ -630,7 +641,7 @@ public class ServerSession extends Session { return getConnection().getAuthorizedPrincipal(); } - + public Subject getAuthorizedSubject() { return getConnection().getAuthorizedSubject(); @@ -781,37 +792,65 @@ public class ServerSession extends Session public void block(AMQQueue queue) { + block(queue, queue.getName()); + } - if(_blockingQueues.add(queue)) - { + public void block() + { + block(this, "** All Queues **"); + } - if(_blocking.compareAndSet(false,true)) + + private void block(Object queue, String name) + { + synchronized (_blockingEntities) + { + if(_blockingEntities.add(queue)) { - invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); - invoke(new MessageStop("")); - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); - } + + if(_blocking.compareAndSet(false,true)) + { + if(getState() == State.OPEN) + { + invokeBlock(); + } + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + } + } } } public void unblock(AMQQueue queue) { - if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) + unblock((Object)queue); + } + + public void unblock() + { + unblock(this); + } + + private void unblock(Object queue) + { + synchronized(_blockingEntities) { - if(_blocking.compareAndSet(true,false) && !isClosing()) + if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) { + if(_blocking.compareAndSet(true,false) && !isClosing()) + { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + } } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 79a8bc0e4c..85ea97c107 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -99,9 +99,9 @@ public class ServerSessionDelegate extends SessionDelegate Object newOutstanding = ((ServerSession)session).getAsyncCommandMark(); if(newOutstanding == null || newOutstanding == asyncCommandMark) { - session.processed(method); + session.processed(method); } - + if(newOutstanding != null) { ((ServerSession)session).completeAsyncCommands(); @@ -240,13 +240,13 @@ public class ServerSessionDelegate extends SessionDelegate } FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L); - + FilterManager filterManager = null; - try + try { filterManager = FilterManagerFactory.createManager(method.getArguments()); - } - catch (AMQException amqe) + } + catch (AMQException amqe) { exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager"); return; @@ -257,7 +257,7 @@ public class ServerSessionDelegate extends SessionDelegate method.getAcceptMode(), method.getAcquireMode(), MessageFlowMode.WINDOW, - creditManager, + creditManager, filterManager, method.getArguments()); @@ -297,13 +297,13 @@ public class ServerSessionDelegate extends SessionDelegate final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); messageMetaData.setConnectionReference(((ServerSession)ssn).getReference()); - + if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) { ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS; String description = "Permission denied: exchange-name '" + exchange.getName() + "'"; exception(ssn, xfr, errorCode, description); - + return; } @@ -807,7 +807,7 @@ public class ServerSessionDelegate extends SessionDelegate } } - // TODO decouple AMQException and AMQConstant error codes + // TODO decouple AMQException and AMQConstant error codes private void exception(Session session, Method method, AMQException exception, String message) { ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; @@ -823,7 +823,7 @@ public class ServerSessionDelegate extends SessionDelegate } } String description = message + "': " + exception.getMessage(); - + exception(session, method, errorCode, description); } @@ -1349,9 +1349,9 @@ public class ServerSessionDelegate extends SessionDelegate + " as exclusive queue with same name " + "declared on another session"; ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED; - + exception(session, method, errorCode, description); - + return; } } @@ -1436,7 +1436,7 @@ public class ServerSessionDelegate extends SessionDelegate else { VirtualHost virtualHost = getVirtualHost(session); - + try { queue.delete(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 489b985222..c59016173a 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -74,7 +74,7 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo int getHouseKeepingPoolSize(); - void setHouseKeepingPoolSize(int newSize); + void setHouseKeepingPoolSize(int newSize); int getHouseKeepingActiveCount(); @@ -102,4 +102,8 @@ public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHo ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask); State getState(); + + public void block(); + + public void unblock(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 9b113525d4..b05025467d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -80,7 +80,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class VirtualHostImpl implements VirtualHost +public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener { private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class); @@ -129,6 +129,7 @@ public class VirtualHostImpl implements VirtualHost private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); + private boolean _blocked; public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception { @@ -157,6 +158,7 @@ public class VirtualHostImpl implements VirtualHost _securityManager.configureHostPlugins(_vhostConfig); _connectionRegistry = new ConnectionRegistry(); + _connectionRegistry.addRegistryChangeListener(this); _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); @@ -178,6 +180,9 @@ public class VirtualHostImpl implements VirtualHost activateNonHAMessageStore(); initialiseStatistics(); + + _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); } public IConnectionRegistry getConnectionRegistry() @@ -558,7 +563,7 @@ public class VirtualHostImpl implements VirtualHost { return _bindingFactory; } - + public void registerMessageDelivered(long messageSize) { if (isStatisticsEnabled()) @@ -568,7 +573,7 @@ public class VirtualHostImpl implements VirtualHost } _appRegistry.registerMessageDelivered(messageSize); } - + public void registerMessageReceived(long messageSize, long timestamp) { if (isStatisticsEnabled()) @@ -578,34 +583,34 @@ public class VirtualHostImpl implements VirtualHost } _appRegistry.registerMessageReceived(messageSize, timestamp); } - + public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; } - + public StatisticsCounter getDataReceiptStatistics() { return _dataReceived; } - + public StatisticsCounter getMessageDeliveryStatistics() { return _messagesDelivered; } - + public StatisticsCounter getDataDeliveryStatistics() { return _dataDelivered; } - + public void resetStatistics() { _messagesDelivered.reset(); _dataDelivered.reset(); _messagesReceived.reset(); _dataReceived.reset(); - + for (AMQConnectionModel connection : _connectionRegistry.getConnections()) { connection.resetStatistics(); @@ -616,7 +621,7 @@ public class VirtualHostImpl implements VirtualHost { setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS && _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled()); - + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); _messagesReceived = new StatisticsCounter("messages-received-" + getName()); @@ -699,18 +704,72 @@ public class VirtualHostImpl implements VirtualHost return _dtxRegistry; } - @Override public String toString() { return _name; } - @Override public State getState() { return _state; } + public void block() + { + synchronized (_connectionRegistry) + { + if(!_blocked) + { + _blocked = true; + for(AMQConnectionModel conn : _connectionRegistry.getConnections()) + { + conn.block(); + } + } + } + } + + + public void unblock() + { + synchronized (_connectionRegistry) + { + if(_blocked) + { + _blocked = false; + for(AMQConnectionModel conn : _connectionRegistry.getConnections()) + { + conn.unblock(); + } + } + } + } + + public void connectionRegistered(final AMQConnectionModel connection) + { + if(_blocked) + { + connection.block(); + } + } + + public void connectionUnregistered(final AMQConnectionModel connection) + { + } + + public void event(final Event event) + { + switch(event) + { + case PERSISTENT_MESSAGE_SIZE_OVERFULL: + block(); + break; + case PERSISTENT_MESSAGE_SIZE_UNDERFULL: + unblock(); + break; + } + } + /** * Virtual host JMX MBean class. @@ -750,7 +809,8 @@ public class VirtualHostImpl implements VirtualHost { _exchangeRegistry.initialise(); initialiseModel(_vhostConfig); - } catch (Exception e) + } + catch (Exception e) { throw new RuntimeException("Failed to initialise virtual host after state change", e); } @@ -766,7 +826,8 @@ public class VirtualHostImpl implements VirtualHost try { _brokerMBean.register(); - } catch (JMException e) + } + catch (JMException e) { throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e); } @@ -777,8 +838,6 @@ public class VirtualHostImpl implements VirtualHost public class BeforePassivationListener implements EventListener { - - @Override public void event(Event event) { _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 91174c5d10..58c7625ad6 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -282,9 +282,16 @@ public class MockVirtualHost implements VirtualHost } - @Override public State getState() { return State.ACTIVE; } + + public void block() + { + } + + public void unblock() + { + } }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 55d3ccb6e7..c09b438424 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3130,6 +3130,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _ticket = ticket; } + public boolean isBrokerFlowControlled() + { + synchronized (_flowControl) + { + return _flowControl.getFlowControl(); + } + } + public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 3902c726f3..06ee651a1e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1408,5 +1408,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } + public boolean isBrokerFlowControlled() + { + return _qpidSession.isFlowControlled(); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 110c73f718..06b606f2d3 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -210,6 +210,17 @@ public class Session extends SessionInvoker } } + protected State getState() + { + return this.state; + } + + public boolean isFlowControlled() + { + return flowControl; + } + + void setFlowControl(boolean value) { flowControl = value; @@ -307,7 +318,7 @@ public class Session extends SessionInvoker xfr.setHeader(new Header(deliveryProps, header.getMessageProperties(), header.getNonStandardProperties())); } - + } else { @@ -616,7 +627,7 @@ public class Session extends SessionInvoker { acquireCredit(); } - + synchronized (commandsLock) { if (state == DETACHED && m.isUnreliable()) @@ -732,11 +743,11 @@ public class Session extends SessionInvoker { sessionCommandPoint(0, 0); } - + boolean replayTransfer = !closing && !transacted && m instanceof MessageTransfer && ! m.isUnreliable(); - + if ((replayTransfer) || m.hasCompletionListener()) { setCommand(next, m); @@ -833,7 +844,7 @@ public class Session extends SessionInvoker Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED && lt(maxComplete, point)) { - checkFailoverRequired("Session sync was interrupted by failover."); + checkFailoverRequired("Session sync was interrupted by failover."); if(log.isDebugEnabled()) { log.debug("%s waiting for[%d]: %d, %s", this, point, maxComplete, commands); @@ -871,7 +882,7 @@ public class Session extends SessionInvoker { future = results.remove(command); } - + if (future != null) { future.set(result); @@ -1039,7 +1050,7 @@ public class Session extends SessionInvoker } } - protected void awaitClose() + protected void awaitClose() { Waiter w = new Waiter(commandsLock, timeout); while (w.hasTime() && state != CLOSED) @@ -1096,7 +1107,7 @@ public class Session extends SessionInvoker if(state == CLOSED) { - connection.removeSession(this); + connection.removeSession(this); listener.closed(this); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java new file mode 100644 index 0000000000..283fb4ed4c --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -0,0 +1,347 @@ +package org.apache.qpid.server.store; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class StoreOverfullTest extends QpidBrokerTestCase +{ + private static final int TIMEOUT = 10000; + public static final int TEST_SIZE = 150; + + private Connection _producerConnection; + private Connection _consumerConnection; + private Session _producerSession; + private Session _consumerSession; + private MessageProducer _producer; + private MessageConsumer _consumer; + private Queue _queue; + + //private final AtomicInteger sentMessages = new AtomicInteger(0); + + private static final int OVERFULL_SIZE = 4000000; + private static final int UNDERFULL_SIZE = 3500000; + + public void setUp() throws Exception + { + setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", + String.valueOf(OVERFULL_SIZE)); + setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", + String.valueOf(UNDERFULL_SIZE)); + setSystemProperty("qpid.bdb.envconfig.je.log.fileMax", "1000000"); + super.setUp(); + + _producerConnection = getConnection(); + _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + _producerConnection.start(); + + _consumerConnection = getConnection(); + _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + } + + public void tearDown() throws Exception + { + try + { + _producerConnection.close(); + _consumerConnection.close(); + } + finally + { + super.tearDown(); + } + } + + /* + * Test: + * + * Send > threshold amount of data : Sender is blocked + * Remove 90% of data : Sender is unblocked + * + */ + public void testCapacityExceededCausesBlock() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + _queue = getTestQueue(); + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + int mostMessages = (int) (0.9 * sentCount); + for(int i = 0; i < mostMessages; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + long targetTime = System.currentTimeMillis() + 5000l; + while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) + { + Thread.sleep(100l); + } + + assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount); + + for(int i = mostMessages; i < TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null) + { + break; + } + } + + assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); + + } + + /* Two producers on different queues + */ + + public void testCapacityExceededCausesBlockTwoConnections() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2"); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(queue2); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + + while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + + _consumer = _consumerSession.createConsumer(_queue); + MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); + _consumerConnection.start(); + + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(1000l) == null + && consumer2.receive(1000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + } + + /* + * New producers are blocked + */ + + public void testCapacityExceededCausesBlockNewConnection() throws Exception + { + AtomicInteger sentMessages = new AtomicInteger(0); + AtomicInteger sentMessages2 = new AtomicInteger(0); + + _queue = getTestQueue(); + + ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue); + + _producer = _producerSession.createProducer(_queue); + + Connection secondProducerConnection = getConnection(); + Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer secondProducer = secondProducerSession.createProducer(_queue); + + sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + + while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount = sentMessages.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); + + sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); + + while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + { + Thread.sleep(100l); + } + int sentCount2 = sentMessages2.get(); + assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); + + + _consumer = _consumerSession.createConsumer(_queue); + _consumerConnection.start(); + + + for(int i = 0; i < 2*TEST_SIZE; i++) + { + if(_consumer.receive(2000l) == null) + { + break; + } + } + + assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get()); + assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); + + } + + + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + long sleepPeriod, + AtomicInteger sentMessages) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages); + new Thread(sender).start(); + return sender; + } + + private class MessageSender implements Runnable + { + private final MessageProducer _senderProducer; + private final Session _senderSession; + private final int _numMessages; + private volatile JMSException _exception; + private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1); + private long _sleepPeriod; + private final AtomicInteger _sentMessages; + + public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + { + _senderProducer = producer; + _senderSession = producerSession; + _numMessages = numMessages; + _sleepPeriod = sleepPeriod; + _sentMessages = sentMessages; + } + + public void run() + { + try + { + sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages); + } + catch (JMSException e) + { + _exception = e; + _exceptionThrownLatch.countDown(); + } + } + + public Exception awaitSenderException(long timeout) throws InterruptedException + { + _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); + return _exception; + } + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + sentMessages.incrementAndGet(); + + + try + { + ((AMQSession<?,?>)producerSession).sync(); + } + catch (AMQException e) + { + e.printStackTrace(); + throw new RuntimeException(e); + } + + try + { + Thread.sleep(sleepPeriod); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } + + private static final byte[] BYTE_32K = new byte[32*1024]; + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_32K); + send.setIntProperty("msg", msg); + + return send; + } +} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 1943049a7b..a8315cbb59 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -121,6 +121,8 @@ org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker) org.apache.qpid.server.queue.ProducerFlowControlTest#* +//QPID-3986 : Flow control invoked on total store disk usage +org.apache.qpid.server.store.StoreOverfullTest#* org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#* diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index eb4c1f814d..28d39278b9 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -30,7 +30,6 @@ org.apache.qpid.test.unit.xa.TopicTest#testMultiMessagesDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testDurSubCrash org.apache.qpid.test.unit.xa.TopicTest#testRecover - org.apache.qpid.server.store.MessageStoreTest#testMessagePersistence org.apache.qpid.server.store.MessageStoreTest#testMessageRemoval org.apache.qpid.server.store.MessageStoreTest#testBindingPersistence @@ -39,6 +38,7 @@ org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval +org.apache.qpid.server.store.StoreOverfullTest#* org.apache.qpid.server.store.berkeleydb.* |