diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2011-08-29 11:00:31 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2011-08-29 11:00:31 +0000 |
commit | 065cf4329f0fc01bf756f015bcb605713a354c38 (patch) | |
tree | 24184fc1a21937f02bf45df1b8312094e74bdbe2 | |
parent | 4e2c545f9570051b5ba00b3a7b666a23892fa1b4 (diff) | |
download | qpid-python-065cf4329f0fc01bf756f015bcb605713a354c38.tar.gz |
NO-JIRA : Merged from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rg-amqp-1-0-sandbox@1162728 13f79535-47bb-0310-9956-ffa450edef68
24 files changed, 229 insertions, 291 deletions
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml index 5860bfe2cb..33a48a1349 100644 --- a/qpid/java/broker/etc/virtualhosts.xml +++ b/qpid/java/broker/etc/virtualhosts.xml @@ -31,7 +31,7 @@ <housekeeping> <threadCount>2</threadCount> - <expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod> + <checkPeriod>20000</checkPeriod> </housekeeping> <exchanges> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 02f8a346cf..dd2df45019 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -227,6 +227,14 @@ public class ServerConfiguration extends ConfigurationPlugin + (_configFile == null ? "" : " Configuration file : " + _configFile); throw new ConfigurationException(message); } + + // QPID-3266. Tidy up housekeeping configuration option for scheduling frequency + if (contains("housekeeping.expiredMessageCheckPeriod")) + { + String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod." + + (_configFile == null ? "" : " Configuration file : " + _configFile); + throw new ConfigurationException(message); + } } /* @@ -707,16 +715,14 @@ public class ServerConfiguration extends ConfigurationPlugin getConfig().setProperty("virtualhosts.default", vhost); } - public void setHousekeepingExpiredMessageCheckPeriod(long value) + public void setHousekeepingCheckPeriod(long value) { - getConfig().setProperty("housekeeping.expiredMessageCheckPeriod", value); + getConfig().setProperty("housekeeping.checkPeriod", value); } public long getHousekeepingCheckPeriod() { - return getLongValue("housekeeping.checkPeriod", - getLongValue("housekeeping.expiredMessageCheckPeriod", - DEFAULT_HOUSEKEEPING_PERIOD)); + return getLongValue("housekeeping.checkPeriod", DEFAULT_HOUSEKEEPING_PERIOD); } public long getStatisticsSamplePeriod() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index a710230616..6729a5ce0f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -86,9 +86,9 @@ public class VirtualHostConfiguration extends ConfigurationPlugin return _name; } - public long getHousekeepingExpiredMessageCheckPeriod() + public long getHousekeepingCheckPeriod() { - return getLongValue("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod()); + return getLongValue("housekeeping.checkPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod()); } public String getAuthenticationDatabase() @@ -313,6 +313,14 @@ public class VirtualHostConfiguration extends ConfigurationPlugin + " It appears in virtual host definition : " + _name; throw new ConfigurationException(message); } + + // QPID-3266. Tidy up housekeeping configuration option for scheduling frequency + if (contains("housekeeping.expiredMessageCheckPeriod")) + { + String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod." + + " It appears in virtual host definition : " + _name; + throw new ConfigurationException(message); + } } public int getHouseKeepingThreadCount() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index 43c4fa26b7..dab6c3b231 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -92,6 +92,7 @@ public class PluginManager implements Closeable private Activator _activator; + private final List<ServiceTracker> _trackers = new ArrayList<ServiceTracker>(); private Map<String, SecurityPluginFactory> _securityPlugins = new HashMap<String, SecurityPluginFactory>(); private Map<List<String>, ConfigurationPluginFactory> _configPlugins = new IdentityHashMap<List<String>, ConfigurationPluginFactory>(); private Map<String, VirtualHostPluginFactory> _vhostPlugins = new HashMap<String, VirtualHostPluginFactory>(); @@ -253,25 +254,29 @@ public class PluginManager implements Closeable _logger.info("Using the specified external BundleContext"); } - // TODO save trackers in a map, keyed by class name - _exchangeTracker = new ServiceTracker(bundleContext, ExchangeType.class.getName(), null); _exchangeTracker.open(); + _trackers.add(_exchangeTracker); _securityTracker = new ServiceTracker(bundleContext, SecurityPluginFactory.class.getName(), null); _securityTracker.open(); + _trackers.add(_securityTracker); _configTracker = new ServiceTracker(bundleContext, ConfigurationPluginFactory.class.getName(), null); _configTracker.open(); + _trackers.add(_configTracker); _virtualHostTracker = new ServiceTracker(bundleContext, VirtualHostPluginFactory.class.getName(), null); _virtualHostTracker.open(); + _trackers.add(_virtualHostTracker); _policyTracker = new ServiceTracker(bundleContext, SlowConsumerPolicyPluginFactory.class.getName(), null); _policyTracker.open(); - + _trackers.add(_policyTracker); + _authenticationManagerTracker = new ServiceTracker(bundleContext, AuthenticationManagerPluginFactory.class.getName(), null); _authenticationManagerTracker.open(); + _trackers.add(_authenticationManagerTracker); _logger.info("Opened service trackers"); } @@ -353,12 +358,10 @@ public class PluginManager implements Closeable try { // Close all bundle trackers - _exchangeTracker.close(); - _securityTracker.close(); - _configTracker.close(); - _virtualHostTracker.close(); - _policyTracker.close(); - _authenticationManagerTracker.close(); + for(ServiceTracker tracker : _trackers) + { + tracker.close(); + } } finally { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index f65cad23e7..b3acf48676 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.transport; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -36,6 +37,7 @@ import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticationResult; import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus; +import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; @@ -194,4 +196,23 @@ public class ServerConnectionDelegate extends ServerDelegate { return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(); } + + @Override public void sessionDetach(Connection conn, SessionDetach dtc) + { + // To ensure a clean detach, we unregister any remaining subscriptions. Unregister ensures + // that any in-progress delivery (SubFlushRunner/QueueRunner) is completed before the unregister + // completes. + unregisterAllSubscriptions(conn, dtc); + super.sessionDetach(conn, dtc); + } + + private void unregisterAllSubscriptions(Connection conn, SessionDetach dtc) + { + final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel()); + final Collection<Subscription_0_10> subs = ssn.getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subs) + { + ssn.unregister(subscription_0_10); + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index 3d92df4513..fde758203b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -242,19 +242,24 @@ public class VirtualHostImpl implements VirtualHost _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); - initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod()); + initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod()); initialiseStatistics(); } + /** + * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers + * and checking for idle or open transactions that have exceeded the permitted thresholds. + * + * @param period + */ private void initialiseHouseKeeping(long period) { - /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ if (period != 0L) { - class ExpiredMessagesTask extends HouseKeepingTask + class VirtualHostHouseKeepingTask extends HouseKeepingTask { - public ExpiredMessagesTask(VirtualHost vhost) + public VirtualHostHouseKeepingTask(VirtualHost vhost) { super(vhost); } @@ -299,7 +304,7 @@ public class VirtualHostImpl implements VirtualHost } } - scheduleHouseKeepingTask(period, new ExpiredMessagesTask(this)); + scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask(this)); Map<String, VirtualHostPluginFactory> plugins = ApplicationRegistry.getInstance().getPluginManager().getVirtualHostPlugins(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 37375c2b7b..7d54533632 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -559,18 +559,17 @@ public class ServerConfigurationTest extends QpidTestCase assertEquals(true, _serverConfig.getUseBiasedWrites()); } - public void testGetHousekeepingExpiredMessageCheckPeriod() throws ConfigurationException + public void testGetHousekeepingCheckPeriod() throws ConfigurationException { // Check default _serverConfig.initialise(); assertEquals(30000, _serverConfig.getHousekeepingCheckPeriod()); // Check value we set - _config.setProperty("housekeeping.expiredMessageCheckPeriod", 23L); + _config.setProperty("housekeeping.checkPeriod", 23L); _serverConfig = new ServerConfiguration(_config); _serverConfig.initialise(); - assertEquals(23, _serverConfig.getHousekeepingCheckPeriod()); - _serverConfig.setHousekeepingExpiredMessageCheckPeriod(42L); + _serverConfig.setHousekeepingCheckPeriod(42L); assertEquals(42, _serverConfig.getHousekeepingCheckPeriod()); } @@ -1362,4 +1361,29 @@ public class ServerConfigurationTest extends QpidTestCase ce.getMessage()); } } + + /* + * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was + * replaced by housekeeping.checkPeriod) is rejected. + */ + public void testExpiredMessageCheckPeriodRejected() throws ConfigurationException + { + _serverConfig.initialise(); + + // Check value we set + _config.setProperty("housekeeping.expiredMessageCheckPeriod", 23L); + _serverConfig = new ServerConfiguration(_config); + + try + { + _serverConfig.initialise(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod.", + ce.getMessage()); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java index 593119041d..b133d53ac5 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java @@ -229,5 +229,26 @@ public class VirtualHostConfigurationTest extends InternalBrokerBaseCase } } + /* + * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was + * replaced by housekeeping.checkPeriod) is rejected. + */ + public void testExpiredMessageCheckPeriodRejected() throws Exception + { + getConfigXml().addProperty("virtualhosts.virtualhost.testExpiredMessageCheckPeriodRejected.housekeeping.expiredMessageCheckPeriod", + 5); + try + { + super.createBroker(); + fail("Exception not thrown"); + } + catch (ConfigurationException ce) + { + assertEquals("Incorrect error message", + "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod." + + " It appears in virtual host definition : " + getName(), + ce.getMessage()); + } + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index a8bddcf6bf..4272c77798 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -289,7 +289,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase protected void configure() { // Increase Alert Check period - getConfiguration().setHousekeepingExpiredMessageCheckPeriod(200); + getConfiguration().setHousekeepingCheckPeriod(200); } private void sendMessages(AMQChannel channel, long messageCount, final long size) throws AMQException diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index eef6c047d3..82a6cdaa67 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslServer; @@ -406,7 +405,7 @@ public class Connection extends ConnectionInvoker else { throw new ProtocolViolationException( - "Received frames for an already dettached session", null); + "Received frames for an already detached session", null); } } @@ -455,7 +454,7 @@ public class Connection extends ConnectionInvoker } } - protected Session getSession(int channel) + public Session getSession(int channel) { synchronized (lock) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index e0c6cb29d3..4c996e261c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -964,16 +964,29 @@ public class Session extends SessionInvoker public void close() { + if (log.isDebugEnabled()) + { + log.debug("Closing [%s] in state [%s]", this, state); + } synchronized (commands) { - state = CLOSING; - setClose(true); - sessionRequestTimeout(0); - sessionDetach(name.getBytes()); - - awaitClose(); - - + switch(state) + { + case DETACHED: + state = CLOSED; + delegate.closed(this); + connection.removeSession(this); + listener.closed(this); + break; + case CLOSED: + break; + default: + state = CLOSING; + setClose(true); + sessionRequestTimeout(0); + sessionDetach(name.getBytes()); + awaitClose(); + } } } diff --git a/qpid/java/systests/build.xml b/qpid/java/systests/build.xml index 1da0a6d355..33ad2227bb 100644 --- a/qpid/java/systests/build.xml +++ b/qpid/java/systests/build.xml @@ -22,7 +22,7 @@ nn - or more contributor license agreements. See the NOTICE file <property name="module.depends" value="client management/tools/qpid-cli management/common broker broker/test common common/test junit-toolkit"/> <property name="module.test.src" location="src/main/java"/> <property name="module.test.excludes" - value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/> + value="**/DropInTest.java,**/TestClientControlledTest.java"/> <import file="../module.xml"/> diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java index 58b2edfee2..a2487b49bf 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java @@ -46,7 +46,7 @@ public class AlertingTest extends AbstractTestLogging { // Update the configuration to make our virtualhost Persistent. makeVirtualHostPersistent(VIRTUALHOST); - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "5000"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "5000"); _numMessages = 50; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java deleted file mode 100644 index d4bab657d7..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/testcases/TTLTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.test.testcases; - -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.ACK_MODE_PROPNAME; -import static org.apache.qpid.test.framework.MessagingTestConfigProperties.PUBSUB_PROPNAME; -import org.apache.qpid.test.framework.TestUtils; -import org.apache.qpid.test.framework.localcircuit.LocalCircuitImpl; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import javax.jms.*; - -import java.util.LinkedList; -import java.util.List; -import java.util.Random; - -/** - * TTLTest checks that time-to-live is applied to messages. The test sends messages with a variety of TTL stamps on them - * then after a pause attempts to receive those messages. Only messages with a large enough TTL to have survived the pause - * should be receiveable. This test case also applies an additional assertion against the broker, that the message store - * is empty at the end of the test. - * - * <p/>This test is designed to run over local circuits only, as it must control a timed pause between sending and receiving - * messages to that TTL can be applied to purge some of the messages. - * - * <p/><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> - * </table> - * - * @todo Use an interface or other method to mark this test as local only. - * - * @todo Implement the message store assertion for in-vm broker. Could also be done for external broker, for example - * by using diagnostic exchange. - * - * @todo Implement and add a queue depth assertion too. This might already be in another test to copy from. - * - * @todo Create variations on test theme, for different ack mode and tx and message sizes etc. - * - * @todo Add an allowable margin of error to the test, as ttl may not be precise. - */ -public class TTLTest extends FrameworkBaseCase -{ - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public TTLTest(String name) - { - super(name); - } - - /** - * Checks that all messages sent with a TTL shorter than a pause between sending them and attempting to receive them - * will fail to arrive. Once all messages have been purged by TTL or received, check that they no longer exist on - * the broker. - * - * @throws javax.jms.JMSException Allowed to fall through and fail test. - */ - public void testTTLP2P() throws Exception - { - String errorMessages = ""; - Random r = new Random(); - - // Used to accumulate correctly received messages in. - List<Message> receivedMessages = new LinkedList<Message>(); - - // Set up the test properties to match the test case requirements. - getTestProps().setProperty(ACK_MODE_PROPNAME, Session.AUTO_ACKNOWLEDGE); - getTestProps().setProperty(PUBSUB_PROPNAME, false); - - // Create the test circuit from the test configuration parameters. - CircuitFactory circuitFactory = getCircuitFactory(); - Circuit testCircuit = circuitFactory.createCircuit(getConnection(), getTestProps()); - - // This test case assumes it is using a local circuit. - LocalCircuitImpl localCircuit = (LocalCircuitImpl) testCircuit; - - Session producerSession = localCircuit.getLocalPublisherCircuitEnd().getSession(); - MessageProducer producer = localCircuit.getLocalPublisherCircuitEnd().getProducer(); - MessageConsumer consumer = localCircuit.getLocalReceiverCircuitEnd().getConsumer(); - - // Send some tests messages, with random TTLs, some shorter and some longer than the pause time. - for (int i = 0; i < 100; i++) - { - Message testMessage = TestUtils.createTestMessageOfSize(producerSession, 10); - - // Set the TTL on the message and record its value in the message headers. - long ttl = 500 + r.nextInt(1500); - producer.setTimeToLive(ttl); - testMessage.setLongProperty("testTTL", ttl); - - producer.send(testMessage); - // producerSession.commit(); - } - - // Inject a pause to allow some messages to be purged by TTL. - TestUtils.pause(1000); - - // Attempt to receive back all of the messages, confirming by the message time stamps and TTLs that only - // those received should have avoided being purged by the TTL. - boolean timedOut = false; - - - Message testMessage = null; - - do - { - testMessage = consumer.receive(1000); - - long ttl = testMessage.getLongProperty("testTTL"); - long timeStamp = testMessage.getJMSTimestamp(); - long now = System.currentTimeMillis(); - - if ((timeStamp + ttl) < now) - { - errorMessages += - "Received message [sent: " + timeStamp + ", ttl: " + ttl + ", received: " + now - + "] which should have been purged by its TTL.\n"; - } - /*else - { - receivedMessages.add(testMessage); - }*/ - } while (!timedOut && testMessage != null); - - // Check that the queue and message store on the broker are empty. - // assertTrue("Message store is not empty.", messageStoreEmpty.apply()); - // assertTrue("Queue is not empty.", queueEmpty.apply()); - - assertTrue(errorMessages, "".equals(errorMessages)); - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java index 6d1b6de238..7f166d07fe 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -98,6 +99,31 @@ public class ConnectionCloseTest extends QpidBrokerTestCase delta.size() < deltaThreshold); } + /** + * This test is added due to QPID-3453 to test connection closing when AMQ + * session is not closed but underlying transport session is in detached + * state and transport connection is closed + */ + public void testConnectionCloseOnOnForcibleBrokerStop() throws Exception + { + Connection connection = getConnection(); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + stopBroker(); + + // we need to close connection explicitly in order to verify that + // closing of connection having transport session in DETACHED state and + // transport connection in CLOSED state does not throw an exception + try + { + connection.close(); + } + catch (JMSException e) + { + // session closing should not fail + fail("Cannot close connection:" + e.getMessage()); + } + } + private void dumpStacks(Map<Thread,StackTraceElement[]> map) { for (Map.Entry<Thread,StackTraceElement[]> entry : map.entrySet()) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java index 0f799073b4..39691a5c7c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.test.unit.message; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQHeadersExchange; import org.apache.qpid.client.AMQQueue; @@ -50,21 +54,8 @@ import javax.jms.StreamMessage; */ public class StreamMessageTest extends QpidBrokerTestCase { - private static final Logger _logger = LoggerFactory.getLogger(StreamMessageTest.class); - public String _connectionString = "vm://:1"; - - protected void setUp() throws Exception - { - super.setUp(); - } - - protected void tearDown() throws Exception - { - super.tearDown(); - } - public void testStreamMessageEOF() throws Exception { Connection con = (AMQConnection) getConnection("guest", "guest"); @@ -114,6 +105,7 @@ public class StreamMessageTest extends QpidBrokerTestCase try { msg2.readByte(); + fail("Expected exception not thrown"); } catch (Exception e) { @@ -125,6 +117,9 @@ public class StreamMessageTest extends QpidBrokerTestCase public void testModifyReceivedMessageExpandsBuffer() throws Exception { + final CountDownLatch awaitMessages = new CountDownLatch(1); + final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<Throwable>(); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ")); @@ -134,28 +129,38 @@ public class StreamMessageTest extends QpidBrokerTestCase public void onMessage(Message message) { - StreamMessage sm = (StreamMessage) message; + final StreamMessage sm = (StreamMessage) message; try { sm.clearBody(); + // it is legal to extend a stream message's content sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd"); } - catch (JMSException e) + catch (Throwable t) + { + listenerCaughtException.set(t); + } + finally { - _logger.error("Error when writing large string to received msg: " + e, e); - fail("Error when writing large string to received msg" + e); + awaitMessages.countDown(); } } }); Connection con2 = (AMQConnection) getConnection("guest", "guest"); AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageProducer mandatoryProducer = producerSession.createProducer(queue); + MessageProducer producer = producerSession.createProducer(queue); con.start(); StreamMessage sm = producerSession.createStreamMessage(); sm.writeInt(42); - mandatoryProducer.send(sm); - Thread.sleep(2000); + producer.send(sm); + + // Allow up to five seconds for the message to arrive with the consumer + final boolean completed = awaitMessages.await(5, TimeUnit.SECONDS); + assertTrue("Message did not arrive with consumer within a reasonable time", completed); + final Throwable listenerException = listenerCaughtException.get(); + assertNull("No exception should be caught by listener : " + listenerException, listenerException); + con.close(); con2.close(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java index 36bac3b715..46e5d214f5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutConfigurationTest.java @@ -31,7 +31,7 @@ public class TransactionTimeoutConfigurationTest extends TransactionTimeoutTestC protected void configure() throws Exception { // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); // Set transaction timout properties. setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".transactionTimeout.openWarn", "200"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java index 71b89bf911..db508143f9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -29,7 +29,7 @@ public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase protected void configure() throws Exception { // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); } public void testProducerIdleCommit() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java index 786fc2adb0..ef2de5c592 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java @@ -72,7 +72,7 @@ public class TransactionTimeoutTestCase extends QpidBrokerTestCase implements Ex protected void configure() throws Exception { // Setup housekeeping every second - setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.expiredMessageCheckPeriod", "100"); + setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100"); /* * Set transaction timout properties. The XML in the virtualhosts configuration is as follows: diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 8c05b28161..2e32754943 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -24,7 +24,6 @@ org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* org.apache.qpid.client.ResetMessageListenerTest#* // This test is not finished -org.apache.qpid.test.testcases.TTLTest#* org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover // Those tests are testing 0.8 specific semantics diff --git a/qpid/java/test-profiles/Excludes b/qpid/java/test-profiles/Excludes index 7e096d3ac8..56a256f191 100644 --- a/qpid/java/test-profiles/Excludes +++ b/qpid/java/test-profiles/Excludes @@ -17,12 +17,6 @@ // under the License. // -// QPID-1715, QPID-1715 : Client Error Handling on close is still broken -org.apache.qpid.server.queue.QueueCreateTest#testCreatePriorityString -org.apache.qpid.server.queue.QueueCreateTest#testCreateFlowToDiskValidNoSize -org.apache.qpid.server.queue.QueueCreateTest#testCreateFlowToDiskInvalid -org.apache.qpid.server.queue.QueueCreateTest#testCreateFlowToDiskInvalidSize - // // QPID-2031 : Broker is not cleanly shutdown by QpidTestCase // @@ -32,7 +26,7 @@ org.apache.qpid.server.logging.BrokerLoggingTest#testBrokerShutdownStopped org.apache.qpid.server.logging.VirtualHostLoggingTest#testVirtualhostClosure org.apache.qpid.server.logging.MemoryMessageStoreLoggingTest#testMessageStoreClose -// QPID-XXX : Test fails to start external broker due to Derby Exception. +// QPID-3424 : Test fails to start external broker due to Derby Exception. org.apache.qpid.server.logging.DerbyMessageStoreLoggingTest#* // QPID-1816 : Client Ack has not been addressed diff --git a/qpid/java/test-profiles/Java010Excludes b/qpid/java/test-profiles/Java010Excludes index efe4b73ec5..e180566699 100755 --- a/qpid/java/test-profiles/Java010Excludes +++ b/qpid/java/test-profiles/Java010Excludes @@ -37,9 +37,6 @@ org.apache.qpid.test.client.FlowControlTest#* // 0-10 protocol doesn't support message bouncing org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#* -// QPID-1727 , QPID-1726 :c++ broker does not support flow to disk on transient queues. Also it requries a persistent store impl. for Apache -org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#* - // 0-10 and 0-9 connections dont generate the exact same logging due to protocol differences org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted @@ -49,9 +46,6 @@ org.apache.qpid.server.logging.SubscriptionLoggingTest#testSubscriptionSuspend org.apache.qpid.management.jmx.ManagementActorLoggingTest#testConnectionCloseViaManagement org.apache.qpid.management.jmx.MessageConnectionStatisticsTest#* -// 0-10 has different ideas about clientid and ownership of queues -org.apache.qpid.server.queue.ModelTest#* - // 0-10 is not supported by the MethodRegistry org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#* @@ -61,22 +55,16 @@ org.apache.qpid.server.queue.ProducerFlowControlTest#* //QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010. org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* -//QPID-2471 : Issues with 0-10 recovery -org.apache.qpid.test.unit.ack.RecoverTest#testRecoverInAutoAckListener -org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverInAutoAckListener - -// Temporarily disabling until properly investigated. +//QPID-3421: tests are failing on 0.10 test profile org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#* -//rollback with subscriptions does not work in 0-10 yet +//QPID-1864: rollback with subscriptions does not work in 0-10 yet org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage -org.apache.qpid.test.unit.ack.RecoverTest#testRecoverInAutoAckListener // This test uses 0-8 channel frames org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* -//Temporarily adding the following until the issues are sorted out. -//Should probably raise JIRAs for them. +//QPID-3422: test fails because ring queue is not implemented on java broker org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode // QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs. @@ -85,3 +73,12 @@ org.apache.qpid.server.failover.FailoverMethodTest#* // QPID-3392: the Java broker does not yet implement exchange creation arguments org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs + +// QPID-1935: the following tests are failing on 0.10 profiles +org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated +org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser + +// QPID-3432: These tests test the behaviour of 0-8..-0-9-1 specific system property (amqj.default_syncwrite_timeout) +org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* +org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* + diff --git a/qpid/java/test-profiles/JavaExcludes b/qpid/java/test-profiles/JavaExcludes index 2fc70e6e70..dcccce7e2f 100644 --- a/qpid/java/test-profiles/JavaExcludes +++ b/qpid/java/test-profiles/JavaExcludes @@ -17,31 +17,9 @@ // under the License. // -// Those tests are not finished -org.apache.qpid.test.testcases.TTLTest#* - // QPID-1823: this takes ages to run org.apache.qpid.client.SessionCreateTest#* -// related to QPID-2471. Temporarily disabling these tests until I figure out why they are failing with the Java broker. -org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgs -org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier -org.apache.qpid.test.unit.ack.RecoverTest#testAcknowledgePerConsumer -org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgs -org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgsAckOnEarlier -org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testAcknowledgePerConsumer - -// related to QPID-2471. These are new test cases and fail with the Java broker. -org.apache.qpid.test.unit.ack.RecoverTest#testOderingWithAsyncConsumer -org.apache.qpid.test.unit.ack.RecoverTest#testOderingWithSyncConsumer -org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWithAsyncConsumer -org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWithSyncConsumer - -org.apache.qpid.test.client.queue.LVQTest#* - -// Session resume is not supported in the Java client -org.apache.qpid.transport.ConnectionTest#testResumeNonemptyReplayBuffer - //QPID-2845: The queue policy types used by the C++ broker are not currently supported by the Java broker org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy @@ -50,43 +28,20 @@ org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy //Moved from JavaStandaloneExcludes when it was removed /////////////////////////////////////////////////////// +// QPID-3426: The following test is broken. // This is a long running test so should exclude from normal runs org.apache.qpid.test.client.failover.FailoverTest#test4MinuteFailover -// Those tests require failover support -org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserAutoAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserClientAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserDupsOkTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserNoAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserPreAckTest#testFailoverWithQueueBrowser -org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverAsQueueBrowserCreated -org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrowser -org.apache.qpid.test.testcases.FailoverTest#* -org.apache.qpid.test.client.failover.FailoverTest#* - -// InVM Broker tests awaiting resolution of QPID-1103 -org.apache.qpid.test.client.timeouts.SyncWaitDelayTest#* -org.apache.qpid.test.client.timeouts.SyncWaitTimeoutDelayTest#* - -// This test currently does not pick up the runtime location of the nonVm queueBacking store. -org.apache.qpid.test.unit.close.FlowToDiskBackingQueueDeleteTest#* - // This test may use QpidTestCase but it is not using the getConnection and is hardwired to InVM org.apache.qpid.test.unit.client.connection.CloseAfterConnectionFailureTest#* -//QPID-1818 : Client code path does not correctly restore a transacted session after failover. +//QPID-1818, QPID-1821 : Client code path does not correctly restore a transacted session after failover. org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#* -// This test requires the standard configuration file for validation. -// Excluding here does not reduce test coverage. -org.apache.qpid.server.configuration.ServerConfigurationFileTest#* - +//QPID-3428: verification of unique client id does not work org.apache.qpid.test.unit.client.connection.ConnectionTest#testClientIDVerification + +//XA functionality is not fully implemented yet org.apache.qpid.jms.xa.XAResourceTest#* //The Java broker doesnt support client auth diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index 5d0c82c5d7..07c3f15a8f 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -27,7 +27,7 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend // QPID-2478 test fails when run against broker using 0-8/9 org.apache.qpid.test.client.message.JMSDestinationTest#testGetDestinationWithCustomExchange -// The new addressing based sytanx is not supported for AMQP 0-8/0-9 versions +// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy @@ -37,6 +37,22 @@ org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties org.apache.qpid.test.unit.message.UTF8Test#* org.apache.qpid.client.MessageListenerTest#testSynchronousReceiveNoWait +// Tests 0.10 client feature org.apache.qpid.test.unit.client.connection.ConnectionTest#testUnsupportedSASLMechanism +// uses AMQP 0-10 related properties org.apache.qpid.test.unit.message.JMSPropertiesTest#testQpidExtensionProperties + +// QPID-3034: tests are passing on 0.10 profiles but failing on 0.9.1 profiles +org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgs +org.apache.qpid.test.unit.ack.RecoverTest#testRecoverResendsMsgsAckOnEarlier +org.apache.qpid.test.unit.ack.RecoverTest#testAcknowledgePerConsumer +org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgs +org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testRecoverResendsMsgsAckOnEarlier +org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testAcknowledgePerConsumer +org.apache.qpid.test.unit.ack.RecoverTest#testOderingWithSyncConsumer +org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#testOderingWithSyncConsumer + + +// LVQ tests use new address syntax and can not be run on 0.9.1 profiles +org.apache.qpid.test.client.queue.LVQTest#* |