summaryrefslogtreecommitdiff
path: root/qpid/java/qpid-perftests-systests/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/qpid-perftests-systests/src/test/java')
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/ConfigFileTestHelper.java48
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/DistributedTestSystemTestBase.java72
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java100
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/SystemTestConstants.java28
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/BasicDistributedClientTest.java186
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ConsumerParticipantTest.java156
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java110
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java325
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/MessageProviderTest.java119
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ProducerParticipantTest.java132
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java263
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java157
-rw-r--r--qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java100
13 files changed, 1796 insertions, 0 deletions
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/ConfigFileTestHelper.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/ConfigFileTestHelper.java
new file mode 100644
index 0000000000..a4f4cab018
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/ConfigFileTestHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.qpid.disttest.controller.config.Config;
+import org.apache.qpid.disttest.controller.config.ConfigReader;
+
+public class ConfigFileTestHelper
+{
+ public static Reader getConfigFileReader(Class<?> testClass, String resourceName)
+ {
+ InputStream inputStream = testClass.getResourceAsStream(resourceName);
+ if(inputStream == null)
+ {
+ throw new RuntimeException("Can't find resource " + resourceName + " using classloader of class " + testClass);
+ }
+ Reader reader = new InputStreamReader(inputStream);
+ return reader;
+ }
+
+ public static Config getConfigFromResource(Class<?> testClass, String resourceName)
+ {
+ ConfigReader configReader = new ConfigReader();
+ Config config = configReader.readConfig(getConfigFileReader(testClass, resourceName));
+ return config;
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/DistributedTestSystemTestBase.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/DistributedTestSystemTestBase.java
new file mode 100644
index 0000000000..96daf64526
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/DistributedTestSystemTestBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.disttest;
+
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class DistributedTestSystemTestBase extends QpidBrokerTestCase
+{
+ protected Context _context;
+
+ protected Connection _connection;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ final Properties properties = new Properties();
+ properties.load(DistributedTestSystemTestBase.class.getResourceAsStream("perftests.systests.properties"));
+ _context = new InitialContext(properties);
+
+ _connection = getConnection();
+ _connection.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ // no need to close connections - this is done by superclass
+
+ super.tearDown();
+ }
+
+ public Context getContext()
+ {
+ return _context;
+ }
+
+ @Override
+ public Connection getConnection() throws JMSException, NamingException
+ {
+ final ConnectionFactory connectionFactory = (ConnectionFactory) _context.lookup("connectionfactory");
+ final Connection connection = connectionFactory.createConnection();
+ _connections.add(connection);
+ return connection;
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java
new file mode 100644
index 0000000000..59396d46c0
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.disttest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.disttest.jms.QpidQueueCreator;
+
+public class QpidQueueCreatorTest extends DistributedTestSystemTestBase
+{
+ private static final Map<String, Object> EMPTY_ATTRIBUTES = Collections.emptyMap();
+
+ private static final boolean QUEUE_DURABILITY = true;
+
+ private Connection _connection;
+ private QpidQueueCreator _creator;
+ private Session _session;
+ private List<QueueConfig> _configs;
+ private String _queueName;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _connection = getConnection();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _creator = new QpidQueueCreator();
+ _configs = new ArrayList<QueueConfig>();
+ _queueName = "direct://amq.direct//" + getTestQueueName() + "?durable='" + QUEUE_DURABILITY + "'";
+ }
+
+ public void testCreateQueueWithoutAttributes() throws Exception
+ {
+ _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES));
+
+ assertQueueBound(_queueName, false);
+
+ _creator.createQueues(_connection, _session, _configs);
+
+ assertQueueBound(_queueName, true);
+ }
+
+ public void testCreateWithAttributes() throws Exception
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put("x-qpid-priorities", Integer.valueOf(5));
+ _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, attributes));
+
+ assertQueueBound(_queueName, false);
+
+ _creator.createQueues(_connection, _session, _configs);
+
+ assertQueueBound(_queueName, true);
+ }
+
+ public void testDeleteQueues() throws Exception
+ {
+ _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES));
+
+ assertQueueBound(_queueName, false);
+
+ _creator.createQueues(_connection, _session, _configs);
+ assertQueueBound(_queueName, true);
+
+ _creator.deleteQueues(_connection, _session, _configs);
+ assertQueueBound(_queueName, false);
+ }
+
+ private void assertQueueBound(String queueName, boolean isBound) throws Exception
+ {
+ AMQDestination destination = (AMQDestination)_session.createQueue(queueName);
+ assertEquals("Queue is not in expected bound state", isBound, ((AMQSession<?, ?>)_session).isQueueBound(destination));
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/SystemTestConstants.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/SystemTestConstants.java
new file mode 100644
index 0000000000..b06ab0c735
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/SystemTestConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest;
+
+public abstract class SystemTestConstants
+{
+ public static final long REGISTRATION_TIMEOUT = 20000;
+ public static final long COMMAND_RESPONSE_TIMEOUT = 30000;
+ public static final long TEST_RESULT_TIMEOUT = 20000;
+
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/BasicDistributedClientTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/BasicDistributedClientTest.java
new file mode 100644
index 0000000000..d599bdc5c4
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/BasicDistributedClientTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest.clientonly;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.client.ClientState;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.jms.JmsMessageAdaptor;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
+import org.apache.qpid.disttest.message.CreateSessionCommand;
+import org.apache.qpid.disttest.message.NoOpCommand;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.apache.qpid.disttest.message.Response;
+import org.apache.qpid.disttest.message.StopClientCommand;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+
+public class BasicDistributedClientTest extends DistributedTestSystemTestBase
+{
+ private Session _session = null;
+ private MessageProducer _clientQueueProducer;
+ private Client _client;
+ private ControllerQueue _controllerQueue;
+ private ClientJmsDelegate _clientJmsDelegate = null;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _controllerQueue = new ControllerQueue(_connection, _context);
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _clientJmsDelegate = new ClientJmsDelegate(_context);
+ _client = new Client(_clientJmsDelegate);
+ _client.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _controllerQueue.close();
+ if (_session != null)
+ {
+ _session.close();
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ public void testClientSendsRegistrationMessage() throws Exception
+ {
+ final RegisterClientCommand regClientCommand = _controllerQueue.getNext();
+
+ assertNotNull("Client must have a non-null name", regClientCommand.getClientName());
+ assertEquals("Unexpected client name", _clientJmsDelegate.getClientName(), regClientCommand.getClientName());
+ assertNotNull("Client queue name should not be null", regClientCommand.getClientQueueName());
+ }
+
+ public void testClientSendsCommandResponses() throws Exception
+ {
+ final RegisterClientCommand registrationCommand = _controllerQueue.getNext();
+ createClientQueueProducer(registrationCommand);
+
+ sendCommandToClient(new NoOpCommand());
+
+ final Response responseCommand = _controllerQueue.getNext();
+ assertEquals("Incorrect client message type", CommandType.RESPONSE, responseCommand.getType());
+ }
+
+ public void testClientCanBeStoppedViaCommand() throws Exception
+ {
+ assertEquals("Expected client to be in STARTED state", ClientState.READY, _client.getState());
+
+ final RegisterClientCommand registrationCommand = _controllerQueue.getNext();
+ createClientQueueProducer(registrationCommand);
+
+ final Command stopClientCommand = new StopClientCommand();
+ sendCommandToClient(stopClientCommand);
+
+ _client.waitUntilStopped(1000);
+
+ Response response = _controllerQueue.getNext();
+ assertNotNull(response);
+ assertFalse("response shouldn't contain error", response.hasError());
+
+ assertEquals("Expected client to be in STOPPED state", ClientState.STOPPED, _client.getState());
+ }
+
+ public void testClientCanCreateTestConnection() throws Exception
+ {
+ assertEquals("Unexpected number of test connections", 0, _clientJmsDelegate.getNoOfTestConnections());
+
+ final RegisterClientCommand registration = _controllerQueue.getNext();
+ createClientQueueProducer(registration);
+
+ final CreateConnectionCommand createConnectionCommand = new CreateConnectionCommand();
+ createConnectionCommand.setConnectionName("newTestConnection");
+ createConnectionCommand.setConnectionFactoryName("connectionfactory");
+
+ sendCommandToClient(createConnectionCommand);
+ Response response = _controllerQueue.getNext();
+
+ assertFalse("Response message should not have indicated an error", response.hasError());
+ assertEquals("Unexpected number of test connections", 1, _clientJmsDelegate.getNoOfTestConnections());
+ }
+
+ public void testClientCanCreateTestSession() throws Exception
+ {
+ assertEquals("Unexpected number of test sessions", 0, _clientJmsDelegate.getNoOfTestSessions());
+
+ final RegisterClientCommand registration = _controllerQueue.getNext();
+ createClientQueueProducer(registration);
+
+ final CreateConnectionCommand createConnectionCommand = new CreateConnectionCommand();
+ createConnectionCommand.setConnectionName("newTestConnection");
+ createConnectionCommand.setConnectionFactoryName("connectionfactory");
+
+ sendCommandToClient(createConnectionCommand);
+ Response response = _controllerQueue.getNext();
+ assertFalse("Response message should not have indicated an error", response.hasError());
+
+ final CreateSessionCommand createSessionCommand = new CreateSessionCommand();
+ createSessionCommand.setConnectionName("newTestConnection");
+ createSessionCommand.setSessionName("newTestSession");
+ createSessionCommand.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+
+ sendCommandToClient(createSessionCommand);
+ response = _controllerQueue.getNext();
+
+ assertFalse("Response message should not have indicated an error", response.hasError());
+ assertEquals("Unexpected number of test sessions", 1, _clientJmsDelegate.getNoOfTestSessions());
+ }
+
+ private void sendCommandToClient(final Command command) throws JMSException
+ {
+ final Message message = JmsMessageAdaptor.commandToMessage(_session, command);
+ _clientQueueProducer.send(message);
+ }
+
+ private void createClientQueueProducer(
+ final RegisterClientCommand registration) throws JMSException
+ {
+ final Destination clientCommandQueue = createDestinationFromRegistration(registration);
+ _clientQueueProducer = _session.createProducer(clientCommandQueue);
+ }
+
+ private Queue createDestinationFromRegistration(
+ final RegisterClientCommand registrationCommand)
+ throws JMSException
+ {
+ String clientQueueName = registrationCommand.getClientQueueName();
+ assertNotNull("Null client queue in register message", clientQueueName);
+ return _session.createQueue(clientQueueName);
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ConsumerParticipantTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ConsumerParticipantTest.java
new file mode 100644
index 0000000000..a3c0430865
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ConsumerParticipantTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.disttest.clientonly;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.client.ConsumerParticipant;
+import org.apache.qpid.disttest.client.ParticipantExecutor;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+import org.apache.qpid.systest.disttest.clientonly.ProducerParticipantTest.TestClientJmsDelegate;
+
+public class ConsumerParticipantTest extends DistributedTestSystemTestBase
+{
+ private MessageProducer _producer;
+ private Session _session;
+ private TestClientJmsDelegate _delegate;
+ private Client _client;
+ private ControllerQueue _controllerQueue;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _controllerQueue = new ControllerQueue(_connection, _context);
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _producer = _session.createProducer(getTestQueue());
+
+ _delegate = new TestClientJmsDelegate(getContext());
+ _client = new Client(_delegate);
+ }
+
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ _controllerQueue.close();
+ super.tearDown();
+ }
+
+ public void testConsumeNumberOfMessagesSynchronously() throws Exception
+ {
+ runTest(Session.AUTO_ACKNOWLEDGE, 10, 0, true);
+ }
+
+ public void testConsumeNumberOfMessagesAsynchronously() throws Exception
+ {
+ runTest(Session.AUTO_ACKNOWLEDGE, 10, 0, false);
+ }
+
+ public void testSelectors() throws Exception
+ {
+ final CreateConsumerCommand command = new CreateConsumerCommand();
+ command.setNumberOfMessages(10);
+ command.setSessionName("testSession");
+ command.setDestinationName(getTestQueueName());
+ command.setSelector("id=1");
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ _delegate.addConnection("name-does-not-matter", _connection);
+ _delegate.addSession(command.getSessionName(), session);
+
+ ConsumerParticipant consumerParticipant = new ConsumerParticipant(_delegate, command);
+ _delegate.createConsumer(command);
+
+ for (int i = 0; i < 20; i++)
+ {
+ Message message = _session.createMessage();
+ if (i % 2 == 0)
+ {
+ message.setIntProperty("id", 0);
+ }
+ else
+ {
+ message.setIntProperty("id", 1);
+ }
+ _producer.send(message);
+ }
+
+ new ParticipantExecutor(consumerParticipant).start(_client);
+
+ ParticipantResult results = _controllerQueue.getNext();
+ assertNotNull("No results message recieved", results);
+ assertEquals("Unexpected number of messages received", 10, results.getNumberOfMessagesProcessed());
+
+ Session testQueueConsumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer testQueueConsumer = testQueueConsumerSession.createConsumer(getTestQueue());
+ for (int i = 0; i < 10; i++)
+ {
+ Message message = testQueueConsumer.receive(2000);
+ assertNotNull("Message is not received: " + message, message);
+ assertEquals("Unexpected id value", 0, message.getIntProperty("id"));
+ }
+ Message message = testQueueConsumer.receive(2000);
+ assertNull("Unexpected message remaining on test queue: " + message, message);
+
+ _connection.stop();
+ }
+
+ protected void runTest(int acknowledgeMode, int numberOfMessages, int batchSize, boolean synchronous) throws Exception
+ {
+ final CreateConsumerCommand command = new CreateConsumerCommand();
+ command.setNumberOfMessages(numberOfMessages);
+ command.setBatchSize(batchSize);
+ command.setSessionName("testSession");
+ command.setDestinationName(getTestQueueName());
+ command.setSynchronous(synchronous);
+
+ Session session = _connection.createSession(Session.SESSION_TRANSACTED == acknowledgeMode, acknowledgeMode);
+
+ _delegate.addConnection("name-does-not-matter", _connection);
+ _delegate.addSession(command.getSessionName(), session);
+
+ ConsumerParticipant consumerParticipant = new ConsumerParticipant(_delegate, command);
+ _delegate.createConsumer(command);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ _producer.send(_session.createMessage());
+ }
+
+ new ParticipantExecutor(consumerParticipant).start(_client);
+
+ ParticipantResult results = _controllerQueue.getNext();
+ assertNotNull("No results message recieved", results);
+
+ Session testQueueConsumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer testQueueConsumer = testQueueConsumerSession.createConsumer(getTestQueue());
+ Message message = testQueueConsumer.receive(2000);
+ assertNull("Unexpected message remaining on test queue: " + message, message);
+
+ _connection.stop();
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java
new file mode 100644
index 0000000000..2a108721b0
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ControllerQueue.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest.clientonly;
+
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.Context;
+
+import org.junit.Assert;
+
+import org.apache.qpid.disttest.DistributedTestConstants;
+import org.apache.qpid.disttest.jms.JmsMessageAdaptor;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+
+/**
+ * Helper for unit tests to simplify access to the Controller Queue.
+ *
+ * Implicitly creates the queue, so you must create a {@link ControllerQueue} object before
+ * trying to use the underlying queue.
+ */
+public class ControllerQueue
+{
+ private MessageConsumer _controllerQueueMessageConsumer;
+ private Session _controllerQueueSession;
+
+ /**
+ * Implicitly creates the queue, so you must create a {@link ControllerQueue} object before
+ * trying to use the underlying queue.
+ *
+ * @param context used for looking up the controller queue {@link Destination}
+ */
+ public ControllerQueue(Connection connection, Context context) throws Exception
+ {
+ _controllerQueueSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination controllerQueue = (Destination) context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME);
+ _controllerQueueMessageConsumer = _controllerQueueSession.createConsumer(controllerQueue);
+ }
+
+ public <T extends Command> T getNext(long timeout) throws JMSException
+ {
+ final Message message = _controllerQueueMessageConsumer.receive(timeout);
+ if(message == null)
+ {
+ return null;
+ }
+
+ return (T) JmsMessageAdaptor.messageToCommand(message);
+ }
+
+ public void addNextResponse(Map<CommandType, Command> responses) throws JMSException
+ {
+ Command nextResponse = getNext();
+ responses.put(nextResponse.getType(), nextResponse);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends Command> T getNext() throws JMSException
+ {
+ return (T)getNext(true);
+ }
+
+ public <T extends Command> T getNext(boolean assertMessageExists) throws JMSException
+ {
+ final Message message = _controllerQueueMessageConsumer.receive(2000);
+ if(assertMessageExists)
+ {
+ Assert.assertNotNull("No message received from control queue", message);
+ }
+
+ if(message == null)
+ {
+ return null;
+ }
+
+ T command = (T) JmsMessageAdaptor.messageToCommand(message);
+
+ return command;
+ }
+
+ public void close() throws Exception
+ {
+ _controllerQueueMessageConsumer.close();
+ _controllerQueueSession.close();
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java
new file mode 100644
index 0000000000..5b5a60ac43
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/DistributedClientTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest.clientonly;
+
+import static org.apache.qpid.disttest.client.ClientState.READY;
+import static org.apache.qpid.disttest.client.ClientState.RUNNING_TEST;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.client.ClientState;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.jms.JmsMessageAdaptor;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+import org.apache.qpid.disttest.message.CreateConnectionCommand;
+import org.apache.qpid.disttest.message.CreateConsumerCommand;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.CreateSessionCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.apache.qpid.disttest.message.Response;
+import org.apache.qpid.disttest.message.StartTestCommand;
+import org.apache.qpid.disttest.message.TearDownTestCommand;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+
+public class DistributedClientTest extends DistributedTestSystemTestBase
+{
+ private static final String TEST_CONSUMER = "newTestConsumer";
+ private static final String TEST_DESTINATION = "newDestination";
+ private static final String TEST_PRODUCER_NAME = "newTestProducer";
+ private static final String TEST_SESSION_NAME = "newTestSession";
+ private static final String TEST_CONNECTION_NAME = "newTestConnection";
+
+ private Session _session = null;
+ private MessageProducer _clientQueueProducer;
+ private Client _client;
+ private ControllerQueue _controllerQueue;
+ protected ClientJmsDelegate _clientJmsDelegate;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _controllerQueue = new ControllerQueue(_connection, _context);
+
+ _clientJmsDelegate = new ClientJmsDelegate(_context);
+ _client = new Client(_clientJmsDelegate);
+ _client.start();
+
+ final RegisterClientCommand registrationCommand = _controllerQueue.getNext();
+ createClientQueueProducer(registrationCommand);
+
+ createTestConnection(TEST_CONNECTION_NAME);
+ createTestSession(TEST_CONNECTION_NAME, TEST_SESSION_NAME);
+
+ assertEquals("Expected no test producers at start of test", 0, _clientJmsDelegate.getNoOfTestProducers());
+ assertEquals("Expected no test consumers at start of test", 0, _clientJmsDelegate.getNoOfTestConsumers());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _controllerQueue.close();
+ if (_session != null)
+ {
+ _session.close();
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ public void testClientCanCreateTestProducer() throws Exception
+ {
+ assertEquals("Should initially have zero producers", 0, _clientJmsDelegate.getNoOfTestProducers());
+
+ createTestProducer(TEST_SESSION_NAME, TEST_PRODUCER_NAME, TEST_DESTINATION);
+
+ assertEquals("Should now have one test producer", 1, _clientJmsDelegate.getNoOfTestProducers());
+ }
+
+ public void testClientCanCreateTestConsumer() throws Exception
+ {
+ assertEquals("Should initially have no test consumers", 0, _clientJmsDelegate.getNoOfTestConsumers());
+
+ createTestConsumer(TEST_SESSION_NAME, TEST_CONSUMER, TEST_DESTINATION);
+
+ assertEquals("Should now have one test consumer", 1, _clientJmsDelegate.getNoOfTestConsumers());
+ }
+
+ public void testClientFailsToCreateSessionUsingInvalidConnection() throws Exception
+ {
+ int initialNoOfTestSessions = _clientJmsDelegate.getNoOfTestSessions();
+
+ createTestSession("nonExistentConnection", TEST_SESSION_NAME, false /* shouldSucceed */);
+
+ assertEquals("Number of test sessions should not have changed", initialNoOfTestSessions, _clientJmsDelegate.getNoOfTestSessions());
+ }
+
+ public void testClientFailsToCreateProducerUsingInvalidSession() throws Exception
+ {
+ int initialNoOfTestProducers = _clientJmsDelegate.getNoOfTestProducers();
+
+ createTestProducer("invalidSessionName", TEST_PRODUCER_NAME, TEST_DESTINATION, false /* shouldSucceed */);
+
+ assertEquals("Number of test producers should not have changed", initialNoOfTestProducers, _clientJmsDelegate.getNoOfTestProducers());
+ }
+
+ public void testClientFailsToCreateConsumerUsingInvalidSession() throws Exception
+ {
+ int initialNoOfTestConsumers = _clientJmsDelegate.getNoOfTestConsumers();
+
+ createTestConsumer("invalidSessionName", TEST_CONSUMER, TEST_DESTINATION, false /* shouldSucceed */);
+
+ assertEquals("Number of test consumers should not have changed", initialNoOfTestConsumers, _clientJmsDelegate.getNoOfTestConsumers());
+ }
+
+ public void testClientCanStartPerformingTests() throws Exception
+ {
+ createTestProducer(TEST_SESSION_NAME, TEST_PRODUCER_NAME, TEST_DESTINATION);
+
+ sendCommandToClient(new StartTestCommand());
+
+ validateStartTestResponseAndParticipantResults(CommandType.PRODUCER_PARTICIPANT_RESULT);
+
+ assertState(_client, RUNNING_TEST);
+ }
+
+ public void testParticipantsSendResults() throws Exception
+ {
+ createTestProducer(TEST_SESSION_NAME, TEST_PRODUCER_NAME, TEST_DESTINATION);
+
+ sendCommandToClient(new StartTestCommand());
+
+ validateStartTestResponseAndParticipantResults(CommandType.PRODUCER_PARTICIPANT_RESULT);
+ }
+
+ /**
+ * Need to validate both of these responses together because their order is non-deterministic
+ * @param expectedParticipantResultCommandType TODO
+ */
+ private void validateStartTestResponseAndParticipantResults(CommandType expectedParticipantResultCommandType) throws JMSException
+ {
+ Map<CommandType, Command> responses = new HashMap<CommandType, Command>();
+ _controllerQueue.addNextResponse(responses);
+ _controllerQueue.addNextResponse(responses);
+
+ ParticipantResult results = (ParticipantResult) responses.get(expectedParticipantResultCommandType);
+ validateResponse(null, results, true);
+
+ Response startTestResponse = (Response) responses.get(CommandType.RESPONSE);
+ validateResponse(CommandType.START_TEST, startTestResponse, true);
+ }
+
+ public void testClientCannotStartPerformingTestsInNonReadyState() throws Exception
+ {
+ assertState(_client, READY);
+ sendCommandAndValidateResponse(new StartTestCommand(), true);
+ assertState(_client, RUNNING_TEST);
+
+ // Send another start test command
+ sendCommandAndValidateResponse(new StartTestCommand(), false /*should reject duplicate start command*/);
+ assertState(_client, RUNNING_TEST);
+ }
+
+ public void testNonRunningClientIsUnaffectedByStopTestCommand() throws Exception
+ {
+ assertState(_client, READY);
+
+ sendCommandAndValidateResponse(new TearDownTestCommand(), false);
+
+ assertState(_client, READY);
+ }
+
+ private void sendCommandToClient(final Command command) throws Exception
+ {
+ final Message message = JmsMessageAdaptor.commandToMessage(_session, command);
+ _clientQueueProducer.send(message);
+ ((AMQSession<?, ?>)_session).sync();
+ }
+
+ private void sendCommandAndValidateResponse(final Command command, boolean shouldSucceed) throws Exception
+ {
+ sendCommandToClient(command);
+ Response response = _controllerQueue.getNext();
+ validateResponse(command.getType(), response, shouldSucceed);
+ }
+
+ private void sendCommandAndValidateResponse(final Command command) throws Exception
+ {
+ sendCommandAndValidateResponse(command, true);
+ }
+
+ private void createTestConnection(String connectionName) throws Exception
+ {
+ int initialNumberOfConnections = _clientJmsDelegate.getNoOfTestConnections();
+
+ final CreateConnectionCommand createConnectionCommand = new CreateConnectionCommand();
+ createConnectionCommand.setConnectionName(connectionName);
+ createConnectionCommand.setConnectionFactoryName("connectionfactory");
+
+ sendCommandAndValidateResponse(createConnectionCommand);
+
+ int expectedNumberOfConnections = initialNumberOfConnections + 1;
+
+ assertEquals("unexpected number of test connections", expectedNumberOfConnections, _clientJmsDelegate.getNoOfTestConnections());
+ }
+
+ private void createTestSession(String connectionName, String sessionName, boolean shouldSucceed) throws Exception
+ {
+ int initialNumberOfSessions = _clientJmsDelegate.getNoOfTestSessions();
+
+ final CreateSessionCommand createSessionCommand = new CreateSessionCommand();
+ createSessionCommand.setConnectionName(connectionName);
+ createSessionCommand.setSessionName(sessionName);
+ createSessionCommand.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+
+ sendCommandAndValidateResponse(createSessionCommand, shouldSucceed);
+
+ int expectedNumberOfSessions = initialNumberOfSessions + (shouldSucceed ? 1 : 0);
+
+ assertEquals("unexpected number of test sessions", expectedNumberOfSessions, _clientJmsDelegate.getNoOfTestSessions());
+ }
+
+ private void createTestSession(String connectionName, String sessionName) throws Exception
+ {
+ createTestSession(connectionName, sessionName, true);
+ }
+
+ private void createTestProducer(String sessionName, String producerName, String destinationName, boolean shouldSucceed) throws Exception
+ {
+ final CreateProducerCommand createProducerCommand = new CreateProducerCommand();
+ createProducerCommand.setParticipantName(producerName);
+ createProducerCommand.setSessionName(sessionName);
+ createProducerCommand.setDestinationName(destinationName);
+ createProducerCommand.setNumberOfMessages(100);
+
+ sendCommandAndValidateResponse(createProducerCommand, shouldSucceed);
+ }
+
+ private void createTestProducer(String sessionName, String producerName, String destinationName) throws Exception
+ {
+ createTestProducer(sessionName, producerName, destinationName, true);
+ }
+
+ private void createTestConsumer(String sessionName, String consumerName, String destinationName, boolean shouldSucceed) throws Exception
+ {
+ final CreateConsumerCommand createConsumerCommand = new CreateConsumerCommand();
+ createConsumerCommand.setSessionName(sessionName);
+ createConsumerCommand.setDestinationName(destinationName);
+ createConsumerCommand.setParticipantName(consumerName);
+ createConsumerCommand.setNumberOfMessages(1);
+
+ sendCommandAndValidateResponse(createConsumerCommand, shouldSucceed);
+ }
+
+ private void createTestConsumer(String sessionName, String consumerName, String destinationName) throws Exception
+ {
+ createTestConsumer(sessionName, consumerName, destinationName, true);
+ }
+
+ private void validateResponse(CommandType originatingCommandType, Response response, boolean shouldSucceed) throws JMSException
+ {
+ assertEquals("Response is a reply to the wrong command: " + response,
+ originatingCommandType,
+ response.getInReplyToCommandType());
+
+ boolean shouldHaveError = !shouldSucceed;
+ assertEquals("Response message " + response + " should have indicated hasError=" + shouldHaveError,
+ shouldHaveError,
+ response.hasError());
+ }
+
+ private void createClientQueueProducer(final RegisterClientCommand registration) throws JMSException
+ {
+ final Destination clientCommandQueue = createDestinationFromRegistration(registration);
+ _clientQueueProducer = _session.createProducer(clientCommandQueue);
+ }
+
+ private Queue createDestinationFromRegistration(final RegisterClientCommand registrationCommand) throws JMSException
+ {
+ String clientQueueName = registrationCommand.getClientQueueName();
+ assertNotNull("Null client queue in register message", clientQueueName);
+ return _session.createQueue(clientQueueName);
+ }
+
+ private static void assertState(Client client, ClientState expectedState)
+ {
+ ClientState clientState = client.getState();
+ assertEquals("Client should be in state: " + expectedState + " but is in state " + clientState, expectedState, clientState);
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/MessageProviderTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/MessageProviderTest.java
new file mode 100644
index 0000000000..dcbff6518b
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/MessageProviderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.disttest.clientonly;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.disttest.client.MessageProvider;
+import org.apache.qpid.disttest.client.property.PropertyValue;
+import org.apache.qpid.disttest.client.property.SimplePropertyValue;
+import org.apache.qpid.disttest.message.CreateMessageProviderCommand;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+import org.apache.qpid.systest.disttest.clientonly.ProducerParticipantTest.TestClientJmsDelegate;
+
+public class MessageProviderTest extends DistributedTestSystemTestBase
+{
+ private MessageConsumer _consumer;
+ private Session _session;
+ private TestClientJmsDelegate _delegate;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumer = _session.createConsumer(getTestQueue());
+ _delegate = new TestClientJmsDelegate(getContext());
+ }
+
+ public void testMessageSize() throws Exception
+ {
+ runSizeTest(0);
+ runSizeTest(5);
+ runSizeTest(512);
+ }
+
+ public void runSizeTest(int size) throws Exception
+ {
+ CreateProducerCommand command = new CreateProducerCommand();
+ command.setMessageSize(size);
+ MessageProvider messageProvider = new MessageProvider(null);
+ Message message = messageProvider.nextMessage(_session, command);
+ assertNotNull("Message is not generated", message);
+ assertTrue("Wrong message type", message instanceof TextMessage);
+ TextMessage textMessage = (TextMessage)message;
+ String text = textMessage.getText();
+ assertNotNull("Message payload is not generated", text);
+ assertEquals("Message payload size is incorrect", size, text.length());
+ }
+
+ public void testCreateMessageProviderAndSendMessage() throws Exception
+ {
+ final CreateMessageProviderCommand messageProviderCommand = new CreateMessageProviderCommand();
+ messageProviderCommand.setProviderName("test1");
+ Map<String, PropertyValue> messageProperties = new HashMap<String, PropertyValue>();
+ messageProperties.put("test", new SimplePropertyValue("testValue"));
+ messageProperties.put("priority", new SimplePropertyValue(new Integer(9)));
+ messageProviderCommand.setMessageProperties(messageProperties);
+ _delegate.createMessageProvider(messageProviderCommand);
+
+ final CreateProducerCommand producerCommand = new CreateProducerCommand();
+ producerCommand.setNumberOfMessages(1);
+ producerCommand.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producerCommand.setPriority(6);
+ producerCommand.setParticipantName("test");
+ producerCommand.setMessageSize(10);
+ producerCommand.setSessionName("testSession");
+ producerCommand.setDestinationName(getTestQueueName());
+ producerCommand.setMessageProviderName(messageProviderCommand.getProviderName());
+
+ Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+ _delegate.addConnection("name-does-not-matter", _connection);
+ _delegate.addSession(producerCommand.getSessionName(), session);
+ _delegate.createProducer(producerCommand);
+
+ Message message = _delegate.sendNextMessage(producerCommand);
+ session.commit();
+ assertMessage(message);
+
+ _connection.start();
+ Message receivedMessage = _consumer.receive(1000l);
+ assertMessage(receivedMessage);
+ }
+
+ protected void assertMessage(Message message) throws JMSException
+ {
+ assertNotNull("Message should not be null", message);
+ assertEquals("Unexpected test property", "testValue", message.getStringProperty("test"));
+ assertEquals("Unexpected priority property", 9, message.getJMSPriority());
+ assertTrue("Unexpected message type", message instanceof TextMessage);
+ String text = ((TextMessage)message).getText();
+ assertNotNull("Message text should not be null", text);
+ assertNotNull("Unexpected message size ", text.length());
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ProducerParticipantTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ProducerParticipantTest.java
new file mode 100644
index 0000000000..54bb9efa98
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/clientonly/ProducerParticipantTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.disttest.clientonly;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.client.ParticipantExecutor;
+import org.apache.qpid.disttest.client.ProducerParticipant;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.CreateProducerCommand;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+
+public class ProducerParticipantTest extends DistributedTestSystemTestBase
+{
+ private MessageConsumer _consumer;
+ private TestClientJmsDelegate _delegate;
+ private Client _client;
+ private ControllerQueue _controllerQueue;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _controllerQueue = new ControllerQueue(_connection, _context);
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _consumer = session.createConsumer(getTestQueue());
+
+ _delegate = new TestClientJmsDelegate(getContext());
+ _client = new Client(_delegate);
+ }
+
+
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ _controllerQueue.close();
+ super.tearDown();
+ }
+
+
+
+ public void testProduceNumberOfMessages() throws Exception
+ {
+ runTest(Session.AUTO_ACKNOWLEDGE, 100, 10, 0, 0);
+ }
+
+ protected void runTest(int acknowledgeMode, int messageSize, int numberOfMessages, int batchSize, long publishInterval) throws Exception
+ {
+ final CreateProducerCommand command = new CreateProducerCommand();
+ command.setNumberOfMessages(numberOfMessages);
+ command.setDeliveryMode(DeliveryMode.PERSISTENT);
+ command.setParticipantName("test");
+ command.setMessageSize(messageSize);
+ command.setBatchSize(batchSize);
+ command.setInterval(publishInterval);
+ command.setSessionName("testSession");
+ command.setDestinationName(getTestQueueName());
+
+ Session session = _connection.createSession(Session.SESSION_TRANSACTED == acknowledgeMode, acknowledgeMode);
+
+ _delegate.addConnection("name-does-not-matter", _connection);
+ _delegate.addSession(command.getSessionName(), session);
+ _delegate.createProducer(command);
+
+ final ProducerParticipant producer = new ProducerParticipant(_delegate, command);
+
+ new ParticipantExecutor(producer).start(_client);
+
+ _connection.start();
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ final Message m = _consumer.receive(1000l);
+ assertNotNull("Expected message [" + i + "] is not received", m);
+ assertTrue("Unexpected message", m instanceof TextMessage);
+ }
+ Message m = _consumer.receive(500l);
+ assertNull("Unexpected message", m);
+
+ ParticipantResult results = _controllerQueue.getNext();
+
+ assertNotNull("no results", results);
+ assertFalse(results.getStartInMillis() == 0);
+ assertFalse(results.getEndInMillis() == 0);
+ }
+
+ static class TestClientJmsDelegate extends ClientJmsDelegate
+ {
+
+ public TestClientJmsDelegate(Context context)
+ {
+ super(context);
+ }
+
+ @Override
+ public void addSession(final String sessionName, final Session newSession)
+ {
+ super.addSession(sessionName, newSession);
+ }
+
+ @Override
+ public void addConnection(final String connectionName, final Connection newConnection)
+ {
+ super.addConnection(connectionName, newConnection);
+ }
+ }
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
new file mode 100644
index 0000000000..75d0941c57
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest.controllerandclient;
+
+import static org.apache.qpid.systest.disttest.SystemTestConstants.COMMAND_RESPONSE_TIMEOUT;
+import static org.apache.qpid.systest.disttest.SystemTestConstants.REGISTRATION_TIMEOUT;
+import static org.apache.qpid.systest.disttest.SystemTestConstants.TEST_RESULT_TIMEOUT;
+
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.apache.qpid.systest.disttest.ConfigFileTestHelper;
+import org.apache.qpid.disttest.client.Client;
+import org.apache.qpid.disttest.client.ClientState;
+import org.apache.qpid.disttest.controller.Controller;
+import org.apache.qpid.disttest.controller.ResultsForAllTests;
+import org.apache.qpid.disttest.controller.TestResult;
+import org.apache.qpid.disttest.controller.config.Config;
+import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
+import org.apache.qpid.disttest.message.ParticipantResult;
+import org.apache.qpid.disttest.message.ProducerParticipantResult;
+import org.apache.qpid.disttest.results.aggregation.ITestResult;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ControllerAndClientTest extends DistributedTestSystemTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ControllerAndClientTest.class);
+ private static final long CLIENT_BACKGROUND_THREAD_WAIT_TIME = 5000;
+
+ private Controller _controller;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _controller = new Controller(new ControllerJmsDelegate(_context), REGISTRATION_TIMEOUT, COMMAND_RESPONSE_TIMEOUT);
+ _controller.setTestResultTimeout(TEST_RESULT_TIMEOUT);
+ }
+
+ public void testProducerAndConsumerInSeparateClients() throws Exception
+ {
+ List<TestResult> resultList = runTestsForTwoClients("producerAndConsumerInSeparateClients.json", 1);
+
+ TestResult testResult1 = resultList.get(0);
+ assertEquals("Unexpected test name", "Test 1", testResult1.getName());
+ List<ParticipantResult> test1ParticipantResults = testResult1.getParticipantResults();
+ assertEquals("Unexpected number of participant results for test 1", 2, test1ParticipantResults.size());
+ assertParticipantNames(test1ParticipantResults, "participantConsumer1", "participantProducer1");
+ ConsumerParticipantResult result = null;
+ for (ParticipantResult participantResult : test1ParticipantResults)
+ {
+ if (participantResult instanceof ConsumerParticipantResult)
+ {
+ result = (ConsumerParticipantResult)participantResult;
+ break;
+ }
+ }
+ assertNotNull("Consumer results not recived", result);
+ Collection<Long> latencies = result.getMessageLatencies();
+ assertNotNull("Latency results are not collected", latencies);
+ assertEquals("Unexpected latency results", 1, latencies.size());
+ }
+
+ public void testProducerClient() throws Exception
+ {
+ Session session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("producerClient");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // queue is not declared in configuration
+ // controller is not able to clean it
+ // cleaning manually
+ while(consumer.receive(1000l) != null);
+
+ final Config config = ConfigFileTestHelper.getConfigFromResource(getClass(), "produceClient.json");
+ _controller.setConfig(config);
+ final Client client1 = new Client(new ClientJmsDelegate(_context));
+ final Thread client1Thread = createBackgroundClientThread(client1);
+ _controller.awaitClientRegistrations();
+
+ ResultsForAllTests results = _controller.runAllTests();
+ _controller.stopAllRegisteredClients();
+
+ assertClientThreadsShutdown(client1Thread);
+ assertClientsStopped(ClientState.STOPPED, client1);
+ assertFalse("Test should have no errors", results.hasErrors());
+ List<ITestResult> allTestResults = results.getTestResults();
+ assertEquals("Unexpected number of test results", 1, allTestResults.size());
+ ITestResult testResult1 = allTestResults.get(0);
+ assertEquals("Unexpected test name", "Test 1", testResult1.getName());
+ List<ParticipantResult> test1ParticipantResults = testResult1.getParticipantResults();
+ assertEquals("Unexpected number of participant results for test 1", 1, test1ParticipantResults.size());
+ assertParticipantNames(test1ParticipantResults, "participantProducer1");
+
+ // check message properties
+ for (int i=0; i< 10; i++)
+ {
+ Message message = consumer.receive(1000l);
+ assertNotNull("Message " + i + " is not received", message);
+ assertEquals("Unexpected priority", i, message.getJMSPriority());
+ assertEquals("Unexpected id", i, message.getIntProperty("id"));
+ assertEquals("Unexpected test", "test-value", message.getStringProperty("test"));
+ }
+ }
+
+ public void testProducerAndThreeConsumersInSeparateClients() throws Exception
+ {
+ List<TestResult> resultList = runTestsForTwoClients("producerAndThreeConsumersInSeparateClients.json", 1);
+
+ TestResult testResult1 = resultList.get(0);
+ List<ParticipantResult> test1ParticipantResults = testResult1.getParticipantResults();
+ assertEquals("Unexpected number of participant results for test", 4, test1ParticipantResults.size());
+
+ assertParticipantNames(test1ParticipantResults, "participantConsumer1", "participantConsumer2", "participantConsumer3", "participantProducer1");
+
+ ConsumerParticipantResult consumer1 = (ConsumerParticipantResult) test1ParticipantResults.get(0);
+ assertEquals(3, consumer1.getNumberOfMessagesProcessed());
+ assertEquals(true, consumer1.isSynchronousConsumer());
+
+ ProducerParticipantResult producer1 = (ProducerParticipantResult) test1ParticipantResults.get(3);
+ assertEquals(9, producer1.getNumberOfMessagesProcessed());
+ assertEquals(2, producer1.getBatchSize());
+ assertEquals(50, producer1.getInterval());
+ }
+
+ public void testIteratingFeature() throws Exception
+ {
+ List<TestResult> resultList = runTestsForTwoClients("iteratingFeature.json", 2);
+
+ assertTestResultMessageSize(resultList.get(0), 0, 100, 10);
+ assertTestResultMessageSize(resultList.get(1), 1, 200, 5);
+
+ }
+
+ private void assertTestResultMessageSize(TestResult testResult, int iterationNumber, int expectedMessageSize, int expectedNumberOfMessages)
+ {
+ List<ParticipantResult> test1ParticipantResults = testResult.getParticipantResults();
+ assertEquals("Unexpected number of participant results for test", 2, test1ParticipantResults.size());
+
+ ParticipantResult producer1 = test1ParticipantResults.get(1);
+
+ assertEquals(expectedMessageSize, producer1.getPayloadSize());
+ assertEquals(iterationNumber, producer1.getIterationNumber());
+ }
+
+ public void testTwoTests() throws Exception
+ {
+ List<TestResult> resultList = runTestsForTwoClients("testWithTwoTests.json", 2);
+
+ assertEquals("Test 1", resultList.get(0).getName());
+ assertEquals("Test 2", resultList.get(1).getName());
+ }
+
+ private List<TestResult> runTestsForTwoClients(String jsonConfigFile, int expectedNumberOfTests) throws NamingException, InterruptedException
+ {
+ final Config config = ConfigFileTestHelper.getConfigFromResource(getClass(), jsonConfigFile);
+ _controller.setConfig(config);
+
+ final Client client1 = new Client(new ClientJmsDelegate(_context));
+ final Client client2 = new Client(new ClientJmsDelegate(_context));
+
+ final Thread client1Thread = createBackgroundClientThread(client1);
+ final Thread client2Thread = createBackgroundClientThread(client2);
+
+ _controller.awaitClientRegistrations();
+
+ ResultsForAllTests results = _controller.runAllTests();
+ _controller.stopAllRegisteredClients();
+
+ assertClientThreadsShutdown(client1Thread, client2Thread);
+ assertClientsStopped(ClientState.STOPPED, client1, client2);
+
+ assertFalse("Test should have no errors", results.hasErrors());
+
+ List<TestResult> allTestResults = (List)results.getTestResults();
+ assertEquals("Unexpected number of test results", expectedNumberOfTests, allTestResults.size());
+
+ return allTestResults;
+ }
+
+
+ private void assertParticipantNames(List<ParticipantResult> participants, String... expectedOrderedParticipantNames)
+ {
+ assertEquals("Size of list of expected participant names is different from actual", expectedOrderedParticipantNames.length, participants.size());
+
+ for (int i = 0; i < expectedOrderedParticipantNames.length; i++)
+ {
+ String expectedParticipantName = expectedOrderedParticipantNames[i];
+ ParticipantResult participant = participants.get(i);
+ assertEquals(expectedParticipantName, participant.getParticipantName());
+ }
+ }
+
+ private void assertClientsStopped(ClientState expectedState, final Client... clients)
+ {
+ for (Client client : clients)
+ {
+ assertEquals(client.getClientName() + " in unexpected state", expectedState, client.getState());
+ }
+ }
+
+ private void assertClientThreadsShutdown(final Thread... clientThreads)
+ throws InterruptedException
+ {
+ for (Thread clientThread : clientThreads)
+ {
+ clientThread.join(2000);
+ assertFalse(clientThread.getName() + " should have shutdown", clientThread.isAlive());
+ }
+ }
+
+ private Thread createBackgroundClientThread(final Client client) throws NamingException
+ {
+ final String clientThreadName = client.getClientName() + "-thread";
+ final Thread clientThread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ client.start();
+ client.waitUntilStopped(CLIENT_BACKGROUND_THREAD_WAIT_TIME);
+ }
+ finally
+ {
+ LOGGER.debug("Client thread {} finished", clientThreadName);
+ }
+ }
+ }, clientThreadName);
+ clientThread.start();
+ return clientThread;
+ }
+
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java
new file mode 100644
index 0000000000..349ddb276e
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/controlleronly/DistributedControllerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest.disttest.controlleronly;
+
+import static org.apache.qpid.systest.disttest.SystemTestConstants.COMMAND_RESPONSE_TIMEOUT;
+import static org.apache.qpid.systest.disttest.SystemTestConstants.REGISTRATION_TIMEOUT;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.systest.disttest.ConfigFileTestHelper;
+import org.apache.qpid.disttest.controller.Controller;
+import org.apache.qpid.disttest.controller.config.Config;
+import org.apache.qpid.disttest.jms.ControllerJmsDelegate;
+import org.apache.qpid.disttest.jms.JmsMessageAdaptor;
+import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
+import org.apache.qpid.disttest.message.RegisterClientCommand;
+import org.apache.qpid.disttest.message.Response;
+import org.apache.qpid.systest.disttest.DistributedTestSystemTestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedControllerTest extends DistributedTestSystemTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(DistributedControllerTest.class);
+
+ private static final String CLIENT1 = "client1";
+ private Controller _controller = null;
+ private Session _session = null;
+ private Connection _connection = null;
+ private Destination _controllerQueue = null;
+ private TemporaryQueue _clientQueue = null;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _controllerQueue = (Destination) _context.lookup("controllerqueue");
+
+ final ConnectionFactory connectionFactory = (ConnectionFactory) _context.lookup("connectionfactory");
+ _connection = connectionFactory.createConnection();
+ _connection.start();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _clientQueue = _session.createTemporaryQueue();
+
+ _controller = new Controller(new ControllerJmsDelegate(_context), REGISTRATION_TIMEOUT, COMMAND_RESPONSE_TIMEOUT);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (_connection != null)
+ {
+ _connection.close();
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ public void testControllerSendsOneCommandToSingleClient() throws Exception
+ {
+ Config config = ConfigFileTestHelper.getConfigFromResource(getClass(), "distributedControllerTest.json");
+ _controller.setConfig(config);
+
+ sendRegistration(CLIENT1);
+ _controller.awaitClientRegistrations();
+
+ final ArrayBlockingQueue<Command> commandList = new ArrayBlockingQueue<Command>(4);
+ final MessageConsumer clientConsumer = _session.createConsumer(_clientQueue);
+ final AtomicReference<Exception> listenerException = new AtomicReference<Exception>();
+ final MessageProducer producer = _session.createProducer(_controllerQueue);
+ clientConsumer.setMessageListener(new MessageListener()
+ {
+ @Override
+ public void onMessage(Message message)
+ {
+ try
+ {
+ Command command = JmsMessageAdaptor.messageToCommand(message);
+ LOGGER.debug("Test client received " + command);
+ commandList.add(command);
+ producer.send(JmsMessageAdaptor.commandToMessage(_session, new Response(CLIENT1, command.getType())));
+ }
+ catch(Exception e)
+ {
+ listenerException.set(e);
+ }
+ }
+ });
+
+ _controller.runAllTests();
+ assertCommandType(CommandType.CREATE_CONNECTION, commandList);
+ assertCommandType(CommandType.START_TEST, commandList);
+ assertCommandType(CommandType.TEAR_DOWN_TEST, commandList);
+
+ _controller.stopAllRegisteredClients();
+ assertCommandType(CommandType.STOP_CLIENT, commandList);
+ assertNull("Unexpected exception occured", listenerException.get());
+ Command command = commandList.poll(1l, TimeUnit.SECONDS);
+ assertNull("Unexpected command is received", command);
+ }
+
+ private void assertCommandType(CommandType expectedType, BlockingQueue<Command> commandList) throws InterruptedException
+ {
+ Command command = commandList.poll(1l, TimeUnit.SECONDS);
+ assertNotNull("Command of type " + expectedType + " is not received", command);
+ assertEquals("Unexpected command type", expectedType, command.getType());
+ }
+
+ private void sendRegistration(final String clientId) throws JMSException
+ {
+ final MessageProducer registrationProducer = _session.createProducer(_controllerQueue);
+
+ final Command command = new RegisterClientCommand(clientId, _clientQueue.getQueueName());
+ final Message registrationMessage = JmsMessageAdaptor.commandToMessage(_session, command);
+ registrationProducer.send(registrationMessage);
+ LOGGER.debug("sent registrationMessage: " + registrationMessage);
+ }
+
+}
diff --git a/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
new file mode 100644
index 0000000000..215536126e
--- /dev/null
+++ b/qpid/java/qpid-perftests-systests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systest.disttest.endtoend;
+
+import static org.apache.qpid.disttest.AbstractRunner.JNDI_CONFIG_PROP;
+import static org.apache.qpid.disttest.ControllerRunner.OUTPUT_DIR_PROP;
+import static org.apache.qpid.disttest.ControllerRunner.RUN_ID;
+import static org.apache.qpid.disttest.ControllerRunner.TEST_CONFIG_PROP;
+import static org.apache.qpid.disttest.ControllerRunner.WRITE_TO_DB;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.qpid.disttest.ControllerRunner;
+import org.apache.qpid.disttest.message.ParticipantAttribute;
+import org.apache.qpid.disttest.results.aggregation.TestResultAggregator;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+
+public class EndToEndTest extends QpidBrokerTestCase
+{
+ private ControllerRunner _runner;
+ private static final String TEST_CONFIG = "qpid-perftests-systests/src/test/resources/org/apache/qpid/systest/disttest/endtoend/endtoend.json";
+ private static final String JNDI_CONFIG_FILE = "qpid-perftests-systests/src/test/resources/org/apache/qpid/systest/disttest/perftests.systests.properties";
+ private static final String RUN1 = "run1";
+
+ public void testRunner() throws Exception
+ {
+ File csvOutputDir = createTemporaryCsvDirectory();
+ assertTrue("CSV output dir must not exist",csvOutputDir.isDirectory());
+
+ final String[] args = new String[] {TEST_CONFIG_PROP + "=" + TEST_CONFIG,
+ JNDI_CONFIG_PROP + "=" + JNDI_CONFIG_FILE,
+ WRITE_TO_DB + "=true",
+ RUN_ID + "=" + RUN1,
+ OUTPUT_DIR_PROP + "=" + csvOutputDir.getAbsolutePath()};
+ _runner = new ControllerRunner();
+ _runner.parseArgumentsIntoConfig(args);
+ _runner.runController();
+
+ File expectedCsvOutputFile = new File(csvOutputDir, "endtoend.csv");
+ assertTrue("CSV output file must exist", expectedCsvOutputFile.exists());
+ final String csvContents = FileUtils.readFileAsString(expectedCsvOutputFile);
+ final String[] csvLines = csvContents.split("\n");
+
+ int numberOfHeaders = 1;
+ int numberOfParticipants = 2;
+ int numberOfSummaries = 3;
+
+ int numberOfExpectedRows = numberOfHeaders + numberOfParticipants + numberOfSummaries;
+ assertEquals("Unexpected number of lines in CSV", numberOfExpectedRows, csvLines.length);
+
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "producingClient", "participantProducer1", csvLines[1], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "consumingClient", "participantConsumer1", csvLines[3], 1);
+
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PARTICIPANTS_NAME, csvLines[4], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_CONSUMER_PARTICIPANTS_NAME, csvLines[2], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PRODUCER_PARTICIPANTS_NAME, csvLines[5], 1);
+
+ }
+
+ private void assertDataRowsHaveCorrectTestAndClientName(String testName, String clientName, String participantName, String csvLine, int expectedNumberOfMessagesProcessed)
+ {
+ final int DONT_STRIP_EMPTY_LAST_FIELD_FLAG = -1;
+ String[] cells = csvLine.split(",", DONT_STRIP_EMPTY_LAST_FIELD_FLAG);
+ // All attributes become cells in the CSV, so this will be true
+ assertEquals("Unexpected number of cells in CSV line " + csvLine, ParticipantAttribute.values().length, cells.length);
+ assertEquals("Unexpected test name in CSV line " + csvLine, testName, cells[ParticipantAttribute.TEST_NAME.ordinal()]);
+ assertEquals("Unexpected client name in CSV line " + csvLine, clientName, cells[ParticipantAttribute.CONFIGURED_CLIENT_NAME.ordinal()]);
+ assertEquals("Unexpected participant name in CSV line " + csvLine, participantName, cells[ParticipantAttribute.PARTICIPANT_NAME.ordinal()]);
+ assertEquals("Unexpected number of messages processed in CSV line " + csvLine, String.valueOf(expectedNumberOfMessagesProcessed), cells[ParticipantAttribute.NUMBER_OF_MESSAGES_PROCESSED.ordinal()]);
+
+ }
+
+ private File createTemporaryCsvDirectory() throws IOException
+ {
+ String tmpDir = System.getProperty("java.io.tmpdir");
+ File csvDir = new File(tmpDir, "csv");
+ csvDir.mkdir();
+ csvDir.deleteOnExit();
+ return csvDir;
+ }
+
+}