diff options
author | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:37:11 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2012-05-29 11:37:11 +0000 |
commit | 5a888989bd28402e3271b05bcc32a7410f8d17a2 (patch) | |
tree | 469fe7c0a7a484cf2663a70d289e342497303f0b | |
parent | 739f8964cdb5baf786759fb900928206a404d3fa (diff) | |
download | qpid-python-5a888989bd28402e3271b05bcc32a7410f8d17a2.tar.gz |
QPID-3986: Improved tests and resolved some potential thread-safety issues
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>, Philip Harvey <phil@philharveyonline.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343675 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 558 insertions, 71 deletions
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java new file mode 100644 index 0000000000..fe48e29d0b --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; + +public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase +{ + private static final Logger _logger = Logger.getLogger(BDBMessageStoreQuotaEventsTest.class); + + /* + * Notes on calculation of quota limits. + * + * 150 32kb messages is approximately 4.8MB which is greater than + * OVERFULL_SIZE. + * + * We deliberately use settings that force BDB to use multiple log files, so + * that when one or more of them are subsequently cleaned (following message + * consumption) the actual size on disk is reduced. + */ + + private static final String MAX_BDB_LOG_SIZE = "1000000"; // ~1MB + + private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 150; + + private static final int OVERFULL_SIZE = 4000000; // ~4MB + private static final int UNDERFULL_SIZE = 3500000; // ~3.5MB + + @Override + protected int getNumberOfMessagesToFillStore() + { + return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; + } + + @Override + protected void applyStoreSpecificConfiguration(XMLConfiguration config) + { + _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + config.addProperty("envConfig(-1).name", "je.log.fileMax"); + config.addProperty("envConfig.value", MAX_BDB_LOG_SIZE); + config.addProperty("overfull-size", OVERFULL_SIZE); + config.addProperty("underfull-size", UNDERFULL_SIZE); + } + + @Override + protected MessageStore createStore() throws Exception + { + MessageStore store = new BDBMessageStore(); + return store; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index e12c6fa271..849aa05099 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -591,7 +591,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr public List<AMQChannel> getChannels() { - return new ArrayList<AMQChannel>(_channelMap.values()); + synchronized (_channelMap) + { + return new ArrayList<AMQChannel>(_channelMap.values()); + } } public AMQChannel getAndAssertChannel(int channelId) throws AMQException @@ -651,6 +654,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr synchronized (_channelMap) { _channelMap.put(channel.getChannelId(), channel); + + if(_blocking) + { + channel.block(); + } } } @@ -659,11 +667,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr _cachedChannels[channelId] = channel; } - if(_blocking) - { - channel.block(); - } - checkForNotification(); } @@ -790,7 +793,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQPr */ private void closeAllChannels() throws AMQException { - for (AMQChannel channel : _channelMap.values()) + for (AMQChannel channel : getChannels()) { channel.close(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java new file mode 100644 index 0000000000..f1976ecee3 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.MethodRegistry; +import org.apache.qpid.framing.ProtocolVersion; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase implements EventListener, TransactionLogResource +{ + private static final Logger _logger = Logger.getLogger(MessageStoreQuotaEventsTestBase.class); + + protected static final byte[] MESSAGE_DATA = new byte[32 * 1024]; + + private MessageStore _store; + private File _storeLocation; + + private List<Event> _events; + private UUID _transactionResource; + + protected abstract MessageStore createStore() throws Exception; + + protected abstract void applyStoreSpecificConfiguration(XMLConfiguration config); + + protected abstract int getNumberOfMessagesToFillStore(); + + @Override + public void setUp() throws Exception + { + super.setUp(); + + _storeLocation = new File(new File(TMP_FOLDER), getTestName()); + FileUtils.delete(_storeLocation, true); + + XMLConfiguration config = new XMLConfiguration(); + config.addProperty("environment-path", _storeLocation.getAbsolutePath()); + applyStoreSpecificConfiguration(config); + + _store = createStore(); + _store.configureConfigStore("test", null, config); + + _transactionResource = UUID.randomUUID(); + _events = new ArrayList<Event>(); + _store.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + @Override + public void tearDown() throws Exception + { + super.tearDown(); + FileUtils.delete(_storeLocation, true); + } + + public void testOverflow() throws Exception + { + Transaction transaction = _store.newTransaction(); + + List<EnqueableMessage> messages = new ArrayList<EnqueableMessage>(); + for (int i = 0; i < getNumberOfMessagesToFillStore(); i++) + { + EnqueableMessage m = addMessage(i); + messages.add(m); + transaction.enqueueMessage(this, m); + } + transaction.commitTran(); + + assertEvent(1, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + + for (EnqueableMessage m : messages) + { + m.getStoredMessage().remove(); + } + + assertEvent(2, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + + protected EnqueableMessage addMessage(long id) + { + MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString(getName()), false, false, + new AMQShortString(getName())); + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue()); + props.setContentType(getTestName()); + + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9); + int classForBasic = methodRegistry.createBasicQosOkBody().getClazz(); + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, MESSAGE_DATA.length); + + MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1); + StoredMessage<MessageMetaData> handle = _store.addMessage(metaData); + handle.addContent(0, ByteBuffer.wrap(MESSAGE_DATA)); + TestMessage message = new TestMessage(id, handle); + return message; + } + + @Override + public void event(Event event) + { + _logger.debug("Test event listener received event " + event); + _events.add(event); + } + + private void assertEvent(int expectedNumberOfEvents, Event... expectedEvents) + { + assertEquals("Unexpected number of events received ", expectedNumberOfEvents, _events.size()); + for (Event event : expectedEvents) + { + assertTrue("Expected event is not found:" + event, _events.contains(event)); + } + } + + @Override + public UUID getId() + { + return _transactionResource; + } + + private static class TestMessage implements EnqueableMessage + { + private final StoredMessage<?> _handle; + private final long _messageId; + + public TestMessage(long messageId, StoredMessage<?> handle) + { + _messageId = messageId; + _handle = handle; + } + + public long getMessageNumber() + { + return _messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage<?> getStoredMessage() + { + return _handle; + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java new file mode 100644 index 0000000000..5d316fca43 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.derby; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; + +public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase +{ + private static final Logger _logger = Logger.getLogger(DerbyMessageStoreQuotaEventsTest.class); + + private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; + + /** + * Estimated using an assumption that a physical disk space occupied by a + * message is 3 times bigger then a message size + */ + private static final int OVERFULL_SIZE = (int) (MESSAGE_DATA.length * 3 * NUMBER_OF_MESSAGES_TO_OVERFILL_STORE * 0.8); + + private static final int UNDERFULL_SIZE = (int) (OVERFULL_SIZE * 0.8); + + @Override + protected int getNumberOfMessagesToFillStore() + { + return NUMBER_OF_MESSAGES_TO_OVERFILL_STORE; + } + + @Override + protected void applyStoreSpecificConfiguration(XMLConfiguration config) + { + _logger.debug("Applying store specific config. overfull-sze=" + OVERFULL_SIZE + ", underfull-size=" + UNDERFULL_SIZE); + + config.addProperty("overfull-size", OVERFULL_SIZE); + config.addProperty("underfull-size", UNDERFULL_SIZE); + } + + @Override + protected MessageStore createStore() throws Exception + { + return new DerbyMessageStore(); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c09b438424..92a8f88d28 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -3130,11 +3130,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _ticket = ticket; } - public boolean isBrokerFlowControlled() + public boolean isFlowBlocked() { synchronized (_flowControl) { - return _flowControl.getFlowControl(); + return !_flowControl.getFlowControl(); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 06ee651a1e..317c8902be 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1408,9 +1408,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } - public boolean isBrokerFlowControlled() + @Override + public boolean isFlowBlocked() { - return _qpidSession.isFlowControlled(); + return _qpidSession.isFlowBlocked(); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 06b606f2d3..9a9e131adc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -1195,4 +1195,12 @@ public class Session extends SessionInvoker } } } + + /** + * An auxiliary method for test purposes only + */ + public boolean isFlowBlocked() + { + return isFlowControlled() && credit.availablePermits() == 0; + } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java new file mode 100644 index 0000000000..2c029b4bf3 --- /dev/null +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.message.MessageContentSource; + +public class QuotaMessageStore extends NullMessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + private final AtomicBoolean _closed = new AtomicBoolean(false); + + private long _totalStoreSize;; + private boolean _limitBusted; + private long _persistentSizeLowThreshold; + private long _persistentSizeHighThreshold; + + private final StateManager _stateManager; + private final EventManager _eventManager = new EventManager(); + + public QuotaMessageStore() + { + _stateManager = new StateManager(_eventManager); + } + + @Override + public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) + throws Exception + { + _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE); + _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, + _persistentSizeHighThreshold); + if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l) + { + _persistentSizeLowThreshold = _persistentSizeHighThreshold; + } + _stateManager.attainState(State.INITIALISING); + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception + { + _stateManager.attainState(State.INITIALISED); + } + + @Override + public void activate() throws Exception + { + _stateManager.attainState(State.ACTIVATING); + _stateManager.attainState(State.ACTIVE); + } + + @SuppressWarnings("unchecked") + @Override + public StoredMessage<StorableMessageMetaData> addMessage(StorableMessageMetaData metaData) + { + final long id = _messageId.getAndIncrement(); + return new StoredMemoryMessage(id, metaData); + } + + @Override + public Transaction newTransaction() + { + return new Transaction() + { + private AtomicLong _storeSizeIncrease = new AtomicLong(); + + @Override + public StoreFuture commitTranAsync() throws AMQStoreException + { + QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + _storeSizeIncrease.addAndGet(((MessageContentSource)message).getSize()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException + { + _storeSizeIncrease.addAndGet(-((MessageContentSource)message).getSize()); + } + + @Override + public void commitTran() throws AMQStoreException + { + QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue()); + } + + @Override + public void abortTran() throws AMQStoreException + { + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + } + }; + } + + @Override + public boolean isPersistent() + { + return true; + } + + @Override + public void close() throws Exception + { + _stateManager.attainState(State.CLOSING); + _closed.getAndSet(true); + _stateManager.attainState(State.CLOSED); + } + + @Override + public void addEventListener(EventListener eventListener, Event... events) + { + _eventManager.addEventListener(eventListener, events); + } + + private void storedSizeChange(final int delta) + { + if(_persistentSizeHighThreshold > 0) + { + synchronized (this) + { + long newSize = _totalStoreSize += delta; + if(!_limitBusted && newSize > _persistentSizeHighThreshold) + { + _limitBusted = true; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + } + else if(_limitBusted && newSize < _persistentSizeHighThreshold) + { + _limitBusted = false; + _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + } + } + } +} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java index bcee4e4930..cfe530750b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java @@ -1,8 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.server.store; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.JMSException; @@ -11,6 +31,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; + import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -18,26 +39,13 @@ import org.apache.qpid.client.AMQSession; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.test.utils.QpidBrokerTestCase; -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ public class StoreOverfullTest extends QpidBrokerTestCase { - private static final int TIMEOUT = 10000; - public static final int TEST_SIZE = 150; + /** Number of messages to send*/ + public static final int TEST_SIZE = 15; + + /** Message payload*/ + private static final byte[] BYTE_32K = new byte[32*1024]; private Connection _producerConnection; private Connection _consumerConnection; @@ -47,23 +55,14 @@ public class StoreOverfullTest extends QpidBrokerTestCase private MessageConsumer _consumer; private Queue _queue; - //private final AtomicInteger sentMessages = new AtomicInteger(0); - - private static final int OVERFULL_SIZE = 4000000; - private static final int UNDERFULL_SIZE = 3500000; + private static final int OVERFULL_SIZE = 400000; + private static final int UNDERFULL_SIZE = 350000; public void setUp() throws Exception { - setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", - String.valueOf(OVERFULL_SIZE)); - setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", - String.valueOf(UNDERFULL_SIZE)); - - if(getTestProfileMessageStoreClassName().contains("BDB")) - { - setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).name", "je.log.fileMax"); - setConfigurationProperty("virtualhosts.virtualhost.test.store.envConfig(1).value", "1000000"); - } + setConfigurationProperty("virtualhosts.virtualhost.test.store.class", QuotaMessageStore.class.getName()); + setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", String.valueOf(OVERFULL_SIZE)); + setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", String.valueOf(UNDERFULL_SIZE)); super.setUp(); @@ -90,7 +89,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase } } - /* + /** * Test: * * Send > threshold amount of data : Sender is blocked @@ -105,9 +104,9 @@ public class StoreOverfullTest extends QpidBrokerTestCase _producer = _producerSession.createProducer(_queue); - sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); + MessageSender sender = sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } @@ -125,7 +124,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase break; } } - + long targetTime = System.currentTimeMillis() + 5000l; while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime) { @@ -143,12 +142,12 @@ public class StoreOverfullTest extends QpidBrokerTestCase } assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE); - + assertNull("Unexpected exception on message sending:" + sender.getException(), sender.getException()); } - /* Two producers on different queues + /** + * Two producers on different queues */ - public void testCapacityExceededCausesBlockTwoConnections() throws Exception { AtomicInteger sentMessages = new AtomicInteger(0); @@ -168,7 +167,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); - while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } @@ -176,19 +175,17 @@ public class StoreOverfullTest extends QpidBrokerTestCase assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount); - while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)secondProducerSession).isFlowBlocked()) { Thread.sleep(100l); } int sentCount2 = sentMessages2.get(); assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); - _consumer = _consumerSession.createConsumer(_queue); MessageConsumer consumer2 = _consumerSession.createConsumer(queue2); _consumerConnection.start(); - for(int i = 0; i < 2*TEST_SIZE; i++) { if(_consumer.receive(1000l) == null @@ -202,10 +199,9 @@ public class StoreOverfullTest extends QpidBrokerTestCase assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get()); } - /* + /** * New producers are blocked */ - public void testCapacityExceededCausesBlockNewConnection() throws Exception { AtomicInteger sentMessages = new AtomicInteger(0); @@ -223,7 +219,7 @@ public class StoreOverfullTest extends QpidBrokerTestCase sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages); - while(!((AMQSession)_producerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } @@ -232,18 +228,16 @@ public class StoreOverfullTest extends QpidBrokerTestCase sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2); - while(!((AMQSession)secondProducerSession).isBrokerFlowControlled()) + while(!((AMQSession<?,?>)_producerSession).isFlowBlocked()) { Thread.sleep(100l); } int sentCount2 = sentMessages2.get(); assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2); - _consumer = _consumerSession.createConsumer(_queue); _consumerConnection.start(); - for(int i = 0; i < 2*TEST_SIZE; i++) { if(_consumer.receive(2000l) == null) @@ -257,8 +251,6 @@ public class StoreOverfullTest extends QpidBrokerTestCase } - - private MessageSender sendMessagesAsync(final MessageProducer producer, final Session producerSession, final int numMessages, @@ -302,11 +294,11 @@ public class StoreOverfullTest extends QpidBrokerTestCase } } - public Exception awaitSenderException(long timeout) throws InterruptedException + public Exception getException() { - _exceptionThrownLatch.await(timeout, TimeUnit.MILLISECONDS); return _exception; } + } private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages) @@ -318,7 +310,6 @@ public class StoreOverfullTest extends QpidBrokerTestCase producer.send(nextMessage(msg, producerSession)); sentMessages.incrementAndGet(); - try { ((AMQSession<?,?>)producerSession).sync(); @@ -340,14 +331,12 @@ public class StoreOverfullTest extends QpidBrokerTestCase } } - private static final byte[] BYTE_32K = new byte[32*1024]; - private Message nextMessage(int msg, Session producerSession) throws JMSException { BytesMessage send = producerSession.createBytesMessage(); send.writeBytes(BYTE_32K); send.setIntProperty("msg", msg); - return send; } + } diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index 28d39278b9..3469e35d02 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -38,8 +38,7 @@ org.apache.qpid.server.store.MessageStoreTest#testQueuePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableQueueRemoval org.apache.qpid.server.store.MessageStoreTest#testExchangePersistence org.apache.qpid.server.store.MessageStoreTest#testDurableExchangeRemoval -org.apache.qpid.server.store.StoreOverfullTest#* org.apache.qpid.server.store.berkeleydb.* -org.apache.qpid.server.store.DurableConfigurationStoreTest#*
\ No newline at end of file +org.apache.qpid.server.store.DurableConfigurationStoreTest#* |