summaryrefslogtreecommitdiff
path: root/java/systests/src/main/java/org/apache/qpid
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /java/systests/src/main/java/org/apache/qpid
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src/main/java/org/apache/qpid')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java185
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java73
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java164
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java278
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java345
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java480
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java101
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java162
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java109
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java196
-rw-r--r--java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java128
-rw-r--r--java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java67
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java102
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java316
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java449
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java3
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java109
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/stats/StatisticsReportingTest.java (renamed from java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java)94
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java180
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java7
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java37
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java343
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java40
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java74
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java19
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java33
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java1
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java51
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java101
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java340
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java44
38 files changed, 1897 insertions, 2746 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 526db29181..4b766864b4 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -19,7 +19,13 @@
package org.apache.qpid.client.failover;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.test.utils.FailoverBaseCase;
@@ -36,10 +42,14 @@ import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TransactionRolledBackException;
+import javax.naming.NamingException;
+
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -760,6 +770,181 @@ public class FailoverBehaviourTest extends FailoverBaseCase implements Connectio
//got started, before allowing the test to tear down
awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
}
+
+ /**
+ * This test only tests 0-8/0-9/0-9-1 failover timeout
+ */
+ public void testFailoverHandlerTimeoutExpires() throws Exception
+ {
+ _connection.close();
+ setTestSystemProperty("qpid.failover_method_timeout", "10000");
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ // holding failover mutex should prevent the failover from proceeding
+ synchronized(connection.getFailoverMutex())
+ {
+ killBroker();
+ startBroker();
+
+ // sleep interval exceeds failover timeout interval
+ Thread.sleep(11000l);
+ }
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ assertFalse("Unexpected failover", _failoverComplete.await(2000l, TimeUnit.MILLISECONDS));
+ assertTrue("Failover should not succeed due to timeout", connection.isClosed());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ public void testFailoverHandlerTimeoutReconnected() throws Exception
+ {
+ _connection.close();
+ setTestSystemProperty("qpid.failover_method_timeout", "10000");
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ // holding failover mutex should prevent the failover from proceeding
+ synchronized(connection.getFailoverMutex())
+ {
+ killBroker();
+ startBroker();
+ }
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(DEFAULT_FAILOVER_TIME);
+ assertFalse("Failover should restore connectivity", connection.isClosed());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ /**
+ * Tests that the producer flow control flag is reset when failover occurs while
+ * the producers are being blocked by the broker.
+ *
+ * Uses Java broker specific queue configuration to enabled PSFC.
+ */
+ public void testFlowControlFlagResetOnFailover() throws Exception
+ {
+ // we do not need the connection failing to second broker
+ _connection.close();
+
+ // make sure that failover timeout is bigger than flow control timeout
+ setTestSystemProperty("qpid.failover_method_timeout", "60000");
+ setTestSystemProperty("qpid.flow_control_wait_failure", "10000");
+
+ AMQConnection connection = null;
+ try
+ {
+ connection = createConnectionWithFailover();
+
+ final Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, getTestQueueName(), DEFAULT_MESSAGE_SIZE * 3, DEFAULT_MESSAGE_SIZE * 2);
+ final AtomicInteger counter = new AtomicInteger();
+ // try to send 5 messages (should block after 4)
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ MessageProducer producer = producerSession.createProducer(queue);
+ for (int i=0; i < 5; i++)
+ {
+ Message next = createNextMessage(producerSession, i);
+ producer.send(next);
+ producerSession.commit();
+ counter.incrementAndGet();
+ }
+ }
+ catch(Exception e)
+ {
+ // ignore
+ }
+ }
+ }).start();
+
+ long limit= 30000l;
+ long start = System.currentTimeMillis();
+
+ // wait until session is blocked
+ while(!((AMQSession<?,?>)producerSession).isFlowBlocked() && System.currentTimeMillis() - start < limit)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertTrue("Flow is not blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ // Message counter could be 3 or 4 depending on the progression of producing thread relative
+ // to the receipt of the ChannelFlow.
+ final int currentCounter = counter.get();
+ assertTrue("Unexpected number of sent messages", currentCounter == 3 || currentCounter == 4);
+
+ killBroker();
+ startBroker();
+
+ // allows the failover thread to proceed
+ Thread.yield();
+ awaitForFailoverCompletion(60000l);
+
+ assertFalse("Flow is blocked", ((AMQSession<?, ?>) producerSession).isFlowBlocked());
+ }
+ finally
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private Queue createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity", capacity);
+ arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+ ((AMQSession<?, ?>) session).createQueue(new AMQShortString(queueName), true, true, false, arguments);
+ Queue queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + true
+ + "'&autodelete='" + true + "'");
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) queue);
+ return queue;
+ }
+
+ private AMQConnection createConnectionWithFailover() throws NamingException, JMSException
+ {
+ AMQConnection connection;
+ AMQConnectionFactory connectionFactory = (AMQConnectionFactory)getConnectionFactory("default");
+ ConnectionURL connectionURL = connectionFactory.getConnectionURL();
+ connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER, "singlebroker");
+ connectionURL.setOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE, "2");
+ BrokerDetails details = connectionURL.getBrokerDetails(0);
+ details.setProperty(BrokerDetails.OPTIONS_RETRY, "200");
+ details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, "1000");
+
+ connection = (AMQConnection)connectionFactory.createConnection("admin", "admin");
+ connection.setConnectionListener(this);
+ return connection;
+ }
+
/**
* Tests {@link Session#close()} for session with given acknowledge mode
* to ensure that close works after failover.
diff --git a/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java b/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
index c4b1b08eea..787e727e66 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client.message;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.client.message;
* under the License.
*
*/
+package org.apache.qpid.client.message;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
diff --git a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
index 1cd088b736..39689f5096 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -46,6 +46,7 @@ public class SSLTest extends QpidBrokerTestCase
setTestClientSystemProperty("profile.use_ssl", "true");
setConfigurationProperty("connector.ssl.enabled", "true");
setConfigurationProperty("connector.ssl.sslOnly", "true");
+ setConfigurationProperty("connector.ssl.wantClientAuth", "true");
}
// set the ssl system properties
diff --git a/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java b/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
index ac29b72620..e18f70b01d 100644
--- a/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/jms/xa/XAResourceTest.java
@@ -24,21 +24,72 @@ import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.FileUtils;
+import org.apache.qpid.test.unit.xa.AbstractXATestCase;
+import org.apache.qpid.client.AMQXAResource;
+
+import org.apache.qpid.dtx.XidImpl;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
-public class XAResourceTest extends QpidBrokerTestCase
+public class XAResourceTest extends AbstractXATestCase
{
private static final String FACTORY_NAME = "default";
private static final String ALT_FACTORY_NAME = "connection2";
+ public void init() throws Exception
+ {
+ }
+
+ public void testIsSameRMJoin() throws Exception
+ {
+ XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME);
+ XAConnection conn1 = factory.createXAConnection("guest", "guest");
+ XAConnection conn2 = factory.createXAConnection("guest", "guest");
+ XAConnection conn3 = factory.createXAConnection("guest", "guest");
+
+ XASession session1 = conn1.createXASession();
+ XASession session2 = conn2.createXASession();
+ XASession session3 = conn3.createXASession();
+
+ AMQXAResource xaResource1 = (AMQXAResource)session1.getXAResource();
+ AMQXAResource xaResource2 = (AMQXAResource)session2.getXAResource();
+ AMQXAResource xaResource3 = (AMQXAResource)session3.getXAResource();
+
+ Xid xid = getNewXid();
+
+ xaResource1.start(xid, XAResource.TMNOFLAGS);
+ assertTrue("XAResource isSameRM", xaResource1.isSameRM(xaResource2));
+ xaResource2.start(xid, XAResource.TMJOIN);
+ assertTrue("AMQXAResource siblings should be 1", xaResource1.getSiblings().size() == 1);
+
+ assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource2.getSiblings().size() == 0);
+
+ assertTrue("XAResource isSameRM", xaResource2.isSameRM(xaResource3));
+
+
+ xaResource3.start(xid, XAResource.TMJOIN);
+ assertTrue("AMQXAResource siblings should be 1", xaResource2.getSiblings().size() == 1);
+
+ xaResource1.end(xid, XAResource.TMSUCCESS);
+ assertTrue("AMQXAResource TMJOIN resource siblings should be 0", xaResource1.getSiblings().size() == 0);
+
+ xaResource1.prepare(xid);
+ xaResource1.commit(xid, false);
+
+ conn3.close();
+ conn2.close();
+ conn1.close();
+ }
+
/*
* Test with multiple XAResources originating from the same connection factory. XAResource(s) will be equal,
- * as they originate from the same session.
+ * as they originate from the same session.
*/
public void testIsSameRMSingleCF() throws Exception
{
@@ -47,14 +98,14 @@ public class XAResourceTest extends QpidBrokerTestCase
XASession session = conn.createXASession();
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session.getXAResource();
-
+
assertEquals("XAResource objects not equal", xaResource1, xaResource2);
assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2));
-
+
session.close();
conn.close();
}
-
+
/*
* Test with multiple XAResources originating from different connection factory's and different sessions. XAResources will not be
* equal as they do not originate from the same session. As the UUID from the broker will be the same, isSameRM will be true.
@@ -67,11 +118,11 @@ public class XAResourceTest extends QpidBrokerTestCase
XAConnectionFactory factory = new AMQConnectionFactory(url);
XAConnectionFactory factory2 = new AMQConnectionFactory(url);
XAConnectionFactory factory3 = getConnectionFactory(ALT_FACTORY_NAME);
-
+
XAConnection conn = factory.createXAConnection("guest","guest");
XAConnection conn2 = factory2.createXAConnection("guest","guest");
XAConnection conn3 = factory3.createXAConnection("guest","guest");
-
+
XASession session = conn.createXASession();
XASession session2 = conn2.createXASession();
XASession session3 = conn3.createXASession();
@@ -79,14 +130,14 @@ public class XAResourceTest extends QpidBrokerTestCase
XAResource xaResource1 = session.getXAResource();
XAResource xaResource2 = session2.getXAResource();
XAResource xaResource3 = session3.getXAResource();
-
+
assertFalse("XAResource objects should not be equal", xaResource1.equals(xaResource2));
assertTrue("isSameRM not true for identical objects", xaResource1.isSameRM(xaResource2));
assertFalse("isSameRM true for XA Resources created by two different brokers", xaResource1.isSameRM(xaResource3));
-
+
conn.close();
conn2.close();
- conn3.close();
+ conn3.close();
}
@Override
@@ -103,5 +154,5 @@ public class XAResourceTest extends QpidBrokerTestCase
FileUtils.deleteDirectory(System.getProperty("QPID_WORK") + "/" + getFailingPort());
}
}
-
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java
deleted file mode 100644
index ba0f955d76..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedBrokerMBeanTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedExchange;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.management.MBeanException;
-import javax.management.ObjectName;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Tests the JMX API for the Managed Broker.
- *
- */
-public class ManagedBrokerMBeanTest extends QpidBrokerTestCase
-{
- /**
- * Test virtual host
- */
- private static final String VIRTUAL_HOST = "test";
-
- /**
- * Test exchange type
- */
- private static final String EXCHANGE_TYPE = "topic";
-
- /**
- * JMX helper.
- */
- private JMXTestUtils _jmxUtils;
- private ManagedBroker _managedBroker;
-
- public void setUp() throws Exception
- {
- _jmxUtils = new JMXTestUtils(this);
- _jmxUtils.setUp();
- super.setUp();
- _jmxUtils.open();
- _managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
- }
-
- public void tearDown() throws Exception
- {
- if (_jmxUtils != null)
- {
- _jmxUtils.close();
- }
- super.tearDown();
- }
-
- /**
- * Tests queue creation/deletion also verifying the automatic binding to the default exchange.
- */
- public void testCreateQueueAndDeletion() throws Exception
- {
- final String queueName = getTestQueueName();
- final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString());
-
- // Check that bind does not exist before queue creation
- assertFalse("Binding to " + queueName + " should not exist in default exchange before queue creation",
- defaultExchange.bindings().containsKey(new String[] {queueName}));
-
- _managedBroker.createNewQueue(queueName, "testowner", true);
-
- // Ensure the queue exists
- assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName));
- assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));
-
- // Now verify that the default exchange has been bound.
- assertTrue("Binding to " + queueName + " should exist in default exchange after queue creation",
- defaultExchange.bindings().containsKey(new String[] {queueName}));
-
- // Now delete the queue
- _managedBroker.deleteQueue(queueName);
-
- // Finally ensure that the binding has been removed.
- assertFalse("Binding to " + queueName + " should not exist in default exchange after queue deletion",
- defaultExchange.bindings().containsKey(new String[] {queueName}));
- }
-
- /**
- * Tests exchange creation/deletion via JMX API.
- */
- public void testCreateExchangeAndUnregister() throws Exception
- {
- String exchangeName = getTestName();
- _managedBroker.createNewExchange(exchangeName, "topic", true);
- String queryString = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="
- + ObjectName.quote(VIRTUAL_HOST) + ",name=" + ObjectName.quote(exchangeName) + ",ExchangeType="
- + EXCHANGE_TYPE;
- ManagedExchange exchange = _jmxUtils.getManagedObject(ManagedExchange.class, queryString);
- assertNotNull("Exchange should exist", exchange);
-
- _managedBroker.unregisterExchange(exchangeName);
- assertFalse("Exchange should have been removed", _jmxUtils.isManagedObjectExist(queryString));
- }
-
- /**
- * Tests that it is disallowed to unregister the default exchange.
- */
- public void testUnregisterOfDefaultExchangeDisallowed() throws Exception
- {
- String defaultExchangeName = ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString();
-
- try
- {
- _managedBroker.unregisterExchange(defaultExchangeName);
- fail("Exception not thrown");
- }
- catch (MBeanException mbe)
- {
- // PASS
- assertEquals("Error in unregistering exchange " + defaultExchangeName, mbe.getMessage());
- assertTrue(mbe.getCause().getMessage().contains("Cannot unregister the default exchange"));
- }
- final ManagedExchange defaultExchange = _jmxUtils.getManagedExchange(defaultExchangeName);
- assertNotNull("Exchange should exist", defaultExchange);
- }
-
- /**
- * Tests queue creation with {@link AMQQueueFactory#X_QPID_MAXIMUM_DELIVERY_COUNT} argument. Also tests
- * that the attribute is exposed correctly through {@link ManagedQueue#getMaximumDeliveryCount()}.
- */
- public void testCreateQueueWithMaximumDeliveryCountSet() throws Exception
- {
- final String queueName = getName();
- final ManagedBroker managedBroker = _jmxUtils.getManagedBroker(VIRTUAL_HOST);
-
- final Integer deliveryCount = 1;
- final Map<String, Object> args = Collections.singletonMap(AMQQueueFactory.X_QPID_MAXIMUM_DELIVERY_COUNT, (Object)deliveryCount);
- managedBroker.createNewQueue(queueName, "testowner", true, args);
-
- // Ensure the queue exists
- assertNotNull("Queue object name expected to exist", _jmxUtils.getQueueObjectName("test", queueName));
- assertNotNull("Manager queue expected to be available", _jmxUtils.getManagedQueue(queueName));
-
- final ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
- assertEquals("Unexpected maximum delivery count", deliveryCount, managedQueue.getMaximumDeliveryCount());
- }
-
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
deleted file mode 100644
index 3fc370dc68..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedConnectionMBeanTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.management.jmx;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.JMException;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.TabularData;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-public class ManagedConnectionMBeanTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(ManagedConnectionMBeanTest.class);
-
- /**
- * JMX helper.
- */
- private JMXTestUtils _jmxUtils;
- private Connection _connection;
-
- public void setUp() throws Exception
- {
- _jmxUtils = new JMXTestUtils(this);
- _jmxUtils.setUp();
- super.setUp();
- _jmxUtils.open();
- _connection = getConnection();
- }
-
- public void tearDown() throws Exception
- {
- if (_jmxUtils != null)
- {
- _jmxUtils.close();
- }
- super.tearDown();
- }
-
- public void testChannels() throws Exception
- {
- final String queueName = getTestQueueName();
-
- final Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- final Destination destination = session.createQueue(queueName);
- final MessageConsumer consumer = session.createConsumer(destination);
-
- final int numberOfMessages = 2;
- sendMessage(session, destination, numberOfMessages);
- _connection.start();
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- final Message m = consumer.receive(1000l);
- assertNotNull("Message " + i + " is not received", m);
- }
-
- List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 1, connections.size());
- final ManagedConnection mBean = connections.get(0);
- assertNotNull("Connection MBean is null", mBean);
-
- TabularData channelsData = mBean.channels();
- assertNotNull("Channels data are null", channelsData);
- assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
-
- final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
- final CompositeDataSupport row = rowItr.next();
- Number unackCount = (Number) row.get(ManagedConnection.UNACKED_COUNT);
- final Boolean transactional = (Boolean) row.get(ManagedConnection.TRANSACTIONAL);
- final Boolean flowBlocked = (Boolean) row.get(ManagedConnection.FLOW_BLOCKED);
- assertNotNull("Channel should have unacknowledged messages", unackCount);
- assertEquals("Unexpected number of unacknowledged messages", 2, unackCount.intValue());
- assertNotNull("Channel should have transaction flag", transactional);
- assertTrue("Unexpected transaction flag", transactional);
- assertNotNull("Channel should have flow blocked flag", flowBlocked);
- assertFalse("Unexpected value of flow blocked flag", flowBlocked);
-
- final Date initialLastIOTime = mBean.getLastIoTime();
- session.commit();
- assertTrue("Last IO time should have been updated", mBean.getLastIoTime().after(initialLastIOTime));
-
- channelsData = mBean.channels();
- assertNotNull("Channels data are null", channelsData);
- assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
-
- final Iterator<CompositeDataSupport> rowItr2 = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
- final CompositeDataSupport row2 = rowItr2.next();
- unackCount = (Number) row2.get(ManagedConnection.UNACKED_COUNT);
- assertNotNull("Channel should have unacknowledged messages", unackCount);
- assertEquals("Unexpected number of anacknowledged messages", 0, unackCount.intValue());
-
- _connection.close();
-
- LOGGER.debug("Querying JMX for number of open connections");
- connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans after connection closed", 0, connections.size());
- }
-
- public void testCommit() throws Exception
- {
- final String queueName = getTestQueueName();
-
- final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
- final Destination destination = producerSession.createQueue(queueName);
- final MessageConsumer consumer = consumerSession.createConsumer(destination);
- final MessageProducer producer = producerSession.createProducer(destination);
-
- _connection.start();
-
- List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 1, connections.size());
- final ManagedConnection mBean = connections.get(0);
- assertNotNull("Connection MBean is null", mBean);
-
- final int numberOfMessages = 2;
- for (int i = 0; i < numberOfMessages; i++)
- {
- producer.send(producerSession.createTextMessage("Test " + i));
- }
-
- // sync to make sure that messages are received on the broker
- // before we commit via JMX
- ((AMQSession<?, ?>) producerSession).sync();
-
- Message m = consumer.receive(500l);
- assertNull("Unexpected message received", m);
-
- Number channelId = getFirstTransactedChannelId(mBean, 2);
- mBean.commitTransactions(channelId.intValue());
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- m = consumer.receive(1000l);
- assertNotNull("Message " + i + " is not received", m);
- assertEquals("Unexpected message received at " + i, "Test " + i, ((TextMessage) m).getText());
- }
- producerSession.commit();
- m = consumer.receive(500l);
- assertNull("Unexpected message received", m);
- }
-
- protected Number getFirstTransactedChannelId(final ManagedConnection mBean, int channelNumber) throws IOException, JMException
- {
- TabularData channelsData = mBean.channels();
- assertNotNull("Channels data are null", channelsData);
- assertEquals("Unexpected number of rows in channel table", channelNumber, channelsData.size());
- final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
- while (rowItr.hasNext())
- {
- final CompositeDataSupport row = rowItr.next();
- Boolean transacted = (Boolean) row.get(ManagedConnection.TRANSACTIONAL);
- if (transacted.booleanValue())
- {
- return (Number) row.get(ManagedConnection.CHAN_ID);
- }
- }
- return null;
- }
-
- public void testRollback() throws Exception
- {
- final String queueName = getTestQueueName();
-
- final Session consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
- final Destination destination = producerSession.createQueue(queueName);
- final MessageConsumer consumer = consumerSession.createConsumer(destination);
- final MessageProducer producer = producerSession.createProducer(destination);
-
- List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 1, connections.size());
- final ManagedConnection mBean = connections.get(0);
- assertNotNull("Connection MBean is null", mBean);
-
- final int numberOfMessages = 2;
- for (int i = 0; i < numberOfMessages; i++)
- {
- producer.send(producerSession.createTextMessage("Test " + i));
- }
-
- // sync to make sure that messages are received on the broker
- // before we rollback via JMX
- ((AMQSession<?, ?>) producerSession).sync();
-
- Number channelId = getFirstTransactedChannelId(mBean, 2);
- mBean.rollbackTransactions(channelId.intValue());
-
- Message m = consumer.receive(1000l);
- assertNull("Unexpected message received: " + String.valueOf(m), m);
-
- producerSession.commit();
-
- _connection.start();
- m = consumer.receive(1000l);
- assertNull("Unexpected message received after commit " + String.valueOf(m), m);
- }
-
- public void testAuthorisedId() throws Exception
- {
- List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 1, connections.size());
- final ManagedConnection mBean = connections.get(0);
- assertNotNull("Connection MBean is null", mBean);
- assertEquals("Unexpected authorized id", "guest", mBean.getAuthorizedId());
- }
-
- public void testClientVersion() throws Exception
- {
- List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 1, connections.size());
- final ManagedConnection mBean = connections.get(0);
- assertNotNull("Connection MBean is null", mBean);
-
- String expectedVersion = QpidProperties.getReleaseVersion();
- assertNotNull("version should not be null", expectedVersion);
- assertFalse("version should not be the empty string", expectedVersion.equals(""));
- assertFalse("version should not be the string 'null'", expectedVersion.equals("null"));
-
- assertEquals("Unexpected version", expectedVersion, mBean.getVersion());
- }
-
- public void testClientId() throws Exception
- {
- List<ManagedConnection> connections = _jmxUtils.getManagedConnections("test");
- assertNotNull("Connection MBean is not found", connections);
- assertEquals("Unexpected number of connection mbeans", 1, connections.size());
- final ManagedConnection mBean = connections.get(0);
- assertNotNull("Connection MBean is null", mBean);
-
- String expectedClientId = _connection.getClientID();
- assertNotNull("ClientId should not be null", expectedClientId);
- assertFalse("ClientId should not be the empty string", expectedClientId.equals(""));
- assertFalse("ClientId should not be the string 'null'", expectedClientId.equals("null"));
-
- assertEquals("Unexpected ClientId", expectedClientId, mBean.getClientId());
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
deleted file mode 100644
index 244e547e02..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagedQueueMBeanTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.commons.lang.time.FastDateFormat;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.management.common.mbeans.ManagedQueue;
-import org.apache.qpid.server.queue.AMQQueueMBean;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Tests the JMX API for the Managed Queue.
- *
- */
-public class ManagedQueueMBeanTest extends QpidBrokerTestCase
-{
- protected static final Logger LOGGER = Logger.getLogger(ManagedQueueMBeanTest.class);
-
- private JMXTestUtils _jmxUtils;
- private Connection _connection;
- private Session _session;
-
- private String _sourceQueueName;
- private String _destinationQueueName;
- private Destination _sourceQueue;
- private Destination _destinationQueue;
- private ManagedQueue _managedSourceQueue;
- private ManagedQueue _managedDestinationQueue;
-
- public void setUp() throws Exception
- {
- _jmxUtils = new JMXTestUtils(this);
- _jmxUtils.setUp();
-
- super.setUp();
- _sourceQueueName = getTestQueueName() + "_src";
- _destinationQueueName = getTestQueueName() + "_dest";
-
- _connection = getConnection();
- _connection.start();
-
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- _sourceQueue = _session.createQueue(_sourceQueueName);
- _destinationQueue = _session.createQueue(_destinationQueueName);
- createQueueOnBroker(_sourceQueue);
- createQueueOnBroker(_destinationQueue);
-
- _jmxUtils.open();
-
- _managedSourceQueue = _jmxUtils.getManagedQueue(_sourceQueueName);
- _managedDestinationQueue = _jmxUtils.getManagedQueue(_destinationQueueName);
- }
-
- public void tearDown() throws Exception
- {
- if (_jmxUtils != null)
- {
- _jmxUtils.close();
- }
- super.tearDown();
- }
-
- /**
- * Tests {@link ManagedQueue#viewMessages(long, long)} interface.
- */
- public void testViewSingleMessage() throws Exception
- {
- final List<Message> sentMessages = sendMessage(_session, _sourceQueue, 1);
- syncSession(_session);
- final Message sentMessage = sentMessages.get(0);
-
- assertEquals("Unexpected queue depth", 1, _managedSourceQueue.getMessageCount().intValue());
-
- // Check the contents of the message
- final TabularData tab = _managedSourceQueue.viewMessages(1l, 1l);
- assertEquals("Unexpected number of rows in table", 1, tab.size());
- final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator();
-
- final CompositeData row1 = rowItr.next();
- assertNotNull("Message should have AMQ message id", row1.get(ManagedQueue.MSG_AMQ_ID));
- assertEquals("Unexpected queue position", 1l, row1.get(ManagedQueue.MSG_QUEUE_POS));
- assertEquals("Unexpected redelivered flag", Boolean.FALSE, row1.get(ManagedQueue.MSG_REDELIVERED));
-
- // Check the contents of header (encoded in a string array)
- final String[] headerArray = (String[]) row1.get(ManagedQueue.MSG_HEADER);
- assertNotNull("Expected message header array", headerArray);
- final Map<String, String> headers = headerArrayToMap(headerArray);
-
- final String expectedJMSMessageID = isBroker010() ? sentMessage.getJMSMessageID().replace("ID:", "") : sentMessage.getJMSMessageID();
- final String expectedFormattedJMSTimestamp = FastDateFormat.getInstance(AMQQueueMBean.JMSTIMESTAMP_DATETIME_FORMAT).format(sentMessage.getJMSTimestamp());
- assertEquals("Unexpected JMSMessageID within header", expectedJMSMessageID, headers.get("JMSMessageID"));
- assertEquals("Unexpected JMSPriority within header", String.valueOf(sentMessage.getJMSPriority()), headers.get("JMSPriority"));
- assertEquals("Unexpected JMSTimestamp within header", expectedFormattedJMSTimestamp, headers.get("JMSTimestamp"));
- }
-
- /**
- * Tests {@link ManagedQueue#moveMessages(long, long, String)} interface.
- */
- public void testMoveMessagesBetweenQueues() throws Exception
- {
- final int numberOfMessagesToSend = 10;
-
- sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
- syncSession(_session);
- assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
-
- List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
-
- // Move first three messages to destination
- long fromMessageId = amqMessagesIds.get(0);
- long toMessageId = amqMessagesIds.get(2);
- _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);
-
- assertEquals("Unexpected queue depth on destination queue after first move", 3, _managedDestinationQueue.getMessageCount().intValue());
- assertEquals("Unexpected queue depth on source queue after first move", 7, _managedSourceQueue.getMessageCount().intValue());
-
- // Now move a further two messages to destination
- fromMessageId = amqMessagesIds.get(7);
- toMessageId = amqMessagesIds.get(8);
- _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);
- assertEquals("Unexpected queue depth on destination queue after second move", 5, _managedDestinationQueue.getMessageCount().intValue());
- assertEquals("Unexpected queue depth on source queue after second move", 5, _managedSourceQueue.getMessageCount().intValue());
-
- assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
- }
-
- /**
- * Tests {@link ManagedQueue#copyMessages(long, long, String)} interface.
- */
- public void testCopyMessagesBetweenQueues() throws Exception
- {
- final int numberOfMessagesToSend = 10;
- sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
- syncSession(_session);
- assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
-
- List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
-
- // Copy first three messages to destination
- long fromMessageId = amqMessagesIds.get(0);
- long toMessageId = amqMessagesIds.get(2);
- _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
-
- assertEquals("Unexpected queue depth on destination queue after first copy", 3, _managedDestinationQueue.getMessageCount().intValue());
- assertEquals("Unexpected queue depth on source queue after first copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
-
- // Now copy a further two messages to destination
- fromMessageId = amqMessagesIds.get(7);
- toMessageId = amqMessagesIds.get(8);
- _managedSourceQueue.copyMessages(fromMessageId, toMessageId, _destinationQueueName);
- assertEquals("Unexpected queue depth on destination queue after second copy", 5, _managedDestinationQueue.getMessageCount().intValue());
- assertEquals("Unexpected queue depth on source queue after second copy", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
-
- assertMessageIndicesOn(_destinationQueue, 0, 1, 2, 7, 8);
- }
-
- public void testMoveMessagesBetweenQueuesWithActiveConsumerOnSourceQueue() throws Exception
- {
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
- Connection asyncConnection = getConnection();
- asyncConnection.start();
-
- final int numberOfMessagesToSend = 50;
- sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
- syncSession(_session);
- assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
-
- List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
-
- long fromMessageId = amqMessagesIds.get(0);
- long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1);
-
- CountDownLatch consumerReadToHalfwayLatch = new CountDownLatch(numberOfMessagesToSend / 2);
- AtomicInteger totalConsumed = new AtomicInteger(0);
- startAsyncConsumerOn(_sourceQueue, asyncConnection, consumerReadToHalfwayLatch, totalConsumed);
-
- boolean halfwayPointReached = consumerReadToHalfwayLatch.await(5000, TimeUnit.MILLISECONDS);
- assertTrue("Did not read half of messages within time allowed", halfwayPointReached);
-
- _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);
-
- asyncConnection.stop();
-
- // The exact number of messages moved will be non deterministic, as the number of messages processed
- // by the consumer cannot be predicited. There is also the possibility that a message can remain
- // on the source queue. This situation will arise if a message has been acquired by the consumer, but not
- // yet delivered to the client application (i.e. MessageListener#onMessage()) when the Connection#stop() occurs.
- //
- // The number of messages moved + the number consumed + any messages remaining on source should
- // *always* be equal to the number we originally sent.
-
- int numberOfMessagesReadByConsumer = totalConsumed.intValue();
- int numberOfMessagesOnDestinationQueue = _managedDestinationQueue.getMessageCount().intValue();
- int numberOfMessagesRemainingOnSourceQueue = _managedSourceQueue.getMessageCount().intValue();
-
- LOGGER.debug("Async consumer read : " + numberOfMessagesReadByConsumer
- + " Number of messages moved to destination : " + numberOfMessagesOnDestinationQueue
- + " Number of messages remaining on source : " + numberOfMessagesRemainingOnSourceQueue);
- assertEquals("Unexpected number of messages after move", numberOfMessagesToSend, numberOfMessagesReadByConsumer + numberOfMessagesOnDestinationQueue + numberOfMessagesRemainingOnSourceQueue);
- }
-
- public void testMoveMessagesBetweenQueuesWithActiveConsumerOnDestinationQueue() throws Exception
- {
- setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
- Connection asyncConnection = getConnection();
- asyncConnection.start();
-
- final int numberOfMessagesToSend = 50;
- sendMessage(_session, _sourceQueue, numberOfMessagesToSend);
- syncSession(_session);
- assertEquals("Unexpected queue depth after send", numberOfMessagesToSend, _managedSourceQueue.getMessageCount().intValue());
-
- List<Long> amqMessagesIds = getAMQMessageIdsOn(_managedSourceQueue, 1, numberOfMessagesToSend);
- long fromMessageId = amqMessagesIds.get(0);
- long toMessageId = amqMessagesIds.get(numberOfMessagesToSend - 1);
-
- AtomicInteger totalConsumed = new AtomicInteger(0);
- CountDownLatch allMessagesConsumedLatch = new CountDownLatch(numberOfMessagesToSend);
- startAsyncConsumerOn(_destinationQueue, asyncConnection, allMessagesConsumedLatch, totalConsumed);
-
- _managedSourceQueue.moveMessages(fromMessageId, toMessageId, _destinationQueueName);
-
- allMessagesConsumedLatch.await(5000, TimeUnit.MILLISECONDS);
- assertEquals("Did not consume all messages from destination queue", numberOfMessagesToSend, totalConsumed.intValue());
- }
-
- private void startAsyncConsumerOn(Destination queue, Connection asyncConnection,
- final CountDownLatch requiredNumberOfMessagesRead, final AtomicInteger totalConsumed) throws Exception
- {
- Session session = asyncConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
- consumer.setMessageListener(new MessageListener()
- {
-
- @Override
- public void onMessage(Message arg0)
- {
- totalConsumed.incrementAndGet();
- requiredNumberOfMessagesRead.countDown();
- }
- });
- }
-
- private void assertMessageIndicesOn(Destination queue, int... expectedIndices) throws Exception
- {
- MessageConsumer consumer = _session.createConsumer(queue);
-
- for (int i : expectedIndices)
- {
- Message message = consumer.receive(1000);
- assertNotNull("Expected message with index " + i, message);
- assertEquals("Expected message with index " + i, i, message.getIntProperty(INDEX));
- }
-
- assertNull("Unexpected message encountered", consumer.receive(1000));
- }
-
- private List<Long> getAMQMessageIdsOn(ManagedQueue managedQueue, long startIndex, long endIndex) throws Exception
- {
- final SortedSet<Long> messageIds = new TreeSet<Long>();
-
- final TabularData tab = managedQueue.viewMessages(startIndex, endIndex);
- final Iterator<CompositeData> rowItr = (Iterator<CompositeData>) tab.values().iterator();
- while(rowItr.hasNext())
- {
- final CompositeData row = rowItr.next();
- long amqMessageId = (Long)row.get(ManagedQueue.MSG_AMQ_ID);
- messageIds.add(amqMessageId);
- }
-
- return new ArrayList<Long>(messageIds);
- }
-
- /**
- *
- * Utility method to convert array of Strings in the form x = y into a
- * map with key/value x =&gt; y.
- *
- */
- private Map<String,String> headerArrayToMap(final String[] headerArray)
- {
- final Map<String, String> headerMap = new HashMap<String, String>();
- final List<String> headerList = Arrays.asList(headerArray);
- for (Iterator<String> iterator = headerList.iterator(); iterator.hasNext();)
- {
- final String nameValuePair = iterator.next();
- final String[] nameValue = nameValuePair.split(" *= *", 2);
- headerMap.put(nameValue[0], nameValue[1]);
- }
- return headerMap;
- }
-
- private void createQueueOnBroker(Destination destination) throws JMSException
- {
- _session.createConsumer(destination).close(); // Create a consumer only to cause queue creation
- }
-
- private void syncSession(Session session) throws Exception
- {
- ((AMQSession<?,?>)session).sync();
- }
-
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
deleted file mode 100644
index 0e2875235f..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/ManagementActorLoggingTest.java
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-import org.apache.qpid.management.common.mbeans.ManagedExchange;
-import org.apache.qpid.server.logging.AbstractTestLogging;
-import org.apache.qpid.server.logging.subjects.AbstractTestLogSubject;
-import org.apache.qpid.test.utils.JMXTestUtils;
-
-import javax.jms.Connection;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.management.JMException;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Test class to test if any change in the broker JMX code is affesting the management console
- * There are some hardcoding of management feature names and parameter names to create a customized
- * look in the console.
- */
-public class ManagementActorLoggingTest extends AbstractTestLogging
-{
- private JMXTestUtils _jmxUtils;
- private boolean _closed = false;
-
- @Override
- public void setUp() throws Exception
- {
- _jmxUtils = new JMXTestUtils(this);
- _jmxUtils.setUp();
- super.setUp();
- _jmxUtils.open();
- }
-
- @Override
- public void tearDown() throws Exception
- {
- if(!_closed)
- {
- _jmxUtils.close();
- }
- super.tearDown();
- }
-
- /**
- * Description:
- * When a connected client has its connection closed via the Management Console this will be logged as a CON-1002 message.
- * Input:
- *
- * 1. Running Broker
- * 2. Connected Client
- * 3. Connection is closed via Management Console
- * Output:
- *
- * <date> CON-1002 : Close
- *
- * Validation Steps:
- * 4. The CON ID is correct
- * 5. This must be the last CON message for the Connection
- * 6. It must be preceded by a CON-1001 for this Connection
- *
- * @throws Exception - {@see ManagedConnection.closeConnection and #getConnection}
- * @throws java.io.IOException - if there is a problem reseting the log monitor
- */
- public void testConnectionCloseViaManagement() throws IOException, Exception
- {
- //Create a connection to the broker
- Connection connection = getConnection();
-
- // Monitor the connection for an exception being thrown
- // this should be a DisconnectionException but it is not this tests
- // job to valiate that. Only use the exception as a synchronisation
- // to check the log file for the Close message
- final CountDownLatch exceptionReceived = new CountDownLatch(1);
- connection.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException e)
- {
- //Failover being attempted.
- exceptionReceived.countDown();
- }
- });
-
- //Remove the connection close from any 0-10 connections
- _monitor.markDiscardPoint();
-
- // Get a managedConnection
- ManagedConnection mangedConnection = _jmxUtils.getManagedObject(ManagedConnection.class, "org.apache.qpid:type=VirtualHost.Connection,*");
-
- //Close the connection
- mangedConnection.closeConnection();
-
- //Wait for the connection to close
- assertTrue("Timed out waiting for conneciton to report close",
- exceptionReceived.await(2, TimeUnit.SECONDS));
-
- //Validate results
- List<String> results = waitAndFindMatches("CON-1002");
-
- assertEquals("Unexpected Connection Close count", 1, results.size());
- }
-
- /**
- * Description:
- * Exchange creation is possible from the Management Console.
- * When an exchanged is created in this way then a EXH-1001 create message
- * is expected to be logged.
- * Input:
- *
- * 1. Running broker
- * 2. Connected Management Console
- * 3. Exchange Created via Management Console
- * Output:
- *
- * EXH-1001 : Create : [Durable] Type:<value> Name:<value>
- *
- * Validation Steps:
- * 4. The EXH ID is correct
- * 5. The correct tags are present in the message based on the create options
- *
- * @throws java.io.IOException - if there is a problem reseting the log monitor
- * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue}
- */
- public void testCreateExchangeDirectTransientViaManagementConsole() throws IOException, JMException
- {
- _monitor.markDiscardPoint();
-
- _jmxUtils.createExchange("test", getName(), "direct", false);
-
- // Validate
-
- //1 - ID is correct
- List<String> results = waitAndFindMatches("EXH-1001");
-
- assertEquals("More than one exchange creation found", 1, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct exchange name
- assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName()));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
- }
-
- public void testCreateExchangeTopicTransientViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous exchange declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createExchange("test", getName(), "topic", false);
-
- // Validate
-
- //1 - ID is correct
- List<String> results = waitAndFindMatches("EXH-1001");
-
- assertEquals("More than one exchange creation found", 1, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct exchange name
- assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName()));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
-
- }
-
- public void testCreateExchangeFanoutTransientViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous exchange declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createExchange("test", getName(), "fanout", false);
-
- // Validate
-
- //1 - ID is correct
- List<String> results = waitAndFindMatches("EXH-1001");
-
- assertEquals("More than one exchange creation found", 1, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct exchange name
- assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName()));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
-
- }
-
- public void testCreateExchangeHeadersTransientViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous exchange declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createExchange("test", getName(), "headers", false);
-
- // Validate
-
- //1 - ID is correct
- List<String> results = waitAndFindMatches("EXH-1001");
-
- assertEquals("More than one exchange creation found", 1, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct exchange name
- assertTrue("Incorrect exchange name created:" + log, log.endsWith(getName()));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
-
- }
-
- /**
- * Description:
- * Queue creation is possible from the Management Console. When a queue is created in this way then a QUE-1001 create message is expected to be logged.
- * Input:
- *
- * 1. Running broker
- * 2. Connected Management Console
- * 3. Queue Created via Management Console
- * Output:
- *
- * <date> QUE-1001 : Create : Transient Owner:<name>
- *
- * Validation Steps:
- * 4. The QUE ID is correct
- * 5. The correct tags are present in the message based on the create options
- *
- * @throws java.io.IOException - if there is a problem reseting the log monitor
- * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue}
- */
- public void testCreateQueueTransientViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous queue declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createQueue("test", getName(), null, false);
-
- // Validate
-
- List<String> results = waitAndFindMatches("QUE-1001");
-
- assertEquals("More than one queue creation found", 1, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct queue name
- String subject = fromSubject(log);
- assertEquals("Incorrect queue name created", getName(), AbstractTestLogSubject.getSlice("qu", subject));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
- }
-
- /**
- * Description:
- * The ManagementConsole can be used to delete a queue. When this is done a QUE-1002 Deleted message must be logged.
- * Input:
- *
- * 1. Running Broker
- * 2. Queue created on the broker with no subscribers
- * 3. Management Console connected
- * 4. Queue is deleted via Management Console
- * Output:
- *
- * <date> QUE-1002 : Deleted
- *
- * Validation Steps:
- * 5. The QUE ID is correct
- *
- * @throws java.io.IOException - if there is a problem reseting the log monitor
- * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.deleteQueue}
- */
- public void testQueueDeleteViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous queue declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createQueue("test", getName(), null, false);
-
- ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test");
-
- managedBroker.deleteQueue(getName());
-
- List<String> results = waitAndFindMatches("QUE-1002");
-
- assertEquals("More than one queue deletion found", 1, results.size());
-
- String log = getLog(results.get(0));
-
- // Validate correct binding
- String subject = fromSubject(log);
- assertEquals("Incorrect queue named in delete", getName(), AbstractTestLogSubject.getSlice("qu", subject));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
-
- }
-
- /**
- * Description:
- * The binding of a Queue and an Exchange is done via a Binding. When this Binding is created via the Management Console a BND-1001 Create message will be logged.
- * Input:
- *
- * 1. Running Broker
- * 2. Connected Management Console
- * 3. Use Management Console to perform binding
- * Output:
- *
- * <date> BND-1001 : Create
- *
- * Validation Steps:
- * 4. The BND ID is correct
- * 5. This will be the first message for the given binding
- *
- * @throws java.io.IOException - if there is a problem reseting the log monitor
- * @throws javax.management.JMException - {@see #createQueue and ManagedExchange.createNewBinding}
- */
- public void testBindingCreateOnDirectViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous queue declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createQueue("test", getName(), null, false);
-
- ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.direct");
-
- managedExchange.createNewBinding(getName(), getName());
-
- List<String> results = waitAndFindMatches("BND-1001");
-
- assertEquals("Unexpected number of bindings logged", 2, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct binding
- String subject = fromSubject(log);
- assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject));
- assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
- }
-
- public void testBindingCreateOnTopicViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous queue declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createQueue("test", getName(), null, false);
-
- ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.topic");
-
- managedExchange.createNewBinding(getName(), getName());
-
- List<String> results = waitAndFindMatches("BND-1001");
-
- assertEquals("Unexpected number of bindings logged", 2, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct binding
- String subject = fromSubject(log);
- assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject));
- assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
- }
-
- public void testBindingCreateOnFanoutViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous queue declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createQueue("test", getName(), null, false);
-
- ManagedExchange managedExchange = _jmxUtils.getManagedExchange("amq.fanout");
-
- managedExchange.createNewBinding(getName(), getName());
-
- List<String> results = waitAndFindMatches("BND-1001");
-
- assertEquals("Unexpected number of bindings logged", 2, results.size());
-
- String log = getLogMessage(results, 0);
-
- // Validate correct binding
- String subject = fromSubject(log);
- assertEquals("Incorrect queue named in create", getName(), AbstractTestLogSubject.getSlice("qu", subject));
- assertEquals("Incorrect routing key in create", getName(), AbstractTestLogSubject.getSlice("rk", subject));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
- }
-
- /**
- * Description:
- * Bindings can be deleted so that a queue can be rebound with a different set of values. This can be performed via the Management Console
- * Input:
- *
- * 1. Running Broker
- * 2. Management Console connected
- * 3. Management Console is used to perform unbind.
- * Output:
- *
- * <date> BND-1002 : Deleted
- *
- * Validation Steps:
- * 4. The BND ID is correct
- * 5. There must have been a BND-1001 Create message first.
- * 6. This will be the last message for the given binding
- *
- * @throws java.io.IOException - if there is a problem reseting the log monitor or an issue with the JMX Connection
- * @throws javax.management.JMException - {@see #createExchange and ManagedBroker.unregisterExchange}
- */
- public void testUnRegisterExchangeViaManagementConsole() throws IOException, JMException
- {
- //Remove any previous queue declares
- _monitor.markDiscardPoint();
-
- _jmxUtils.createExchange("test", getName(), "direct", false);
-
- ManagedBroker managedBroker = _jmxUtils.getManagedBroker("test");
-
- managedBroker.unregisterExchange(getName());
-
- List<String> results = waitAndFindMatches("EXH-1002");
-
- assertEquals("More than one exchange deletion found", 1, results.size());
-
- String log = getLog(results.get(0));
-
- // Validate correct binding
- String subject = fromSubject(log);
- assertEquals("Incorrect exchange named in delete", "direct/" + getName(), AbstractTestLogSubject.getSlice("ex", subject));
-
- // Validate it was a management actor.
- String actor = fromActor(log);
- assertTrue("Actor is not a manangement actor:" + actor, actor.startsWith("mng"));
- }
-
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
deleted file mode 100644
index 9465749226..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageConnectionStatisticsTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-
-import javax.jms.Connection;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test enabling generation of message statistics on a per-connection basis.
- */
-public class MessageConnectionStatisticsTest extends MessageStatisticsTestCase
-{
- public void configureStatistics() throws Exception
- {
- // no statistics generation configured
- }
-
- /**
- * Test statistics on a single connection
- */
- public void testEnablingStatisticsPerConnection() throws Exception
- {
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
-
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- List<String> addresses = new ArrayList<String>();
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
- assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
-
- addresses.add(mc.getRemoteAddress());
- }
- assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
-
- Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
- test.start();
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- if (addresses.contains(mc.getRemoteAddress()))
- {
- continue;
- }
- mc.setStatisticsEnabled(true);
- assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
- }
-
- sendUsing(test, 5, 200);
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- if (addresses.contains(mc.getRemoteAddress()))
- {
- assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
- assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
- }
- else
- {
- assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived());
- assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled());
- }
- }
- assertEquals("Incorrect vhost total", 0, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
- assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled());
-
- test.close();
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
deleted file mode 100644
index 383c4c00a8..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsConfigurationTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-
-/**
- * Test enabling generation of message statistics on a per-connection basis.
- */
-public class MessageStatisticsConfigurationTest extends MessageStatisticsTestCase
-{
- public void configureStatistics() throws Exception
- {
- setConfigurationProperty("statistics.generation.broker", Boolean.toString(getName().contains("Broker")));
- setConfigurationProperty("statistics.generation.virtualhosts", Boolean.toString(getName().contains("Virtualhost")));
- setConfigurationProperty("statistics.generation.connections", Boolean.toString(getName().contains("Connection")));
- }
-
- /**
- * Just broker statistics.
- */
- public void testGenerateBrokerStatistics() throws Exception
- {
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
- assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
- }
-
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
- assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
- assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled());
-
- assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived());
- assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
- }
-
- /**
- * Just virtualhost statistics.
- */
- public void testGenerateVirtualhostStatistics() throws Exception
- {
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
- assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
- }
-
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
- assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived());
- assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled());
-
- assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived());
- assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
- }
-
- /**
- * Just connection statistics.
- */
- public void testGenerateConnectionStatistics() throws Exception
- {
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived());
- assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled());
- }
-
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
- assertEquals("Incorrect vhost data", 0, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 0, vhost.getTotalDataReceived());
- assertFalse("Vhost statistics should not be enabled", vhost.isStatisticsEnabled());
-
- assertEquals("Incorrect server total messages", 0, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 0, _jmxUtils.getServerInformation().getTotalDataReceived());
- assertFalse("Server statistics should not be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
- }
-
- /**
- * Both broker and virtualhost statistics.
- */
- public void testGenerateBrokerAndVirtualhostStatistics() throws Exception
- {
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 0, mc.getTotalDataReceived());
- assertFalse("Connection statistics should not be enabled", mc.isStatisticsEnabled());
- }
-
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
- assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived());
- assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled());
-
- assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived());
- assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
- }
-
- /**
- * Broker, virtualhost and connection statistics.
- */
- public void testGenerateBrokerVirtualhostAndConnectionStatistics() throws Exception
- {
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection total", 5, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection data", 1000, mc.getTotalDataReceived());
- assertTrue("Connection statistics should be enabled", mc.isStatisticsEnabled());
- }
-
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
- assertEquals("Incorrect vhost data", 5, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost data", 1000, vhost.getTotalDataReceived());
- assertTrue("Vhost statistics should be enabled", vhost.isStatisticsEnabled());
-
- assertEquals("Incorrect server total messages", 5, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 1000, _jmxUtils.getServerInformation().getTotalDataReceived());
- assertTrue("Server statistics should be enabled", _jmxUtils.getServerInformation().isStatisticsEnabled());
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java
deleted file mode 100644
index bdfd1e2c14..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsDeliveryTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Test statistics for delivery and receipt.
- */
-public class MessageStatisticsDeliveryTest extends MessageStatisticsTestCase
-{
- public void configureStatistics() throws Exception
- {
- setConfigurationProperty("statistics.generation.broker", "true");
- setConfigurationProperty("statistics.generation.virtualhosts", "true");
- setConfigurationProperty("statistics.generation.connections", "true");
- }
-
- public void testDeliveryAndReceiptStatistics() throws Exception
- {
- ManagedBroker vhost = _jmxUtils.getManagedBroker("test");
-
- sendUsing(_test, 5, 200);
- Thread.sleep(1000);
-
- List<String> addresses = new ArrayList<String>();
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered());
- assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered());
- assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived());
-
- addresses.add(mc.getRemoteAddress());
- }
-
- assertEquals("Incorrect vhost delivery total", 0, vhost.getTotalMessagesDelivered());
- assertEquals("Incorrect vhost delivery data", 0, vhost.getTotalDataDelivered());
- assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived());
-
- Connection test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
- test.start();
- receiveUsing(test, 5);
-
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- if (addresses.contains(mc.getRemoteAddress()))
- {
- assertEquals("Incorrect connection delivery total", 0, mc.getTotalMessagesDelivered());
- assertEquals("Incorrect connection delivery data", 0, mc.getTotalDataDelivered());
- assertEquals("Incorrect connection receipt total", 5, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection receipt data", 1000, mc.getTotalDataReceived());
- }
- else
- {
- assertEquals("Incorrect connection delivery total", 5, mc.getTotalMessagesDelivered());
- assertEquals("Incorrect connection delivery data", 1000, mc.getTotalDataDelivered());
- assertEquals("Incorrect connection receipt total", 0, mc.getTotalMessagesReceived());
- assertEquals("Incorrect connection receipt data", 0, mc.getTotalDataReceived());
- }
- }
- assertEquals("Incorrect vhost delivery total", 5, vhost.getTotalMessagesDelivered());
- assertEquals("Incorrect vhost delivery data", 1000, vhost.getTotalDataDelivered());
- assertEquals("Incorrect vhost receipt total", 5, vhost.getTotalMessagesReceived());
- assertEquals("Incorrect vhost receipt data", 1000, vhost.getTotalDataReceived());
-
- test.close();
- }
-
- protected void receiveUsing(Connection con, int number) throws Exception
- {
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createQueue(session);
- MessageConsumer consumer = session.createConsumer(_queue);
- for (int i = 0; i < number; i++)
- {
- Message msg = consumer.receive(1000);
- assertNotNull("Message " + i + " was not received", msg);
- }
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
deleted file mode 100644
index de4567624d..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.management.common.mbeans.ManagedBroker;
-import org.apache.qpid.management.common.mbeans.ManagedConnection;
-
-import javax.jms.Connection;
-
-/**
- * Test generation of message statistics.
- */
-public class MessageStatisticsTest extends MessageStatisticsTestCase
-{
- public void configureStatistics() throws Exception
- {
- setConfigurationProperty("statistics.generation.broker", "true");
- setConfigurationProperty("statistics.generation.virtualhosts", "true");
- setConfigurationProperty("statistics.generation.connections", "true");
- }
-
- /**
- * Test message totals.
- */
- public void testMessageTotals() throws Exception
- {
- sendUsing(_test, 10, 100);
- sendUsing(_dev, 20, 100);
- sendUsing(_local, 5, 100);
- sendUsing(_local, 5, 100);
- sendUsing(_local, 5, 100);
- Thread.sleep(2000);
-
- ManagedBroker test = _jmxUtils.getManagedBroker("test");
- ManagedBroker dev = _jmxUtils.getManagedBroker("development");
- ManagedBroker local = _jmxUtils.getManagedBroker("localhost");
-
- if (!isBroker010())
- {
- long total = 0;
- long data = 0;
- for (ManagedConnection mc : _jmxUtils.getAllManagedConnections())
- {
- total += mc.getTotalMessagesReceived();
- data += mc.getTotalDataReceived();
- }
- assertEquals("Incorrect connection total", 45, total);
- assertEquals("Incorrect connection data", 4500, data);
- }
- assertEquals("Incorrect server total", 45, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server data", 4500, _jmxUtils.getServerInformation().getTotalDataReceived());
-
- if (!isBroker010())
- {
- long testTotal = 0;
- long testData = 0;
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- testTotal += mc.getTotalMessagesReceived();
- testData += mc.getTotalDataReceived();
- }
- assertEquals("Incorrect test connection total", 10, testTotal);
- assertEquals("Incorrect test connection data", 1000, testData);
- }
- assertEquals("Incorrect test vhost total", 10, test.getTotalMessagesReceived());
- assertEquals("Incorrect test vhost data", 1000, test.getTotalDataReceived());
-
- if (!isBroker010())
- {
- long devTotal = 0;
- long devData = 0;
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("development"))
- {
- devTotal += mc.getTotalMessagesReceived();
- devData += mc.getTotalDataReceived();
- }
- assertEquals("Incorrect test connection total", 20, devTotal);
- assertEquals("Incorrect test connection data", 2000, devData);
- }
- assertEquals("Incorrect development total", 20, dev.getTotalMessagesReceived());
- assertEquals("Incorrect development data", 2000, dev.getTotalDataReceived());
-
- if (!isBroker010())
- {
- long localTotal = 0;
- long localData = 0;
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("localhost"))
- {
- localTotal += mc.getTotalMessagesReceived();
- localData += mc.getTotalDataReceived();
- }
- assertEquals("Incorrect test connection total", 15, localTotal);
- assertEquals("Incorrect test connection data", 1500, localData);
- }
- assertEquals("Incorrect localhost total", 15, local.getTotalMessagesReceived());
- assertEquals("Incorrect localhost data", 1500, local.getTotalDataReceived());
- }
-
- /**
- * Test message totals when a connection is closed.
- */
- public void testMessageTotalsWithClosedConnections() throws Exception
- {
- Connection temp = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
- temp.start();
-
- sendUsing(_test, 10, 100);
- sendUsing(temp, 10, 100);
- sendUsing(_test, 10, 100);
- Thread.sleep(2000);
-
- temp.close();
-
- ManagedBroker test = _jmxUtils.getManagedBroker("test");
-
- if (!isBroker010())
- {
- long total = 0;
- long data = 0;
- for (ManagedConnection mc : _jmxUtils.getAllManagedConnections())
- {
- total += mc.getTotalMessagesReceived();
- data += mc.getTotalDataReceived();
- }
- assertEquals("Incorrect active connection total", 20, total);
- assertEquals("Incorrect active connection data", 2000, data);
- }
- assertEquals("Incorrect server total", 30, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server data", 3000, _jmxUtils.getServerInformation().getTotalDataReceived());
-
- if (!isBroker010())
- {
- long testTotal = 0;
- long testData = 0;
- for (ManagedConnection mc : _jmxUtils.getManagedConnections("test"))
- {
- testTotal += mc.getTotalMessagesReceived();
- testData += mc.getTotalDataReceived();
- }
- assertEquals("Incorrect test active connection total", 20, testTotal);
- assertEquals("Incorrect test active connection data", 20 * 100, testData);
- }
- assertEquals("Incorrect test vhost total", 30, test.getTotalMessagesReceived());
- assertEquals("Incorrect test vhost data", 30 * 100, test.getTotalDataReceived());
- }
-
- /**
- * Test message totals when a vhost has its statistics reset
- */
- public void testMessageTotalVhostReset() throws Exception
- {
- sendUsing(_test, 10, 10);
- sendUsing(_dev, 10, 10);
- Thread.sleep(2000);
-
- ManagedBroker test = _jmxUtils.getManagedBroker("test");
- ManagedBroker dev = _jmxUtils.getManagedBroker("development");
-
- assertEquals("Incorrect test vhost total messages", 10, test.getTotalMessagesReceived());
- assertEquals("Incorrect test vhost total data", 100, test.getTotalDataReceived());
- assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived());
- assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived());
-
- assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived());
-
- test.resetStatistics();
-
- assertEquals("Incorrect test vhost total messages", 0, test.getTotalMessagesReceived());
- assertEquals("Incorrect test vhost total data", 0, test.getTotalDataReceived());
- assertEquals("Incorrect dev vhost total messages", 10, dev.getTotalMessagesReceived());
- assertEquals("Incorrect dev vhost total data", 100, dev.getTotalDataReceived());
-
- assertEquals("Incorrect server total messages", 20, _jmxUtils.getServerInformation().getTotalMessagesReceived());
- assertEquals("Incorrect server total data", 200, _jmxUtils.getServerInformation().getTotalDataReceived());
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java b/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
deleted file mode 100644
index 45200ba476..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsTestCase.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.management.jmx;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-/**
- * Test generation of message statistics.
- */
-public abstract class MessageStatisticsTestCase extends QpidBrokerTestCase
-{
- protected static final String USER = "admin";
-
- protected JMXTestUtils _jmxUtils;
- protected Connection _test, _dev, _local;
- protected String _queueName = "statistics";
- protected Destination _queue;
- protected String _brokerUrl;
-
- @Override
- public void setUp() throws Exception
- {
- _jmxUtils = new JMXTestUtils(this, USER, USER);
- _jmxUtils.setUp();
-
- configureStatistics();
-
- super.setUp();
-
- _brokerUrl = getBroker().toString();
- _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
- _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development");
- _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost");
-
- _test.start();
- _dev.start();
- _local.start();
-
- _jmxUtils.open();
- }
-
- protected void createQueue(Session session) throws AMQException, JMSException
- {
- _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName);
- if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue))
- {
- ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null);
- ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName));
- }
- }
-
-
- @Override
- public void tearDown() throws Exception
- {
- _jmxUtils.close();
-
- _test.close();
- _dev.close();
- _local.close();
-
- super.tearDown();
- }
-
- /**
- * Configure statistics generation properties on the broker.
- */
- public abstract void configureStatistics() throws Exception;
-
- protected void sendUsing(Connection con, int number, int size) throws Exception
- {
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- createQueue(session);
- MessageProducer producer = session.createProducer(_queue);
- String content = new String(new byte[size]);
- TextMessage msg = session.createTextMessage(content);
- for (int i = 0; i < number; i++)
- {
- producer.send(msg);
- }
- }
-
- /**
- * Asserts that the actual value is within the expected value plus or
- * minus the given error.
- */
- public void assertApprox(String message, double error, double expected, double actual)
- {
- double min = expected * (1.0d - error);
- double max = expected * (1.0d + error);
- String assertion = String.format("%s: expected %f +/- %d%%, actual %f",
- message, expected, (int) (error * 100.0d), actual);
- assertTrue(assertion, actual > min && actual < max);
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java b/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java
new file mode 100644
index 0000000000..c8116d8cef
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/ra/QpidRAXAResourceTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ra;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+import org.apache.qpid.client.AMQXAResource;
+
+import org.apache.qpid.ra.QpidRAConnectionFactoryImpl;
+import org.apache.qpid.ra.QpidRAManagedConnectionFactory;
+import org.apache.qpid.ra.QpidResourceAdapter;
+
+public class QpidRAXAResourceTest extends QpidBrokerTestCase
+{
+ private static final String FACTORY_NAME = "default";
+ private static final String BROKER_PORT = "15672";
+ private static final String URL = "amqp://guest:guest@client/test?brokerlist='tcp://localhost:" + BROKER_PORT + "?sasl_mechs='PLAIN''";
+
+ public void testXAResourceIsSameRM() throws Exception
+ {
+ QpidResourceAdapter ra = new QpidResourceAdapter();
+ QpidRAManagedConnectionFactory mcf = new QpidRAManagedConnectionFactory();
+ mcf.setConnectionURL(URL);
+ mcf.setResourceAdapter(ra);
+ QpidRAManagedConnection mc = (QpidRAManagedConnection)mcf.createManagedConnection(null, null);
+ AMQXAResource xa1 = (AMQXAResource)mc.getXAResource();
+
+ XAConnectionFactory factory = getConnectionFactory(FACTORY_NAME);
+ XAConnection connection = factory.createXAConnection("guest", "guest");
+ XASession s2 = connection.createXASession();
+ AMQXAResource xaResource = (AMQXAResource)connection.createXASession().getXAResource();
+
+ assertTrue("QpidRAXAResource and XAResource should be from the same RM", xa1.isSameRM(xaResource));
+ assertTrue("XAResource and QpidRAXAResource should be from the same RM", xaResource.isSameRM(xa1));
+
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java b/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
index 0e3a658e32..e8d72c13bd 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/SupportedProtocolVersionsTest.java
@@ -38,6 +38,13 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
// No-op, we call super.setUp() from test methods after appropriate config overrides
}
+ private void clearProtocolSupportManipulations()
+ {
+ //Remove the QBTC provided protocol manipulations, giving only the protocols which default to enabled
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_EXCLUDES, null);
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_INCLUDES, null);
+ }
+
/**
* Test that 0-10, 0-9-1, 0-9, and 0-8 support is present when no
* attempt has yet been made to disable them, and forcing the client
@@ -46,7 +53,8 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
*/
public void testDefaultProtocolSupport() throws Exception
{
- //Start the broker without modifying its supported protocols
+ clearProtocolSupportManipulations();
+
super.setUp();
//Verify requesting a 0-10 connection works
@@ -74,11 +82,13 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
connection.close();
}
- public void testDisabling010() throws Exception
+ public void testDisabling010and10() throws Exception
{
- //disable 0-10 support
- setConfigurationProperty("connector.amqp10enabled", "false");
- setConfigurationProperty("connector.amqp010enabled", "false");
+ clearProtocolSupportManipulations();
+
+ //disable 0-10 and 1-0 support
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false");
super.setUp();
@@ -90,9 +100,11 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
connection.close();
}
- public void testDisabling091and010() throws Exception
+ public void testDisabling091and010and10() throws Exception
{
- //disable 0-91 and 0-10 support
+ clearProtocolSupportManipulations();
+
+ //disable 0-91 and 0-10 and 1-0 support
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false");
@@ -107,9 +119,11 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
connection.close();
}
- public void testDisabling09and091and010() throws Exception
+ public void testDisabling09and091and010and10() throws Exception
{
- //disable 0-9, 0-91 and 0-10 support
+ clearProtocolSupportManipulations();
+
+ //disable 0-9, 0-91, 0-10 and 1-0 support
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP09ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP091ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
@@ -127,6 +141,8 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
public void testConfiguringReplyingToUnsupported010ProtocolInitiationWith09insteadOf091() throws Exception
{
+ clearProtocolSupportManipulations();
+
//disable 0-10 support, and set the default unsupported protocol initiation reply to 0-9
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_9");
@@ -147,4 +163,72 @@ public class SupportedProtocolVersionsTest extends QpidBrokerTestCase
assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion());
connection.close();
}
+
+ public void testProtocolInclusionThroughQBTCSystemPropertiesOverridesProtocolExclusion() throws Exception
+ {
+ testProtocolInclusionOverridesProtocolExclusion(false);
+ }
+
+ public void testProtocolInclusionThroughConfigOverridesProtocolExclusion() throws Exception
+ {
+ testProtocolInclusionOverridesProtocolExclusion(true);
+ }
+
+ private void testProtocolInclusionOverridesProtocolExclusion(boolean useConfig) throws Exception
+ {
+ clearProtocolSupportManipulations();
+
+ //selectively exclude 0-10 and 1-0 on the test port
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_EXCLUDES,"--exclude-0-10 @PORT --exclude-1-0 @PORT");
+
+ super.setUp();
+
+ //Verify initially requesting a 0-10 connection negotiates a 0-9-1 connection
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ AMQConnection connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_91, connection.getProtocolVersion());
+ connection.close();
+
+ stopBroker();
+
+ if(useConfig)
+ {
+ //selectively include 0-10 support again on the test port through config
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, String.valueOf(getPort()));
+ }
+ else
+ {
+ //selectively include 0-10 support again on the test port through QBTC sys props
+ setTestSystemProperty(QpidBrokerTestCase.BROKER_PROTOCOL_INCLUDES,"--include-0-10 @PORT");
+ }
+
+ startBroker();
+
+ //Verify requesting a 0-10 connection now returns one
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_10, connection.getProtocolVersion());
+ connection.close();
+ }
+
+ public void testProtocolInclusionOverridesProtocolDisabling() throws Exception
+ {
+ clearProtocolSupportManipulations();
+
+ //disable 0-10 and 1-0
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP010ENABLED, "false");
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_AMQP10ENABLED, "false");
+
+ //selectively include 0-10 support again on the test port
+ setConfigurationProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, String.valueOf(getPort()));
+
+ super.setUp();
+
+ //Verify initially requesting a 0-10 connection still works
+ setTestClientSystemProperty(ClientProperties.AMQP_VERSION, "0-10");
+ AMQConnection connection = (AMQConnection) getConnection();
+ assertEquals("Unexpected protocol version in use", ProtocolVersion.v0_10, connection.getProtocolVersion());
+ connection.close();
+ }
+
} \ No newline at end of file
diff --git a/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java b/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java
deleted file mode 100644
index c8a6d02761..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/server/logging/ManagementLoggingTest.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.logging;
-
-
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.test.utils.JMXTestUtils;
-import org.apache.qpid.util.LogMonitor;
-
-import java.io.File;
-import java.util.List;
-
-/**
- * Management Console Test Suite
- *
- * The Management Console test suite validates that the follow log messages as specified in the Functional Specification.
- *
- * This suite of tests validate that the management console messages occur correctly and according to the following format:
- *
- * MNG-1001 : Startup
- * MNG-1002 : Starting : <service> : Listening on port <Port>
- * MNG-1003 : Shutting down : <service> : port <Port>
- * MNG-1004 : Ready
- * MNG-1005 : Stopped
- * MNG-1006 : Using SSL Keystore : <path>
- * MNG-1007 : Open : User <username>
- * MNG-1008 : Close : User <username>
- */
-public class ManagementLoggingTest extends AbstractTestLogging
-{
- private static final String MNG_PREFIX = "MNG-";
-
- public void setUp() throws Exception
- {
- setLogMessagePrefix();
-
- // We either do this here or have a null check in tearDown.
- // As when this test is run against profiles other than java it will NPE
- _monitor = new LogMonitor(_outputFile);
- //We explicitly do not call super.setUp as starting up the broker is
- //part of the test case.
-
- }
-
- /**
- * Description:
- * Using the startup configuration validate that the management startup
- * message is logged correctly.
- * Input:
- * Standard configuration with management enabled
- * Output:
- *
- * <date> MNG-1001 : Startup
- *
- * Constraints:
- * This is the FIRST message logged by MNG
- * Validation Steps:
- *
- * 1. The BRK ID is correct
- * 2. This is the FIRST message logged by MNG
- */
- public void testManagementStartupEnabled() throws Exception
- {
- // This test only works on java brokers
- if (isJavaBroker())
- {
- startBrokerAndCreateMonitor(true, false);
-
- // Ensure we have received the MNG log msg.
- waitForMessage("MNG-1001");
-
- List<String> results = findMatches(MNG_PREFIX);
- // Validation
-
- assertTrue("MNGer message not logged", results.size() > 0);
-
- String log = getLogMessage(results, 0);
-
- //1
- validateMessageID("MNG-1001", log);
-
- //2
- //There will be 2 copies of the startup message (one via SystemOut, and one via Log4J)
- results = findMatches("MNG-1001");
- assertEquals("Unexpected startup message count.",
- 2, results.size());
-
- //3
- assertEquals("Startup log message is not 'Startup'.", "Startup",
- getMessageString(log));
- }
- }
-
- /**
- * Description:
- * Verify that when management is disabled in the configuration file the
- * startup message is not logged.
- * Input:
- * Standard configuration with management disabled
- * Output:
- * NO MNG messages
- * Validation Steps:
- *
- * 1. Validate that no MNG messages are produced.
- */
- public void testManagementStartupDisabled() throws Exception
- {
- if (isJavaBroker())
- {
- startBrokerAndCreateMonitor(false, false);
-
- List<String> results = findMatches(MNG_PREFIX);
- // Validation
-
- assertEquals("MNGer messages logged", 0, results.size());
- }
- }
-
- /**
- * The two MNG-1002 messages are logged at the same time so lets test them
- * at the same time.
- *
- * Description:
- * Using the default configuration validate that the RMI Registry socket is
- * correctly reported as being opened
- *
- * Input:
- * The default configuration file
- * Output:
- *
- * <date> MESSAGE MNG-1002 : Starting : RMI Registry : Listening on port 8999
- *
- * Constraints:
- * The RMI ConnectorServer and Registry log messages do not have a prescribed order
- * Validation Steps:
- *
- * 1. The MNG ID is correct
- * 2. The specified port is the correct '8999'
- *
- * Description:
- * Using the default configuration validate that the RMI ConnectorServer
- * socket is correctly reported as being opened
- *
- * Input:
- * The default configuration file
- * Output:
- *
- * <date> MESSAGE MNG-1002 : Starting : RMI ConnectorServer : Listening on port 9099
- *
- * Constraints:
- * The RMI ConnectorServer and Registry log messages do not have a prescribed order
- * Validation Steps:
- *
- * 1. The MNG ID is correct
- * 2. The specified port is the correct '9099'
- */
- public void testManagementStartupRMIEntries() throws Exception
- {
- if (isJavaBroker())
- {
- startBrokerAndCreateMonitor(true, false);
-
- List<String> results = waitAndFindMatches("MNG-1002");
- // Validation
-
- //There will be 4 startup messages (two via SystemOut, and two via Log4J)
- assertEquals("Unexpected MNG-1002 message count", 4, results.size());
-
- String log = getLogMessage(results, 0);
-
- //1
- validateMessageID("MNG-1002", log);
-
- //Check the RMI Registry port is as expected
- int mPort = getManagementPort(getPort());
- assertTrue("RMI Registry port not as expected(" + mPort + ").:" + getMessageString(log),
- getMessageString(log).endsWith(String.valueOf(mPort)));
-
- log = getLogMessage(results, 2);
-
- //1
- validateMessageID("MNG-1002", log);
-
- // We expect the RMI Registry port (the defined 'management port') to be
- // 100 lower than the JMX RMIConnector Server Port (the actual JMX server)
- int jmxPort = mPort + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET;
- assertTrue("JMX RMIConnectorServer port not as expected(" + jmxPort + ").:" + getMessageString(log),
- getMessageString(log).endsWith(String.valueOf(jmxPort)));
- }
- }
-
- /**
- * Description:
- * Using the default configuration with SSL enabled for the management port the SSL Keystore path should be reported via MNG-1006
- * Input:
- * Management SSL enabled default configuration.
- * Output:
- *
- * <date> MESSAGE MNG-1006 : Using SSL Keystore : test_resources/ssl/keystore.jks
- *
- * Validation Steps:
- *
- * 1. The MNG ID is correct
- * 2. The keystore path is as specified in the configuration
- */
- public void testManagementStartupSSLKeystore() throws Exception
- {
- if (isJavaBroker())
- {
- startBrokerAndCreateMonitor(true, true);
-
- List<String> results = waitAndFindMatches("MNG-1006");
-
- assertTrue("MNGer message not logged", results.size() > 0);
-
- String log = getLogMessage(results, 0);
-
- //1
- validateMessageID("MNG-1006", log);
-
- // Validate we only have two MNG-1002 (one via stdout, one via log4j)
- results = findMatches("MNG-1006");
- assertEquals("Upexpected SSL Keystore message count",
- 2, results.size());
-
- // Validate the keystore path is as expected
- assertTrue("SSL Keystore entry expected.:" + getMessageString(log),
- getMessageString(log).endsWith(new File(getConfigurationStringProperty("management.ssl.keyStorePath")).getName()));
- }
- }
-
- /**
- * Description: Tests the management connection open/close are logged correctly.
- *
- * Output:
- *
- * <date> MESSAGE MNG-1007 : Open : User <username>
- * <date> MESSAGE MNG-1008 : Close : User <username>
- *
- * Validation Steps:
- *
- * 1. The MNG ID is correct
- * 2. The message and username are correct
- */
- public void testManagementUserOpenClose() throws Exception
- {
- if (isJavaBroker())
- {
- startBrokerAndCreateMonitor(true, false);
-
- final JMXTestUtils jmxUtils = new JMXTestUtils(this);
- List<String> openResults = null;
- List<String> closeResults = null;
- try
- {
- jmxUtils.setUp();
- jmxUtils.open();
- openResults = waitAndFindMatches("MNG-1007");
- }
- finally
- {
- if (jmxUtils != null)
- {
- jmxUtils.close();
- closeResults = waitAndFindMatches("MNG-1008");
- }
- }
-
- assertNotNull("Management Open results null", openResults.size());
- assertEquals("Management Open logged unexpected number of times", 1, openResults.size());
-
- assertNotNull("Management Close results null", closeResults.size());
- assertEquals("Management Close logged unexpected number of times", 1, closeResults.size());
-
- final String openMessage = getMessageString(getLogMessage(openResults, 0));
- assertTrue("Unexpected open message " + openMessage, openMessage.endsWith("Open : User admin"));
- final String closeMessage = getMessageString(getLogMessage(closeResults, 0));
- assertTrue("Unexpected close message " + closeMessage, closeMessage.endsWith("Close : User admin"));
- }
- }
-
- private void startBrokerAndCreateMonitor(boolean managementEnabled, boolean useManagementSSL) throws Exception
- {
- //Ensure management is on
- setConfigurationProperty("management.enabled", String.valueOf(managementEnabled));
-
- if(useManagementSSL)
- {
- // This test requires we have an ssl connection
- setConfigurationProperty("management.ssl.enabled", "true");
- }
-
- startBroker();
-
- // Now we can create the monitor as _outputFile will now be defined
- _monitor = new LogMonitor(_outputFile);
- }
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
index ae7be6f7f4..0e59e9cceb 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ConflationQueueTest.java
@@ -22,7 +22,8 @@
package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
@@ -37,82 +38,65 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class ConflationQueueTest extends QpidBrokerTestCase
{
- private static final int TIMEOUT = 1500;
-
-
- private static final Logger _logger = Logger.getLogger(ConflationQueueTest.class);
+ private static final Logger LOGGER = Logger.getLogger(ConflationQueueTest.class);
-
-
- protected final String VHOST = "/test";
- protected final String QUEUE = "ConflationQueue";
+ private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
+ private static final String KEY_PROPERTY = "key";
private static final int MSG_COUNT = 400;
- private Connection producerConnection;
- private MessageProducer producer;
- private Session producerSession;
- private Queue queue;
- private Connection consumerConnection;
- private Session consumerSession;
-
-
- private MessageConsumer consumer;
+ private String _queueName;
+ private Queue _queue;
+ private Connection _producerConnection;
+ private MessageProducer _producer;
+ private Session _producerSession;
+ private Connection _consumerConnection;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
protected void setUp() throws Exception
{
super.setUp();
- producerConnection = getConnection();
- producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- producerConnection.start();
-
-
- }
-
- protected void tearDown() throws Exception
- {
- producerConnection.close();
- consumerConnection.close();
- super.tearDown();
+ _queueName = getTestQueueName();
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void testConflation() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- producer.close();
- producerSession.close();
- producerConnection.close();
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -122,40 +106,33 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
-
-
}
-
public void testConflationWithRelease() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -165,31 +142,31 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- consumerSession.close();
- consumerConnection.close();
+ _consumerSession.close();
+ _consumerConnection.close();
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -199,39 +176,34 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
-
public void testConflationWithReleaseAfterNewPublish() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT/2; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -241,35 +213,35 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- consumer.close();
+ _consumer.close();
for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
// HACK to do something synchronous
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
// this causes the "old" messages to be released
- consumerSession.close();
- consumerConnection.close();
+ _consumerSession.close();
+ _consumerConnection.close();
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = consumerSession.createConsumer(queue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -279,40 +251,54 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ }
+
+ public void testConflatedQueueDepth() throws Exception
+ {
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
+
+ for (int msg = 0; msg < MSG_COUNT; msg++)
+ {
+ _producer.send(nextMessage(msg, _producerSession));
}
+ final long queueDepth = ((AMQSession<?, ?>)_producerSession).getQueueDepth((AMQDestination)_queue, true);
+
+ assertEquals(10, queueDepth);
}
public void testConflationBrowser() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
+ _producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
- consumer = consumerSession.createConsumer(browseQueue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
Message received;
List<Message> messages = new ArrayList<Message>();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
@@ -322,62 +308,53 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
messages.clear();
- producer.send(nextMessage(MSG_COUNT, producerSession));
+ _producer.send(nextMessage(MSG_COUNT, _producerSession));
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- while((received = consumer.receive(1000))!=null)
+ while((received = _consumer.receive(1000))!=null)
{
messages.add(received);
}
assertEquals("Unexpected number of messages received",1,messages.size());
- assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty("msg"));
-
-
- producer.close();
- producerSession.close();
- producerConnection.close();
-
+ assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
}
-
public void testConflation2Browsers() throws Exception
{
- consumerConnection = getConnection();
- consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("qpid.last_value_queue_key","key");
- ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), false, true, false, arguments);
- queue = new org.apache.qpid.client.AMQQueue("amq.direct",QUEUE);
- ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
- producer = producerSession.createProducer(queue);
+ createConflationQueue(_producerSession);
+ _producer = _producerSession.createProducer(_queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- producer.send(nextMessage(msg, producerSession));
-
+ _producer.send(nextMessage(msg, _producerSession));
}
- ((AMQSession)producerSession).sync();
+ ((AMQSession<?,?>)_producerSession).sync();
- AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+QUEUE+"?browse='true'&durable='true'");
+ AMQBindingURL url = new AMQBindingURL("direct://amq.direct//"+_queueName+"?browse='true'&durable='true'");
AMQQueue browseQueue = new AMQQueue(url);
- consumer = consumerSession.createConsumer(browseQueue);
- MessageConsumer consumer2 = consumerSession.createConsumer(browseQueue);
- consumerConnection.start();
+ _consumer = _consumerSession.createConsumer(browseQueue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(browseQueue);
+ _consumerConnection.start();
List<Message> messages = new ArrayList<Message>();
List<Message> messages2 = new ArrayList<Message>();
- Message received = consumer.receive(1000);
+ Message received = _consumer.receive(1000);
Message received2 = consumer2.receive(1000);
while(received!=null || received2!=null)
@@ -392,7 +369,7 @@ public class ConflationQueueTest extends QpidBrokerTestCase
}
- received = consumer.receive(1000);
+ received = _consumer.receive(1000);
received2 = consumer2.receive(1000);
}
@@ -403,33 +380,195 @@ public class ConflationQueueTest extends QpidBrokerTestCase
for(int i = 0 ; i < 10; i++)
{
Message msg = messages.get(i);
- assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
msg = messages2.get(i);
- assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty("msg"));
+ assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
- producer.close();
- producerSession.close();
- producerConnection.close();
+ _producer.close();
+ _producerSession.close();
+ _producerConnection.close();
+ }
+
+ public void testParallelProductionAndConsumption() throws Exception
+ {
+ createConflationQueue(_producerSession);
+
+ // Start producing threads that send messages
+ BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
+ messageProducer1.startSendingMessages();
+ BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2");
+ messageProducer2.startSendingMessages();
+
+ Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1);
+ messageProducer1.join();
+ messageProducer2.join();
+ final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
+ assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size());
+ final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
+ assertEquals(lastSentMessages1, lastSentMessages2);
+ assertEquals("The last message sent for each key should match the last message received for that key",
+ lastSentMessages1, lastReceivedMessages);
+
+ assertNull("Unexpected exception from background producer thread", messageProducer1.getException());
}
+ private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception
+ {
+ producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
+ _consumerConnection = getConnection();
+ int smallPrefetchToEncourageConflation = 1;
+ _consumerSession = ((AMQConnection)_consumerConnection).createSession(false, Session.AUTO_ACKNOWLEDGE, smallPrefetchToEncourageConflation);
- private Message nextMessage(int msg, Session producerSession) throws JMSException
+ LOGGER.info("Starting to receive");
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ Map<String, Integer> messageSequenceNumbersByKey = new HashMap<String, Integer>();
+
+ Message message;
+ int numberOfShutdownsReceived = 0;
+ int numberOfMessagesReceived = 0;
+ while(numberOfShutdownsReceived < 2)
+ {
+ message = _consumer.receive(10000);
+ assertNotNull(message);
+
+ if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
+ {
+ numberOfShutdownsReceived++;
+ }
+ else
+ {
+ numberOfMessagesReceived++;
+ putMessageInMap(message, messageSequenceNumbersByKey);
+ }
+ }
+
+ LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
+
+ return messageSequenceNumbersByKey;
+ }
+
+ private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
{
- Message send = producerSession.createTextMessage("Message: " + msg);
+ String keyValue = message.getStringProperty(KEY_PROPERTY);
+ Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY);
+ messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
+ }
- send.setStringProperty("key", String.valueOf(msg % 10));
- send.setIntProperty("msg", msg);
+ private class BackgroundMessageProducer
+ {
+ static final String SHUTDOWN = "SHUTDOWN";
- return send;
+ private final String _threadName;
+
+ private volatile Exception _exception;
+
+ private Thread _thread;
+ private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<String, Integer>();
+ private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
+
+ public BackgroundMessageProducer(String threadName)
+ {
+ _threadName = threadName;
+ }
+
+ public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
+ {
+ final long latchTimeout = 60000;
+ boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
+ assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success);
+ LOGGER.info("Quarter of messages sent");
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ public Map<String, Integer> getMessageSequenceNumbersByKey()
+ {
+ return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
+ }
+
+ public void startSendingMessages()
+ {
+ Runnable messageSender = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ LOGGER.info("Starting to send in background thread");
+ Connection producerConnection = getConnection();
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer backgroundProducer = producerSession.createProducer(_queue);
+ for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
+ {
+ Message message = nextMessage(messageNumber, producerSession, 2);
+ backgroundProducer.send(message);
+
+ putMessageInMap(message, _messageSequenceNumbersByKey);
+ _quarterOfMessagesSentLatch.countDown();
+ }
+
+ Message shutdownMessage = producerSession.createMessage();
+ shutdownMessage.setBooleanProperty(SHUTDOWN, true);
+ backgroundProducer.send(shutdownMessage);
+
+ LOGGER.info("Finished sending in background thread");
+ }
+ catch (Exception e)
+ {
+ _exception = e;
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ _thread = new Thread(messageSender);
+ _thread.setName(_threadName);
+ _thread.start();
+ }
+
+ public void join() throws InterruptedException
+ {
+ final int timeoutInMillis = 120000;
+ _thread.join(timeoutInMillis);
+ assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive());
+ }
+ }
+
+ private void createConflationQueue(Session session) throws AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.last_value_queue_key",KEY_PROPERTY);
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, arguments);
+ _queue = new AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination)_queue);
}
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ return nextMessage(msg, producerSession, 10);
+ }
-}
+ private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
+ {
+ Message send = producerSession.createTextMessage("Message: " + msg);
+ send.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+ send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
+ return send;
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
index ab0d88c737..782709b24f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/ModelTest.java
@@ -333,7 +333,7 @@ public class ModelTest extends QpidBrokerTestCase
queueName));
assertEquals(queueName, managedQueue.getName());
- assertEquals(String.valueOf(owner), managedQueue.getOwner());
+ assertEquals(owner, managedQueue.getOwner());
assertEquals(durable, managedQueue.isDurable());
assertEquals(autoDelete, managedQueue.isAutoDelete());
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java
index 6e4f12b9f3..ceff2b998a 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/security/acl/ExternalACLJMXTest.java
@@ -19,7 +19,6 @@
package org.apache.qpid.server.security.acl;
import org.apache.qpid.management.common.mbeans.ServerInformation;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.test.utils.JMXTestUtils;
@@ -30,7 +29,7 @@ import java.lang.management.RuntimeMXBean;
* Tests that access to the JMX interface is governed only by {@link ObjectType#METHOD}/{@link ObjectType#ALL}
* rules and AMQP rights have no effect.
*
- * Ensures that objects outside the Qpid domain ({@link ManagedObject#DOMAIN}) are not governed by the ACL model.
+ * Ensures that objects outside the Qpid domain are not governed by the ACL model.
*/
public class ExternalACLJMXTest extends AbstractACLTestCase
{
diff --git a/java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java b/java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java
new file mode 100644
index 0000000000..858b32c24c
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/security/auth/manager/MultipleAuthenticationManagersTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.security.auth.manager;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class MultipleAuthenticationManagersTest extends QpidBrokerTestCase
+{
+ private static final String KEYSTORE = "test-profiles/test_resources/ssl/java_client_keystore.jks";
+ private static final String KEYSTORE_PASSWORD = "password";
+ private static final String TRUSTSTORE = "test-profiles/test_resources/ssl/java_client_truststore.jks";
+ private static final String TRUSTSTORE_PASSWORD = "password";
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ setConfigurationProperty("connector.ssl.enabled", "true");
+ setConfigurationProperty("connector.ssl.sslOnly", "false");
+ setConfigurationProperty("security.anonymous-auth-manager", "");
+ setConfigurationProperty("security.default-auth-manager", "PrincipalDatabaseAuthenticationManager");
+ setConfigurationProperty("security.port-mappings.port-mapping.port", String.valueOf(QpidBrokerTestCase.DEFAULT_SSL_PORT));
+ setConfigurationProperty("security.port-mappings.port-mapping.auth-manager", "AnonymousAuthenticationManager");
+
+ // set the ssl system properties
+ setSystemProperty("javax.net.ssl.keyStore", KEYSTORE);
+ setSystemProperty("javax.net.ssl.keyStorePassword", KEYSTORE_PASSWORD);
+ setSystemProperty("javax.net.ssl.trustStore", TRUSTSTORE);
+ setSystemProperty("javax.net.ssl.trustStorePassword", TRUSTSTORE_PASSWORD);
+ setSystemProperty("javax.net.debug", "ssl");
+ super.setUp();
+ }
+
+ private Connection getAnonymousSSLConnection() throws Exception
+ {
+ String url = "amqp://:@test/?brokerlist='tcp://localhost:%s?ssl='true''";
+
+ url = String.format(url,QpidBrokerTestCase.DEFAULT_SSL_PORT);
+
+ return new AMQConnection(url);
+
+ }
+
+ private Connection getAnonymousConnection() throws Exception
+ {
+ String url = "amqp://:@test/?brokerlist='tcp://localhost:%s'";
+
+ url = String.format(url,QpidBrokerTestCase.DEFAULT_PORT);
+
+ return new AMQConnection(url);
+
+ }
+
+
+ public void testMultipleAuthenticationManagers() throws Exception
+ {
+ try
+ {
+ Connection conn = getConnection();
+ assertNotNull("Connection unexpectedly null", conn);
+ }
+ catch(JMSException e)
+ {
+ fail("Should be able to create a connection with credentials to the standard port. " + e.getMessage());
+ }
+
+ try
+ {
+ Connection conn = getAnonymousSSLConnection();
+ assertNotNull("Connection unexpectedly null", conn);
+ }
+ catch(JMSException e)
+ {
+ fail("Should be able to create a anonymous connection to the SSL port. " + e.getMessage());
+ }
+
+ try
+ {
+ Connection conn = getAnonymousConnection();
+ fail("Should not be able to create anonymous connection to the standard port");
+ }
+ catch(AMQException e)
+ {
+ // pass
+ }
+
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java b/java/systests/src/main/java/org/apache/qpid/server/stats/StatisticsReportingTest.java
index 786ef11956..c38fcd9199 100644
--- a/java/systests/src/main/java/org/apache/qpid/management/jmx/MessageStatisticsReportingTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/stats/StatisticsReportingTest.java
@@ -18,30 +18,75 @@
* under the License.
*
*/
-package org.apache.qpid.management.jmx;
+package org.apache.qpid.server.stats;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.LogMonitor;
import java.util.List;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
/**
- * Test generation of message statistics reporting.
+ * Test generation of message/data statistics reporting and the ability
+ * to control from the configuration file.
*/
-public class MessageStatisticsReportingTest extends MessageStatisticsTestCase
+public class StatisticsReportingTest extends QpidBrokerTestCase
{
protected LogMonitor _monitor;
-
- public void configureStatistics() throws Exception
+ protected static final String USER = "admin";
+
+ protected Connection _test, _dev, _local;
+ protected String _queueName = "statistics";
+ protected Destination _queue;
+ protected String _brokerUrl;
+
+ @Override
+ public void setUp() throws Exception
{
setConfigurationProperty("statistics.generation.broker", "true");
setConfigurationProperty("statistics.generation.virtualhosts", "true");
-
+
if (getName().equals("testEnabledStatisticsReporting"))
{
setConfigurationProperty("statistics.reporting.period", "10");
}
-
+
_monitor = new LogMonitor(_outputFile);
+
+ super.setUp();
+
+ _brokerUrl = getBroker().toString();
+ _test = new AMQConnection(_brokerUrl, USER, USER, "clientid", "test");
+ _dev = new AMQConnection(_brokerUrl, USER, USER, "clientid", "development");
+ _local = new AMQConnection(_brokerUrl, USER, USER, "clientid", "localhost");
+
+ _test.start();
+ _dev.start();
+ _local.start();
+
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ _test.close();
+ _dev.close();
+ _local.close();
+
+ super.tearDown();
}
/**
@@ -52,14 +97,14 @@ public class MessageStatisticsReportingTest extends MessageStatisticsTestCase
sendUsing(_test, 10, 100);
sendUsing(_dev, 20, 100);
sendUsing(_local, 15, 100);
-
+
Thread.sleep(10 * 1000); // 15s
-
+
List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
-
+
assertEquals("Incorrect number of broker data stats log messages", 2, brokerStatsData.size());
assertEquals("Incorrect number of broker message stats log messages", 2, brokerStatsMessages.size());
assertEquals("Incorrect number of virtualhost data stats log messages", 6, vhostStatsData.size());
@@ -74,17 +119,40 @@ public class MessageStatisticsReportingTest extends MessageStatisticsTestCase
sendUsing(_test, 10, 100);
sendUsing(_dev, 20, 100);
sendUsing(_local, 15, 100);
-
+
Thread.sleep(10 * 1000); // 15s
-
+
List<String> brokerStatsData = _monitor.findMatches("BRK-1008");
List<String> brokerStatsMessages = _monitor.findMatches("BRK-1009");
List<String> vhostStatsData = _monitor.findMatches("VHT-1003");
List<String> vhostStatsMessages = _monitor.findMatches("VHT-1004");
-
+
assertEquals("Incorrect number of broker data stats log messages", 0, brokerStatsData.size());
assertEquals("Incorrect number of broker message stats log messages", 0, brokerStatsMessages.size());
assertEquals("Incorrect number of virtualhost data stats log messages", 0, vhostStatsData.size());
assertEquals("Incorrect number of virtualhost message stats log messages", 0, vhostStatsMessages.size());
}
+
+ private void sendUsing(Connection con, int number, int size) throws Exception
+ {
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ createQueue(session);
+ MessageProducer producer = session.createProducer(_queue);
+ String content = new String(new byte[size]);
+ TextMessage msg = session.createTextMessage(content);
+ for (int i = 0; i < number; i++)
+ {
+ producer.send(msg);
+ }
+ }
+
+ private void createQueue(Session session) throws AMQException, JMSException
+ {
+ _queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName);
+ if (!((AMQSession<?,?>) session).isQueueBound((AMQDestination) _queue))
+ {
+ ((AMQSession<?,?>) session).createQueue(new AMQShortString(_queueName), false, true, false, null);
+ ((AMQSession<?,?>) session).declareAndBind((AMQDestination) new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, _queueName));
+ }
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
new file mode 100644
index 0000000000..07965cfa95
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/QuotaMessageStore.java
@@ -0,0 +1,180 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+
+public class QuotaMessageStore extends NullMessageStore
+{
+ private final AtomicLong _messageId = new AtomicLong(1);
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+
+ private long _totalStoreSize;;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
+ private final StateManager _stateManager;
+ private final EventManager _eventManager = new EventManager();
+
+ public QuotaMessageStore()
+ {
+ _stateManager = new StateManager(_eventManager);
+ }
+
+ @Override
+ public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config)
+ throws Exception
+ {
+ _persistentSizeHighThreshold = config.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, Long.MAX_VALUE);
+ _persistentSizeLowThreshold = config.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY,
+ _persistentSizeHighThreshold);
+ if (_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+ _stateManager.attainState(State.INITIALISING);
+ }
+
+ @Override
+ public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler,
+ TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
+ {
+ _stateManager.attainState(State.INITIALISED);
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ _stateManager.attainState(State.ACTIVATING);
+ _stateManager.attainState(State.ACTIVE);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public StoredMessage<StorableMessageMetaData> addMessage(StorableMessageMetaData metaData)
+ {
+ final long id = _messageId.getAndIncrement();
+ return new StoredMemoryMessage(id, metaData);
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new Transaction()
+ {
+ private AtomicLong _storeSizeIncrease = new AtomicLong();
+
+ @Override
+ public StoreFuture commitTranAsync() throws AMQStoreException
+ {
+ QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue());
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ _storeSizeIncrease.addAndGet(((MessageContentSource)message).getSize());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+ {
+ _storeSizeIncrease.addAndGet(-((MessageContentSource)message).getSize());
+ }
+
+ @Override
+ public void commitTran() throws AMQStoreException
+ {
+ QuotaMessageStore.this.storedSizeChange(_storeSizeIncrease.intValue());
+ }
+
+ @Override
+ public void abortTran() throws AMQStoreException
+ {
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ }
+ };
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ _stateManager.attainState(State.CLOSING);
+ _closed.getAndSet(true);
+ _stateManager.attainState(State.CLOSED);
+ }
+
+ @Override
+ public void addEventListener(EventListener eventListener, Event... events)
+ {
+ _eventManager.addEventListener(eventListener, events);
+ }
+
+ private void storedSizeChange(final int delta)
+ {
+ if(_persistentSizeHighThreshold > 0)
+ {
+ synchronized (this)
+ {
+ long newSize = _totalStoreSize += delta;
+ if(!_limitBusted && newSize > _persistentSizeHighThreshold)
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ else if(_limitBusted && newSize < _persistentSizeHighThreshold)
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String getStoreType()
+ {
+ return "QUOTA";
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
index f2d4a513be..9db04b64b3 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
@@ -24,7 +24,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
@@ -370,4 +369,10 @@ public class SlowMessageStore implements MessageStore
return _realStore.getStoreLocation();
}
+ @Override
+ public String getStoreType()
+ {
+ return "SLOW";
+ }
+
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java b/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
deleted file mode 100644
index 6497a640d2..0000000000
--- a/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStoreFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-public class SlowMessageStoreFactory implements MessageStoreFactory
-{
-
- @Override
- public MessageStore createMessageStore()
- {
- return new SlowMessageStore();
- }
-
- @Override
- public String getStoreClassName()
- {
- return SlowMessageStore.class.getSimpleName();
- }
-
-}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java b/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
new file mode 100644
index 0000000000..9fb1db3a4f
--- /dev/null
+++ b/java/systests/src/main/java/org/apache/qpid/server/store/StoreOverfullTest.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class StoreOverfullTest extends QpidBrokerTestCase
+{
+ /** Number of messages to send*/
+ public static final int TEST_SIZE = 15;
+
+ /** Message payload*/
+ private static final byte[] BYTE_32K = new byte[32*1024];
+
+ private Connection _producerConnection;
+ private Connection _consumerConnection;
+ private Session _producerSession;
+ private Session _consumerSession;
+ private MessageProducer _producer;
+ private MessageConsumer _consumer;
+ private Queue _queue;
+
+ private static final int OVERFULL_SIZE = 400000;
+ private static final int UNDERFULL_SIZE = 350000;
+
+ public void setUp() throws Exception
+ {
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.class", QuotaMessageStore.class.getName());
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.overfull-size", String.valueOf(OVERFULL_SIZE));
+ setConfigurationProperty("virtualhosts.virtualhost.test.store.underfull-size", String.valueOf(UNDERFULL_SIZE));
+
+ super.setUp();
+
+ _producerConnection = getConnection();
+ _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _producerConnection.start();
+
+ _consumerConnection = getConnection();
+ _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ _producerConnection.close();
+ _consumerConnection.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ /**
+ * Test:
+ *
+ * Send > threshold amount of data : Sender is blocked
+ * Remove 90% of data : Sender is unblocked
+ *
+ */
+ public void testCapacityExceededCausesBlock() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ _queue = getTestQueue();
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ MessageSender sender = sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+
+ while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ int mostMessages = (int) (0.9 * sentCount);
+ for(int i = 0; i < mostMessages; i++)
+ {
+ if(_consumer.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ long targetTime = System.currentTimeMillis() + 5000l;
+ while(sentMessages.get() == sentCount && System.currentTimeMillis() < targetTime)
+ {
+ Thread.sleep(100l);
+ }
+
+ assertFalse("Did not unblock on consuming messages", sentMessages.get() == sentCount);
+
+ for(int i = mostMessages; i < TEST_SIZE; i++)
+ {
+ if(_consumer.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertTrue("Not all messages were sent", sentMessages.get() == TEST_SIZE);
+ assertNull("Unexpected exception on message sending:" + sender.getException(), sender.getException());
+ }
+
+ /**
+ * Two producers on different queues
+ */
+ public void testCapacityExceededCausesBlockTwoConnections() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+ _queue = getTestQueue();
+ AMQQueue queue2 = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, getTestQueueName() + "_2");
+
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)queue2);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ Connection secondProducerConnection = getConnection();
+ Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondProducerSession.createProducer(queue2);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+ sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
+
+ while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+
+ while(!((AMQSession<?,?>)secondProducerSession).isFlowBlocked())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount2 = sentMessages2.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ MessageConsumer consumer2 = _consumerSession.createConsumer(queue2);
+ _consumerConnection.start();
+
+ for(int i = 0; i < 2*TEST_SIZE; i++)
+ {
+ if(_consumer.receive(1000l) == null
+ && consumer2.receive(1000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
+ assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
+ }
+
+ /**
+ * New producers are blocked
+ */
+ public void testCapacityExceededCausesBlockNewConnection() throws Exception
+ {
+ AtomicInteger sentMessages = new AtomicInteger(0);
+ AtomicInteger sentMessages2 = new AtomicInteger(0);
+
+ _queue = getTestQueue();
+
+ ((AMQSession<?,?>) _producerSession).declareAndBind((AMQDestination)_queue);
+
+ _producer = _producerSession.createProducer(_queue);
+
+ Connection secondProducerConnection = getConnection();
+ Session secondProducerSession = secondProducerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondProducerSession.createProducer(_queue);
+
+ sendMessagesAsync(_producer, _producerSession, TEST_SIZE, 50L, sentMessages);
+
+ while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount = sentMessages.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount);
+
+ sendMessagesAsync(secondProducer, secondProducerSession, TEST_SIZE, 50L, sentMessages2);
+
+ while(!((AMQSession<?,?>)_producerSession).isFlowBlocked())
+ {
+ Thread.sleep(100l);
+ }
+ int sentCount2 = sentMessages2.get();
+ assertFalse("Did not block before sending all messages", TEST_SIZE == sentCount2);
+
+ _consumer = _consumerSession.createConsumer(_queue);
+ _consumerConnection.start();
+
+ for(int i = 0; i < 2*TEST_SIZE; i++)
+ {
+ if(_consumer.receive(2000l) == null)
+ {
+ break;
+ }
+ }
+
+ assertEquals("Not all messages were sent from the first sender", TEST_SIZE, sentMessages.get());
+ assertEquals("Not all messages were sent from the second sender", TEST_SIZE, sentMessages2.get());
+
+ }
+
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+ final Session producerSession,
+ final int numMessages,
+ long sleepPeriod,
+ AtomicInteger sentMessages)
+ {
+ MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod, sentMessages);
+ new Thread(sender).start();
+ return sender;
+ }
+
+ private class MessageSender implements Runnable
+ {
+ private final MessageProducer _senderProducer;
+ private final Session _senderSession;
+ private final int _numMessages;
+ private volatile JMSException _exception;
+ private CountDownLatch _exceptionThrownLatch = new CountDownLatch(1);
+ private long _sleepPeriod;
+ private final AtomicInteger _sentMessages;
+
+ public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+ {
+ _senderProducer = producer;
+ _senderSession = producerSession;
+ _numMessages = numMessages;
+ _sleepPeriod = sleepPeriod;
+ _sentMessages = sentMessages;
+ }
+
+ public void run()
+ {
+ try
+ {
+ sendMessages(_senderProducer, _senderSession, _numMessages, _sleepPeriod, _sentMessages);
+ }
+ catch (JMSException e)
+ {
+ _exception = e;
+ _exceptionThrownLatch.countDown();
+ }
+ }
+
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ }
+
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod, AtomicInteger sentMessages)
+ throws JMSException
+ {
+
+ for (int msg = 0; msg < numMessages; msg++)
+ {
+ producer.send(nextMessage(msg, producerSession));
+ sentMessages.incrementAndGet();
+
+ try
+ {
+ ((AMQSession<?,?>)producerSession).sync();
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ try
+ {
+ Thread.sleep(sleepPeriod);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_32K);
+ send.setIntProperty("msg", msg);
+ return send;
+ }
+
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
index e06ed6e171..fa36d73283 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.client;
import org.apache.qpid.client.AMQDestination;
@@ -16,26 +36,6 @@ import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
public class DupsOkTest extends QpidBrokerTestCase
{
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
index 2c7f426306..626592dc10 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/message/SelectorTest.java
@@ -233,7 +233,7 @@ public class SelectorTest extends QpidBrokerTestCase implements MessageListener
{
}
- assertTrue("Connection should be closed", _connection.isClosed());
+ assertFalse("Connection should not be closed", _connection.isClosed());
}
public void testSelectorWithJMSMessageID() throws Exception
diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
index 91f5cb7770..ee81e7c372 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
@@ -54,7 +54,7 @@ public class SyncWaitDelayTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
- setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.factoryclass", "org.apache.qpid.server.store.SlowMessageStoreFactory");
+ setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.class", "org.apache.qpid.server.store.SlowMessageStore");
setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST+".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
setConfigurationProperty("management.enabled", "false");
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index fa0fe7e0b5..bc1eead8b4 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -41,6 +41,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -67,6 +68,9 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
private static final int MAX_DELIVERY_COUNT = 2;
private CountDownLatch _awaitCompletion;
+ /** index numbers of messages to be redelivered */
+ private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14);
+
public void setUp() throws Exception
{
//enable DLQ/maximumDeliveryCount support for all queues at the vhost level
@@ -144,13 +148,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
*/
public void testAsynchronousClientAckSession() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(5);
- redeliverMsgs.add(14);
-
- doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, false, false);
+ doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false, false);
}
/**
@@ -159,13 +157,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
*/
public void testAsynchronousTransactedSession() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(5);
- redeliverMsgs.add(14);
-
- doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, false);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, false);
}
/**
@@ -174,13 +166,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
*/
public void testAsynchronousAutoAckSession() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(5);
- redeliverMsgs.add(14);
-
- doTest(Session.AUTO_ACKNOWLEDGE, redeliverMsgs, false, false);
+ doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false, false);
}
/**
@@ -189,13 +175,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
*/
public void testAsynchronousDupsOkSession() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(5);
- redeliverMsgs.add(14);
-
- doTest(Session.DUPS_OK_ACKNOWLEDGE, redeliverMsgs, false, false);
+ doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false, false);
}
/**
@@ -204,13 +184,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
*/
public void testSynchronousClientAckSession() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(3);
- redeliverMsgs.add(14);
-
- doTest(Session.CLIENT_ACKNOWLEDGE, redeliverMsgs, true, false);
+ doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true, false);
}
/**
@@ -219,27 +193,22 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
*/
public void testSynchronousTransactedSession() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(5);
- redeliverMsgs.add(14);
-
- doTest(Session.SESSION_TRANSACTED, redeliverMsgs, true, false);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false);
}
public void testDurableSubscription() throws Exception
{
- final ArrayList<Integer> redeliverMsgs = new ArrayList<Integer>();
- redeliverMsgs.add(1);
- redeliverMsgs.add(2);
- redeliverMsgs.add(5);
- redeliverMsgs.add(14);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, true);
+ }
+
+ public void testWhenBrokerIsRestartedAfterEnqeuingMessages() throws Exception
+ {
+ restartBroker();
- doTest(Session.SESSION_TRANSACTED, redeliverMsgs, false, true);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false);
}
- public void doTest(final int deliveryMode, final ArrayList<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception
+ private void doTest(final int deliveryMode, final List<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception
{
final Connection clientConnection = getConnection();
@@ -311,7 +280,6 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
restartBroker();
final Connection clientConnection2 = getConnection();
- final Session clientSession2 = clientConnection2.createSession(transacted, deliveryMode);
clientConnection2.start();
//verify the messages on the DLQ
@@ -406,7 +374,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
}
private void addMessageListener(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
- final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException
+ final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException
{
if(deliveryMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE
|| deliveryMode == org.apache.qpid.jms.Session.PRE_ACKNOWLEDGE)
@@ -567,7 +535,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
}
private void doSynchronousTest(final Session session, final MessageConsumer consumer, final int deliveryMode, final int maxDeliveryCount,
- final int expectedTotalNumberOfDeliveries, final ArrayList<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException
+ final int expectedTotalNumberOfDeliveries, final List<Integer> redeliverMsgs) throws JMSException, AMQException, InterruptedException
{
if(deliveryMode == Session.AUTO_ACKNOWLEDGE
|| deliveryMode == Session.DUPS_OK_ACKNOWLEDGE
@@ -637,7 +605,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
//sleep then do a synchronous op to give the broker
//time to resend all the messages
Thread.sleep(500);
- ((AMQSession) session).sync();
+ ((AMQSession<?,?>) session).sync();
break;
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
index 5b3bca7033..2cd7520ae4 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.management.jmx.ManagedConnectionMBeanTest;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.transport.ConnectionException;
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java
index a313475b11..bf1fbbf1a3 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionFactoryTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.test.unit.client.connection;
+import javax.jms.Connection;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -57,5 +59,20 @@ public class ConnectionFactoryTest extends QpidBrokerTestCase
assertEquals("Usernames used is different from the one in URL","guest",con3.getConnectionURL().getUsername());
assertEquals("Password used is different from the one in URL","guest",con3.getConnectionURL().getPassword());
}
-
+
+ /**
+ * Verifies that a connection can be made using an instance of AMQConnectionFactory created with the
+ * default constructor and provided with the connection url via setter.
+ */
+ public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception
+ {
+ String broker = getBroker().toString();
+ String url = "amqp://guest:guest@clientID/test?brokerlist='" + broker + "'";
+
+ AMQConnectionFactory factory = new AMQConnectionFactory();
+ factory.setConnectionURLString(url);
+
+ Connection con = factory.createConnection();
+ con.close();
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
index cc76d89a67..b11df5a2a0 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -30,7 +30,7 @@ import javax.jms.Queue;
* This tests the behaviour of transactional sessions when the {@code transactionTimeout} configuration
* is set for a virtual host.
*
- * A producer that is idle for too long or open for too long will have its connection closed and
+ * A producer that is idle for too long or open for too long will have its connection/session(0-10) closed and
* any further operations will fail with a 408 resource timeout exception. Consumers will not
* be affected by the transaction timeout configuration.
*/
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
index e940a73bbb..39973e12c7 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/xa/FaultTest.java
@@ -1,6 +1,5 @@
-package org.apache.qpid.test.unit.xa;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,17 +7,18 @@ package org.apache.qpid.test.unit.xa;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
+package org.apache.qpid.test.unit.xa;
import junit.framework.TestSuite;
@@ -344,7 +344,7 @@ public class FaultTest extends AbstractXATestCase
{
assertEquals("Wrong error code: ", XAException.XAER_PROTO, e.errorCode);
}
- }
+ }
/**
* Strategy:
@@ -366,27 +366,28 @@ public class FaultTest extends AbstractXATestCase
/**
* Strategy:
* Check that a transaction timeout as expected
- * - set timeout to 10ms
- * - sleep 1000ms
+ * - set timeout to 1s
+ * - sleep 1500ms
* - call end and check that the expected exception is thrown
*/
public void testTransactionTimeout() throws Exception
{
+ _xaResource.setTransactionTimeout(1);
+
Xid xid = getNewXid();
try
{
_xaResource.start(xid, XAResource.TMNOFLAGS);
- assertEquals("Wrong timeout", _xaResource.getTransactionTimeout(), 0);
- _xaResource.setTransactionTimeout(10);
- Thread.sleep(1000);
+ Thread.sleep(1500);
_xaResource.end(xid, XAResource.TMSUCCESS);
+ fail("Timeout expected ");
}
catch (XAException e)
{
assertEquals("Wrong error code: ", XAException.XA_RBTIMEOUT, e.errorCode);
}
}
-
+
/**
* Strategy:
* Set the transaction timeout to 1000
@@ -394,18 +395,18 @@ public class FaultTest extends AbstractXATestCase
public void testTransactionTimeoutAfterCommit() throws Exception
{
Xid xid = getNewXid();
-
+
_xaResource.start(xid, XAResource.TMNOFLAGS);
_xaResource.setTransactionTimeout(1000);
assertEquals("Wrong timeout", 1000,_xaResource.getTransactionTimeout());
-
+
//_xaResource.prepare(xid);
_xaResource.end(xid, XAResource.TMSUCCESS);
_xaResource.commit(xid, true);
-
+
_xaResource.setTransactionTimeout(2000);
assertEquals("Wrong timeout", 2000,_xaResource.getTransactionTimeout());
-
+
xid = getNewXid();
_xaResource.start(xid, XAResource.TMNOFLAGS);
assertEquals("Wrong timeout", 2000, _xaResource.getTransactionTimeout());
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java
index 66b3fe0c6a..3af57327d2 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/BrokerHolder.java
@@ -25,4 +25,5 @@ public interface BrokerHolder
String getWorkingDirectory();
void shutdown();
void kill();
+ String dumpThreads();
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
index adda9ca3ec..a71a4ef517 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.test.utils;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Set;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.Broker;
@@ -31,7 +36,9 @@ public class InternalBrokerHolder implements BrokerHolder
private final Broker _broker;
private final String _workingDirectory;
- public InternalBrokerHolder(final Broker broker, String workingDirectory)
+ private Set<Integer> _portsUsedByBroker;
+
+ public InternalBrokerHolder(final Broker broker, String workingDirectory, Set<Integer> portsUsedByBroker)
{
if(broker == null)
{
@@ -40,6 +47,7 @@ public class InternalBrokerHolder implements BrokerHolder
_broker = broker;
_workingDirectory = workingDirectory;
+ _portsUsedByBroker = portsUsedByBroker;
}
@Override
@@ -53,7 +61,9 @@ public class InternalBrokerHolder implements BrokerHolder
LOGGER.info("Shutting down Broker instance");
_broker.shutdown();
-
+
+ waitUntilPortsAreFree();
+
LOGGER.info("Broker instance shutdown");
}
@@ -64,5 +74,42 @@ public class InternalBrokerHolder implements BrokerHolder
shutdown();
}
+ private void waitUntilPortsAreFree()
+ {
+ new PortHelper().waitUntilPortsAreFree(_portsUsedByBroker);
+ }
+
+ @Override
+ public String dumpThreads()
+ {
+ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+ ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
+ StringBuilder dump = new StringBuilder();
+ dump.append(String.format("%n"));
+ for (ThreadInfo threadInfo : threadInfos)
+ {
+ dump.append(threadInfo);
+ }
+
+ long[] deadLocks = threadMXBean.findDeadlockedThreads();
+ if (deadLocks != null && deadLocks.length > 0)
+ {
+ ThreadInfo[] deadlockedThreads = threadMXBean.getThreadInfo(deadLocks);
+ dump.append(String.format("%n"));
+ dump.append("Deadlock is detected!");
+ dump.append(String.format("%n"));
+ for (ThreadInfo threadInfo : deadlockedThreads)
+ {
+ dump.append(threadInfo);
+ }
+ }
+ return dump.toString();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "InternalBrokerHolder [_portsUsedByBroker=" + _portsUsedByBroker + "]";
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
index d9c259c389..43b80b45fb 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
@@ -33,11 +33,15 @@ import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.ServerInformation;
import org.apache.qpid.management.common.mbeans.UserManagement;
+import javax.management.InstanceNotFoundException;
import javax.management.JMException;
+import javax.management.ListenerNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import java.io.IOException;
@@ -50,8 +54,8 @@ import java.util.Set;
*/
public class JMXTestUtils
{
- private static final String DEFAULT_PASSWORD = "admin";
- private static final String DEFAULT_USERID = "admin";
+ public static final String DEFAULT_PASSWORD = "admin";
+ public static final String DEFAULT_USERID = "admin";
private MBeanServerConnection _mbsc;
private JMXConnector _jmxc;
@@ -79,8 +83,16 @@ public class JMXTestUtils
public void open() throws Exception
{
+ open(0); // Zero signifies default broker to QBTC.
+ }
+
+ public void open(final int brokerPort) throws Exception
+ {
+ int actualBrokerPort = _test.getPort(brokerPort);
+ int managementPort = _test.getManagementPort(actualBrokerPort);
+
_jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
- _test.getManagementPort(_test.getPort()), _user, _password);
+ managementPort, _user, _password);
_mbsc = _jmxc.getMBeanServerConnection();
}
@@ -93,6 +105,18 @@ public class JMXTestUtils
}
}
+ public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object handback)
+ throws InstanceNotFoundException, IOException
+ {
+ _mbsc.addNotificationListener(name, listener, filter, handback);
+ }
+
+ public void removeNotificationListener(ObjectName name, NotificationListener listener)
+ throws InstanceNotFoundException, IOException, ListenerNotFoundException
+ {
+ _mbsc.removeNotificationListener(name, listener);
+ }
+
/**
* Create a non-durable exchange with the requested name
*
@@ -136,7 +160,6 @@ public class JMXTestUtils
throws IOException, JMException, MBeanException
{
ManagedBroker managedBroker = getManagedBroker(virtualHostName);
-
managedBroker.unregisterExchange(exchange);
}
@@ -152,87 +175,81 @@ public class JMXTestUtils
throws IOException, JMException, MBeanException
{
ManagedBroker managedBroker = getManagedBroker(virtualHostName);
-
managedBroker.deleteQueue(queueName);
}
-
+
/**
- * Sets the logging level.
+ * Sets the logging level.
*
* @throws JMException
* @throws IOException if there is a problem with the JMX Connection
* @throws MBeanException
*/
public void setRuntimeLoggerLevel(String logger, String level)
- throws IOException, JMException, MBeanException
+ throws IOException, JMException, MBeanException
{
LoggingManagement loggingManagement = getLoggingManagement();
-
loggingManagement.setRuntimeLoggerLevel(logger, level);
}
-
+
/**
- * Reload logging config file.
+ * Reload logging config file.
*
* @throws JMException
* @throws IOException if there is a problem with the JMX Connection
* @throws MBeanException
*/
public void reloadConfigFile()
- throws IOException, JMException, MBeanException
+ throws IOException, JMException, MBeanException
{
LoggingManagement loggingManagement = getLoggingManagement();
-
loggingManagement.reloadConfigFile();
}
/**
- * Get list of available logger levels.
+ * Get list of available logger levels.
*
* @throws JMException
* @throws IOException if there is a problem with the JMX Connection
* @throws MBeanException
*/
public String[] getAvailableLoggerLevels()
- throws IOException, JMException, MBeanException
+ throws IOException, JMException, MBeanException
{
LoggingManagement loggingManagement = getLoggingManagement();
-
return loggingManagement.getAvailableLoggerLevels();
}
-
+
/**
- * Set root logger level.
+ * Set root logger level.
*
* @throws JMException
* @throws IOException if there is a problem with the JMX Connection
* @throws MBeanException
*/
public void setRuntimeRootLoggerLevel(String level)
- throws IOException, JMException, MBeanException
+ throws IOException, JMException, MBeanException
{
LoggingManagement loggingManagement = getLoggingManagement();
-
loggingManagement.setRuntimeRootLoggerLevel(level);
}
-
+
/**
- * Get root logger level.
+ * Get root logger level.
*
* @throws JMException
* @throws IOException if there is a problem with the JMX Connection
* @throws MBeanException
*/
public String getRuntimeRootLoggerLevel()
- throws IOException, JMException, MBeanException
+ throws IOException, JMException, MBeanException
{
LoggingManagement loggingManagement = getLoggingManagement();
-
return loggingManagement.getRuntimeRootLoggerLevel();
}
/**
- * Retrive the ObjectName for a Virtualhost.
+ * Retrieve the ObjectName for a Virtualhost.
*
* This is then used to create a proxy to the ManagedBroker MBean.
*
@@ -253,12 +270,12 @@ public class JMXTestUtils
// We have verified we have only one value in objectNames so return it
ObjectName objectName = objectNames.iterator().next();
- _test.getLogger().info("Loading: " + objectName);
+ _test.getLogger().info("Loading: " + objectName);
return objectName;
}
/**
- * Retrive the ObjectName for the given Queue on a Virtualhost.
+ * Retrieve the ObjectName for the given Queue on a Virtualhost.
*
* This is then used to create a proxy to the ManagedQueue MBean.
*
@@ -281,7 +298,7 @@ public class JMXTestUtils
// We have verified we have only one value in objectNames so return it
ObjectName objectName = objectNames.iterator().next();
- _test.getLogger().info("Loading: " + objectName);
+ _test.getLogger().info("Loading: " + objectName);
return objectName;
}
@@ -309,7 +326,7 @@ public class JMXTestUtils
// We have verified we have only one value in objectNames so return it
ObjectName objectName = objectNames.iterator().next();
- _test.getLogger().info("Loading: " + objectName);
+ _test.getLogger().info("Loading: " + objectName);
return objectName;
}
@@ -319,10 +336,10 @@ public class JMXTestUtils
Set<ObjectName> objectNames = queryObjects(query);
_test.assertNotNull("Null ObjectName Set returned", objectNames);
- _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
+ _test.assertEquals("Unexpected number of objects matching " + managedClass + " returned", 1, objectNames.size());
ObjectName objectName = objectNames.iterator().next();
- _test.getLogger().info("Loading: " + objectName);
+ _test.getLogger().info("Loading: " + objectName);
return getManagedObject(managedClass, objectName);
}
@@ -355,34 +372,34 @@ public class JMXTestUtils
{
return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost));
}
-
+
public ManagedExchange getManagedExchange(String exchangeName)
{
- ObjectName objectName = getExchangeObjectName("test", exchangeName);
+ ObjectName objectName = getExchangeObjectName("test", exchangeName);
return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, ManagedExchange.class, false);
}
-
+
public ManagedQueue getManagedQueue(String queueName)
{
ObjectName objectName = getQueueObjectName("test", queueName);
return getManagedObject(ManagedQueue.class, objectName);
}
- public LoggingManagement getLoggingManagement() throws MalformedObjectNameException
+ public LoggingManagement getLoggingManagement() throws MalformedObjectNameException
{
- ObjectName objectName = new ObjectName("org.apache.qpid:type=LoggingManagement,name=LoggingManagement");
+ ObjectName objectName = new ObjectName("org.apache.qpid:type=LoggingManagement,name=LoggingManagement");
return getManagedObject(LoggingManagement.class, objectName);
}
-
- public ConfigurationManagement getConfigurationManagement() throws MalformedObjectNameException
+
+ public ConfigurationManagement getConfigurationManagement() throws MalformedObjectNameException
{
- ObjectName objectName = new ObjectName("org.apache.qpid:type=ConfigurationManagement,name=ConfigurationManagement");
+ ObjectName objectName = new ObjectName("org.apache.qpid:type=ConfigurationManagement,name=ConfigurationManagement");
return getManagedObject(ConfigurationManagement.class, objectName);
}
-
- public UserManagement getUserManagement() throws MalformedObjectNameException
+
+ public UserManagement getUserManagement() throws MalformedObjectNameException
{
- ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement");
+ ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement");
return getManagedObject(UserManagement.class, objectName);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index c070fb4de0..aa909a6674 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -17,11 +17,41 @@
*/
package org.apache.qpid.test.utils;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQQueue;
@@ -33,59 +63,33 @@ import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.ProtocolExclusion;
+import org.apache.qpid.server.ProtocolInclusion;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.store.MessageStoreConstants;
-import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory;
+import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.FileUtils;
import org.apache.qpid.util.LogMonitor;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
/**
* Qpid base class for system testing test cases.
*/
public class QpidBrokerTestCase extends QpidTestCase
{
-
public enum BrokerType
{
EXTERNAL /** Test case relies on a Broker started independently of the test-suite */,
INTERNAL /** Test case starts an embedded broker within this JVM */,
SPAWNED /** Test case spawns a new broker as a separate process */
}
+
+ public static final String GUEST_USERNAME = "guest";
+ public static final String GUEST_PASSWORD = "guest";
+
protected final static String QpidHome = System.getProperty("QPID_HOME");
protected File _configFile = new File(System.getProperty("broker.config"));
+ protected File _logConfigFile = new File(System.getProperty("log4j.configuration"));
protected static final Logger _logger = Logger.getLogger(QpidBrokerTestCase.class);
protected static final int LOGMONITOR_TIMEOUT = 5000;
@@ -113,8 +117,10 @@ public class QpidBrokerTestCase extends QpidTestCase
}
// system properties
+ private static final String TEST_VIRTUALHOSTS = "test.virtualhosts";
+ private static final String TEST_CONFIG = "test.config";
private static final String BROKER_LANGUAGE = "broker.language";
- private static final String BROKER_TYPE = "broker.type";
+ protected static final String BROKER_TYPE = "broker.type";
private static final String BROKER_COMMAND = "broker.command";
private static final String BROKER_CLEAN_BETWEEN_TESTS = "broker.clean.between.tests";
private static final String BROKER_EXISTING_QPID_WORK = "broker.existing.qpid.work";
@@ -125,7 +131,8 @@ public class QpidBrokerTestCase extends QpidTestCase
private static final String BROKER_LOG_INTERLEAVE = "broker.log.interleave";
private static final String BROKER_LOG_PREFIX = "broker.log.prefix";
private static final String BROKER_PERSITENT = "broker.persistent";
- private static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes";
+ public static final String BROKER_PROTOCOL_EXCLUDES = "broker.protocol.excludes";
+ public static final String BROKER_PROTOCOL_INCLUDES = "broker.protocol.includes";
// values
protected static final String JAVA = "java";
@@ -146,7 +153,6 @@ public class QpidBrokerTestCase extends QpidTestCase
private final AmqpProtocolVersion _brokerVersion = AmqpProtocolVersion.valueOf(System.getProperty(BROKER_VERSION, ""));
protected String _output = System.getProperty(TEST_OUTPUT, System.getProperty("java.io.tmpdir"));
protected Boolean _brokerPersistent = Boolean.getBoolean(BROKER_PERSITENT);
- private String _brokerProtocolExcludes = System.getProperty(BROKER_PROTOCOL_EXCLUDES);
protected static String _brokerLogPrefix = System.getProperty(BROKER_LOG_PREFIX,"BROKER: ");
protected static boolean _interleaveBrokerLog = Boolean.getBoolean(BROKER_LOG_INTERLEAVE);
@@ -164,13 +170,13 @@ public class QpidBrokerTestCase extends QpidTestCase
protected List<Connection> _connections = new ArrayList<Connection>();
public static final String QUEUE = "queue";
public static final String TOPIC = "topic";
-
+
/** Map to hold test defined environment properties */
private Map<String, String> _env;
/** Ensure our messages have some sort of size */
protected static final int DEFAULT_MESSAGE_SIZE = 1024;
-
+
/** Size to create our message*/
private int _messageSize = DEFAULT_MESSAGE_SIZE;
/** Type of message*/
@@ -193,7 +199,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
super();
}
-
+
public Logger getLogger()
{
return QpidBrokerTestCase._logger;
@@ -219,10 +225,6 @@ public class QpidBrokerTestCase extends QpidTestCase
out = new PrintStream(new FileOutputStream(_outputFile), true);
err = new PrintStream(String.format("%s/TEST-%s.err", _output, qname));
- // This is relying on behaviour specific to log4j 1.2.12. If we were to upgrade to 1.2.13 or
- // beyond we must change either code (or config) to ensure that ConsoleAppender#setFollow
- // is set to true otherwise log4j logging will not respect the following reassignment.
-
System.setOut(out);
System.setErr(err);
@@ -313,6 +315,24 @@ public class QpidBrokerTestCase extends QpidTestCase
}
/**
+ * The returned set of port numbers is only a guess because it assumes no ports have been overridden
+ * using system properties.
+ */
+ protected Set<Integer> guessAllPortsUsedByBroker(int mainPort)
+ {
+ Set<Integer> ports = new HashSet<Integer>();
+ int managementPort = getManagementPort(mainPort);
+ int connectorServerPort = managementPort + ServerConfiguration.JMXPORT_CONNECTORSERVER_OFFSET;
+
+ ports.add(mainPort);
+ ports.add(managementPort);
+ ports.add(connectorServerPort);
+ ports.add(DEFAULT_SSL_PORT);
+
+ return ports;
+ }
+
+ /**
* Get the Port that is use by the current broker
*
* @return the current port
@@ -338,12 +358,16 @@ public class QpidBrokerTestCase extends QpidTestCase
{
final int sslPort = port-1;
final String protocolExcludesList = getProtocolExcludesList(port, sslPort);
+ final String protocolIncludesList = getProtocolIncludesList(port, sslPort);
+
return _brokerCommand
.replace("@PORT", "" + port)
.replace("@SSL_PORT", "" + sslPort)
.replace("@MPORT", "" + getManagementPort(port))
.replace("@CONFIG_FILE", _configFile.toString())
- .replace("@EXCLUDES", protocolExcludesList);
+ .replace("@LOG_CONFIG_FILE", _logConfigFile.toString())
+ .replace("@EXCLUDES", protocolExcludesList)
+ .replace("@INCLUDES", protocolIncludesList);
}
public void startBroker() throws Exception
@@ -353,39 +377,51 @@ public class QpidBrokerTestCase extends QpidTestCase
public void startBroker(int port) throws Exception
{
+ startBroker(port, _testConfiguration, _testVirtualhosts);
+ }
+
+ public void startBroker(int port, XMLConfiguration testConfiguration, XMLConfiguration virtualHosts) throws Exception
+ {
port = getPort(port);
// Save any configuration changes that have been made
- saveTestConfiguration();
- saveTestVirtualhosts();
+ String testConfig = saveTestConfiguration(port, testConfiguration);
+ String virtualHostsConfig = saveTestVirtualhosts(port, virtualHosts);
if(_brokers.get(port) != null)
{
throw new IllegalStateException("There is already an existing broker running on port " + port);
}
+ Set<Integer> portsUsedByBroker = guessAllPortsUsedByBroker(port);
+
if (_brokerType.equals(BrokerType.INTERNAL) && !existingInternalBroker())
{
setConfigurationProperty(ServerConfiguration.MGMT_CUSTOM_REGISTRY_SOCKET, String.valueOf(false));
- saveTestConfiguration();
+ testConfig = saveTestConfiguration(port, testConfiguration);
+ _logger.info("Set test.config property to: " + testConfig);
+ _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig);
+ setSystemProperty(TEST_CONFIG, testConfig);
+ setSystemProperty(TEST_VIRTUALHOSTS, virtualHostsConfig);
BrokerOptions options = new BrokerOptions();
options.setConfigFile(_configFile.getAbsolutePath());
options.addPort(port);
addExcludedPorts(port, DEFAULT_SSL_PORT, options);
+ addIncludedPorts(port, DEFAULT_SSL_PORT, options);
options.setJmxPortRegistryServer(getManagementPort(port));
//Set the log config file, relying on the log4j.configuration system property
//set on the JVM by the JUnit runner task in module.xml.
- options.setLogConfigFile(new URL(System.getProperty("log4j.configuration")).getFile());
+ options.setLogConfigFile(_logConfigFile.getAbsolutePath());
Broker broker = new Broker();
_logger.info("starting internal broker (same JVM)");
broker.startup(options);
- _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK")));
+ _brokers.put(port, new InternalBrokerHolder(broker, System.getProperty("QPID_WORK"), portsUsedByBroker));
}
else if (!_brokerType.equals(BrokerType.EXTERNAL))
{
@@ -395,16 +431,16 @@ public class QpidBrokerTestCase extends QpidTestCase
_logger.info("starting external broker: " + cmd);
ProcessBuilder pb = new ProcessBuilder(cmd.split("\\s+"));
pb.redirectErrorStream(true);
- Map<String, String> env = pb.environment();
+ Map<String, String> processEnv = pb.environment();
String qpidHome = System.getProperty(QPID_HOME);
- env.put(QPID_HOME, qpidHome);
+ processEnv.put(QPID_HOME, qpidHome);
//Augment Path with bin directory in QPID_HOME.
- env.put("PATH", env.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
+ processEnv.put("PATH", processEnv.get("PATH").concat(File.pathSeparator + qpidHome + "/bin"));
//Add the test name to the broker run.
// DON'T change PNAME, qpid.stop needs this value.
- env.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\"");
- env.put("QPID_WORK", qpidWork);
+ processEnv.put("QPID_PNAME", "-DPNAME=QPBRKR -DTNAME=\"" + getTestName() + "\"");
+ processEnv.put("QPID_WORK", qpidWork);
// Use the environment variable to set amqj.logging.level for the broker
// The value used is a 'server' value in the test configuration to
@@ -419,7 +455,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
for (Map.Entry<String, String> entry : _env.entrySet())
{
- env.put(entry.getKey(), entry.getValue());
+ processEnv.put(entry.getKey(), entry.getValue());
}
}
@@ -436,25 +472,25 @@ public class QpidBrokerTestCase extends QpidTestCase
setSystemProperty("root.logging.level");
}
+ // set test.config and test.virtualhosts
+ String qpidOpts = " -D" + TEST_CONFIG + "=" + testConfig + " -D" + TEST_VIRTUALHOSTS + "=" + virtualHostsConfig;
- String QPID_OPTS = " ";
// Add all the specified system properties to QPID_OPTS
if (!_propertiesSetForBroker.isEmpty())
{
for (String key : _propertiesSetForBroker.keySet())
{
- QPID_OPTS += "-D" + key + "=" + _propertiesSetForBroker.get(key) + " ";
- }
-
- if (env.containsKey("QPID_OPTS"))
- {
- env.put("QPID_OPTS", env.get("QPID_OPTS") + QPID_OPTS);
- }
- else
- {
- env.put("QPID_OPTS", QPID_OPTS);
+ qpidOpts += " -D" + key + "=" + _propertiesSetForBroker.get(key);
}
}
+ if (processEnv.containsKey("QPID_OPTS"))
+ {
+ qpidOpts = processEnv.get("QPID_OPTS") + qpidOpts;
+ }
+ processEnv.put("QPID_OPTS", qpidOpts);
+
+ _logger.info("Set test.config property to: " + testConfig);
+ _logger.info("Set test.virtualhosts property to: " + virtualHostsConfig);
// cpp broker requires that the work directory is created
createBrokerWork(qpidWork);
@@ -492,14 +528,14 @@ public class QpidBrokerTestCase extends QpidTestCase
// this is expect if the broker started successfully
}
- _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork));
+ _brokers.put(port, new SpawnedBrokerHolder(process, qpidWork, portsUsedByBroker));
}
}
private void addExcludedPorts(int port, int sslPort, BrokerOptions options)
{
final String protocolExcludesList = getProtocolExcludesList(port, sslPort);
-
+
if (protocolExcludesList.equals(""))
{
return;
@@ -522,9 +558,36 @@ public class QpidBrokerTestCase extends QpidTestCase
protected String getProtocolExcludesList(int port, int sslPort)
{
- final String protocolExcludesList =
- _brokerProtocolExcludes.replace("@PORT", "" + port).replace("@SSL_PORT", "" + sslPort);
- return protocolExcludesList;
+ return System.getProperty(BROKER_PROTOCOL_EXCLUDES,"").replace("@PORT", "" + port).replace("@SSL_PORT", "" + sslPort);
+ }
+
+ private String getProtocolIncludesList(int port, int sslPort)
+ {
+ return System.getProperty(BROKER_PROTOCOL_INCLUDES, "").replace("@PORT", "" + port).replace("@SSL_PORT", "" + sslPort);
+ }
+
+ private void addIncludedPorts(int port, int sslPort, BrokerOptions options)
+ {
+ final String protocolIncludesList = getProtocolIncludesList(port, sslPort);
+
+ if (protocolIncludesList.equals(""))
+ {
+ return;
+ }
+ final String[] toks = protocolIncludesList.split("\\s");
+
+ if(toks.length % 2 != 0)
+ {
+ throw new IllegalArgumentException("Must be an even number of tokens in '" + protocolIncludesList + "'");
+ }
+ for (int i = 0; i < toks.length; i=i+2)
+ {
+ String includeArg = toks[i];
+ final int includedPort = Integer.parseInt(toks[i+1]);
+ options.addIncludedPort(ProtocolInclusion.lookup(includeArg), includedPort);
+
+ _logger.info("Adding protocol inclusion " + includeArg + " " + includedPort);
+ }
}
private boolean existingInternalBroker()
@@ -552,12 +615,17 @@ public class QpidBrokerTestCase extends QpidTestCase
public String getTestConfigFile()
{
- return _output + "/" + getTestQueueName() + "-config.xml";
+ return getTestConfigFile(getPort());
+ }
+
+ public String getTestConfigFile(int port)
+ {
+ return _output + "/" + getTestQueueName() + "-" + port + "-config.xml";
}
- public String getTestVirtualhostsFile()
+ public String getTestVirtualhostsFile(int port)
{
- return _output + "/" + getTestQueueName() + "-virtualhosts.xml";
+ return _output + "/" + getTestQueueName() + "-" + port + "-virtualhosts.xml";
}
private String relativeToQpidHome(String file)
@@ -567,38 +635,50 @@ public class QpidBrokerTestCase extends QpidTestCase
protected void saveTestConfiguration() throws ConfigurationException
{
+ String relative = saveTestConfiguration(getPort(), _testConfiguration);
+ _logger.info("Set test.config property to: " + relative);
+ setSystemProperty(TEST_CONFIG, relative);
+ }
+
+ protected String saveTestConfiguration(int port, XMLConfiguration testConfiguration) throws ConfigurationException
+ {
// Specify the test config file
- String testConfig = getTestConfigFile();
+ String testConfig = getTestConfigFile(port);
String relative = relativeToQpidHome(testConfig);
- setSystemProperty("test.config", relative);
- _logger.info("Set test.config property to: " + relative);
_logger.info("Saving test virtualhosts file at: " + testConfig);
// Create the file if configuration does not exist
- if (_testConfiguration.isEmpty())
+ if (testConfiguration.isEmpty())
{
- _testConfiguration.addProperty("__ignore", "true");
+ testConfiguration.addProperty("__ignore", "true");
}
- _testConfiguration.save(testConfig);
+ testConfiguration.save(testConfig);
+ return relative;
}
protected void saveTestVirtualhosts() throws ConfigurationException
{
+ String relative = saveTestVirtualhosts(getPort(), _testVirtualhosts);
+ _logger.info("Set test.virtualhosts property to: " + relative);
+ setSystemProperty(TEST_VIRTUALHOSTS, relative);
+ }
+
+ protected String saveTestVirtualhosts(int port, XMLConfiguration virtualHostConfiguration) throws ConfigurationException
+ {
// Specify the test virtualhosts file
- String testVirtualhosts = getTestVirtualhostsFile();
+ String testVirtualhosts = getTestVirtualhostsFile(port);
String relative = relativeToQpidHome(testVirtualhosts);
- setSystemProperty("test.virtualhosts", relative);
- _logger.info("Set test.virtualhosts property to: " + relative);
- _logger.info("Saving test virtualhosts file at: " + testVirtualhosts);
+ _logger.info("Set test.virtualhosts property to: " + testVirtualhosts);
// Create the file if configuration does not exist
- if (_testVirtualhosts.isEmpty())
+ if (virtualHostConfiguration.isEmpty())
{
- _testVirtualhosts.addProperty("__ignore", "true");
+ virtualHostConfiguration.addProperty("__ignore", "true");
}
- _testVirtualhosts.save(testVirtualhosts);
+ virtualHostConfiguration.save(testVirtualhosts);
+ return relative;
}
protected void cleanBrokerWork(final String qpidWork)
@@ -639,11 +719,55 @@ public class QpidBrokerTestCase extends QpidTestCase
public void stopAllBrokers()
{
+ boolean exceptionOccured = false;
Set<Integer> runningBrokerPorts = new HashSet<Integer>(getBrokerPortNumbers());
for (int brokerPortNumber : runningBrokerPorts)
{
+ if (!stopBrokerSafely(brokerPortNumber))
+ {
+ exceptionOccured = true;
+ }
+ }
+ if (exceptionOccured)
+ {
+ throw new RuntimeException("Exception occured on stopping of test broker. Please, examine logs for details");
+ }
+ }
+
+ protected boolean stopBrokerSafely(int brokerPortNumber)
+ {
+ boolean success = true;
+ BrokerHolder broker = _brokers.get(brokerPortNumber);
+ try
+ {
stopBroker(brokerPortNumber);
}
+ catch(Exception e)
+ {
+ success = false;
+ _logger.error("Failed to stop broker " + broker + " at port " + brokerPortNumber, e);
+ if (broker != null)
+ {
+ // save the thread dump in case of dead locks
+ try
+ {
+ _logger.error("Broker " + broker + " thread dump:" + broker.dumpThreads());
+ }
+ finally
+ {
+ // try to kill broker
+ try
+ {
+ broker.kill();
+ }
+ catch(Exception killException)
+ {
+ // ignore
+ }
+ }
+ }
+ }
+ return success;
}
public void stopBroker(int port)
@@ -705,21 +829,21 @@ public class QpidBrokerTestCase extends QpidTestCase
protected void makeVirtualHostPersistent(String virtualhost)
throws ConfigurationException, IOException
{
- Class<?> storeFactoryClass = null;
+ Class<?> storeClass = null;
try
{
// Try and lookup the BDB class
- storeFactoryClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStoreFactory");
+ storeClass = Class.forName("org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
}
catch (ClassNotFoundException e)
{
// No BDB store, we'll use Derby instead.
- storeFactoryClass = DerbyMessageStoreFactory.class;
+ storeClass = DerbyMessageStore.class;
}
- setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.factoryclass",
- storeFactoryClass.getName());
+ setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store.class",
+ storeClass.getName());
setConfigurationProperty("virtualhosts.virtualhost." + virtualhost + ".store." + MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY,
"${QPID_WORK}/" + virtualhost);
}
@@ -974,10 +1098,10 @@ public class QpidBrokerTestCase extends QpidTestCase
{
return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
}
-
+
public Connection getConnection() throws JMSException, NamingException
{
- return getConnection("guest", "guest");
+ return getConnection(GUEST_USERNAME, GUEST_PASSWORD);
}
public Connection getConnection(ConnectionURL url) throws JMSException
@@ -1268,14 +1392,14 @@ public class QpidBrokerTestCase extends QpidTestCase
/**
* Reloads the broker security configuration using the ApplicationRegistry (InVM brokers) or the
- * ConfigurationManagementMBean via the JMX interface (Standalone brokers, management must be
+ * ConfigurationManagementMBean via the JMX interface (Standalone brokers, management must be
* enabled before calling the method).
*/
public void reloadBrokerSecurityConfig() throws Exception
{
JMXTestUtils jmxu = new JMXTestUtils(this);
jmxu.open();
-
+
try
{
ConfigurationManagement configMBean = jmxu.getConfigurationManagement();
@@ -1285,7 +1409,7 @@ public class QpidBrokerTestCase extends QpidTestCase
{
jmxu.close();
}
-
+
LogMonitor _monitor = new LogMonitor(_outputFile);
assertTrue("The expected server security configuration reload did not occur",
_monitor.waitForMessage(ServerConfiguration.SECURITY_CONFIG_RELOADED, LOGMONITOR_TIMEOUT));
@@ -1295,4 +1419,24 @@ public class QpidBrokerTestCase extends QpidTestCase
{
return FAILING_PORT;
}
+
+ public XMLConfiguration getTestVirtualhosts()
+ {
+ return _testVirtualhosts;
+ }
+
+ public void setTestVirtualhosts(XMLConfiguration testVirtualhosts)
+ {
+ _testVirtualhosts = testVirtualhosts;
+ }
+
+ public XMLConfiguration getTestConfiguration()
+ {
+ return _testConfiguration;
+ }
+
+ public void setTestConfiguration(XMLConfiguration testConfiguration)
+ {
+ _testConfiguration = testConfiguration;
+ }
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java b/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java
index 787fc164d5..bce97a574a 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/utils/SpawnedBrokerHolder.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.test.utils;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.Set;
import org.apache.log4j.Logger;
@@ -32,8 +35,9 @@ public class SpawnedBrokerHolder implements BrokerHolder
private final Process _process;
private final Integer _pid;
private final String _workingDirectory;
+ private Set<Integer> _portsUsedByBroker;
- public SpawnedBrokerHolder(final Process process, final String workingDirectory)
+ public SpawnedBrokerHolder(final Process process, final String workingDirectory, Set<Integer> portsUsedByBroker)
{
if(process == null)
{
@@ -43,6 +47,7 @@ public class SpawnedBrokerHolder implements BrokerHolder
_process = process;
_pid = retrieveUnixPidIfPossible();
_workingDirectory = workingDirectory;
+ _portsUsedByBroker = portsUsedByBroker;
}
@Override
@@ -57,6 +62,8 @@ public class SpawnedBrokerHolder implements BrokerHolder
_process.destroy();
reapChildProcess();
+
+ waitUntilPortsAreFree();
}
@Override
@@ -74,6 +81,8 @@ public class SpawnedBrokerHolder implements BrokerHolder
}
reapChildProcess();
+
+ waitUntilPortsAreFree();
}
private void sendSigkillForImmediateShutdown(Integer pid)
@@ -146,4 +155,37 @@ public class SpawnedBrokerHolder implements BrokerHolder
}
}
+ private void waitUntilPortsAreFree()
+ {
+ new PortHelper().waitUntilPortsAreFree(_portsUsedByBroker);
+ }
+
+ @Override
+ public String dumpThreads()
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ Process process = Runtime.getRuntime().exec("jstack " + _pid);
+ InputStream is = process.getInputStream();
+ byte[] buffer = new byte[1024];
+ int length = -1;
+ while ((length = is.read(buffer)) != -1)
+ {
+ baos.write(buffer, 0, length);
+ }
+ }
+ catch (Exception e)
+ {
+ LOGGER.error("Error whilst collecting thread dump for " + _pid, e);
+ }
+ return new String(baos.toByteArray());
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SpawnedBrokerHolder [_pid=" + _pid + ", _portsUsedByBroker="
+ + _portsUsedByBroker + "]";
+ }
}