summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-11-18 15:36:48 +0000
committerAlex Rudyy <orudyy@apache.org>2014-11-18 15:36:48 +0000
commita33a578d175767cf0129dcdb5c78d017dc93ee63 (patch)
tree3d2855497819b36d64a1f866c0f566a7eff8a6af
parentba593511ba9c0711b6b19e1b24112a7a207e74fc (diff)
downloadqpid-python-a33a578d175767cf0129dcdb5c78d017dc93ee63.tar.gz
QPID-6231: [Perftests framework] Make performance framework
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1640371 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java7
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java1
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java36
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java49
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java134
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java1
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java2
7 files changed, 216 insertions, 14 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
index 0d5457c992..75b1e9050a 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java
@@ -31,6 +31,7 @@ import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.Visitor;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
import org.apache.qpid.disttest.message.Command;
+import org.apache.qpid.disttest.message.CommandType;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.Response;
import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public class Client
public void stop()
{
+ _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), CommandType.STOP_CLIENT, null));
_state.set(ClientState.STOPPED);
_latch.countDown();
}
@@ -119,7 +121,10 @@ public class Client
}
finally
{
- _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage));
+ if (_state.get() != ClientState.STOPPED)
+ {
+ _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage));
+ }
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
index 791897323e..6b81c12c22 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java
@@ -45,6 +45,7 @@ public class ClientCommandVisitor extends Visitor
public void visit(final StopClientCommand command)
{
+
_client.stop();
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
index 4c223fab30..bb9d0327a8 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java
@@ -19,12 +19,14 @@
*/
package org.apache.qpid.disttest.jms;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.ConnectionMetaData;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -63,6 +65,7 @@ public class ClientJmsDelegate
private final Context _context;
private final Destination _controllerQueue;
private final Connection _controllerConnection;
+ private final Session _instructionListenerSession;
private final Session _controllerSession;
private final MessageProducer _controlQueueProducer;
@@ -87,6 +90,7 @@ public class ClientJmsDelegate
_controllerConnection = connectionFactory.createConnection();
_controllerConnection.start();
_controllerQueue = (Destination) context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME);
+ _instructionListenerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_controllerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_controlQueueProducer = _controllerSession.createProducer(_controllerQueue);
_clientName = UUID.randomUUID().toString();
@@ -112,8 +116,8 @@ public class ClientJmsDelegate
{
try
{
- _instructionQueue = _controllerSession.createTemporaryQueue();
- final MessageConsumer instructionConsumer = _controllerSession.createConsumer(_instructionQueue);
+ _instructionQueue = _instructionListenerSession.createTemporaryQueue();
+ final MessageConsumer instructionConsumer = _instructionListenerSession.createConsumer(_instructionQueue);
instructionConsumer.setMessageListener(new MessageListener()
{
@Override
@@ -170,6 +174,10 @@ public class ClientJmsDelegate
.getConnectionFactoryName());
final Connection newConnection = connectionFactory.createConnection();
addConnection(command.getConnectionName(), newConnection);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Connection " + command.getConnectionName() + " is created " + metaDataToString(newConnection.getMetaData()));
+ }
}
catch (final NamingException ne)
{
@@ -183,6 +191,26 @@ public class ClientJmsDelegate
}
}
+ private String metaDataToString(ConnectionMetaData metaData) throws JMSException
+ {
+ StringBuilder sb = new StringBuilder("ConnectionMetaData[");
+ sb.append(" JMSProviderName : " + metaData.getJMSProviderName());
+ sb.append(" JMSVersion : " + metaData.getJMSVersion() + " (" + metaData.getJMSMajorVersion() + "." + metaData.getJMSMinorVersion() +")");
+ sb.append(" ProviderVersion : " + metaData.getProviderVersion()+ " (" + metaData.getProviderMajorVersion()+ "." + metaData.getProviderMinorVersion() +")" );
+ sb.append(" JMSXPropertyNames : [");
+ Enumeration en = metaData.getJMSXPropertyNames();
+ while(en.hasMoreElements())
+ {
+ sb.append(" ").append(en.nextElement());
+ if( en.hasMoreElements())
+ {
+ sb.append(",");
+ }
+ }
+ sb.append("]]");
+ return sb.toString();
+ }
+
public void createSession(final CreateSessionCommand command)
{
try
@@ -312,6 +340,10 @@ public class ClientJmsDelegate
// finish.
_controllerConnection.stop();
+ if (_instructionListenerSession != null)
+ {
+ _instructionListenerSession.close();
+ }
if (_controllerSession != null)
{
_controllerSession.close();
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
index 330407e375..2dfe1050df 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
@@ -54,7 +54,8 @@ public class ControllerJmsDelegate
private final Map<String, Destination> _clientNameToQueueMap = new ConcurrentHashMap<String, Destination>();
private final Connection _connection;
private final Destination _controllerQueue;
- private final Session _session;
+ private final Session _controllerQueueListenerSession;
+ private final Session _commandSession;
private QueueCreator _queueCreator;
private List<CommandListener> _commandListeners = new CopyOnWriteArrayList<CommandListener>();
@@ -65,7 +66,8 @@ public class ControllerJmsDelegate
_connection = connectionFactory.createConnection();
_connection.start();
_controllerQueue = (Destination) context.lookup("controllerqueue");
- _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _controllerQueueListenerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _commandSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
createVendorSpecificQueueCreator();
}
@@ -105,7 +107,7 @@ public class ControllerJmsDelegate
{
try
{
- final MessageConsumer consumer = _session.createConsumer(_controllerQueue);
+ final MessageConsumer consumer = _controllerQueueListenerSession.createConsumer(_controllerQueue);
consumer.setMessageListener(new MessageListener()
{
@Override
@@ -138,13 +140,25 @@ public class ControllerJmsDelegate
/** ensures connections are closed, otherwise the JVM may be prevented from terminating */
public void closeConnections()
{
+ if (_commandSession != null)
+ {
+ try
+ {
+ _commandSession.close();
+ }
+ catch (JMSException e)
+ {
+ LOGGER.error("Unable to close command session", e);
+ }
+ }
+
try
{
- _session.close();
+ _controllerQueueListenerSession.close();
}
catch (JMSException e)
{
- LOGGER.error("Unable to close session", e);
+ LOGGER.error("Unable to close controller queue listener session", e);
}
try
@@ -182,10 +196,11 @@ public class ControllerJmsDelegate
+ _clientNameToQueueMap.keySet());
}
+ MessageProducer producer = null;
try
{
- final MessageProducer producer = _session.createProducer(clientQueue);
- final Message message = JmsMessageAdaptor.commandToMessage(_session, command);
+ producer =_commandSession.createProducer(clientQueue);
+ Message message = JmsMessageAdaptor.commandToMessage(_commandSession, command);
producer.send(message);
}
@@ -193,6 +208,20 @@ public class ControllerJmsDelegate
{
throw new DistributedTestException(e);
}
+ finally
+ {
+ if (producer != null)
+ {
+ try
+ {
+ producer.close();
+ }
+ catch (final JMSException e)
+ {
+ throw new DistributedTestException(e);
+ }
+ }
+ }
}
private void processCommandWithFirstSupportingListener(Command command)
@@ -214,7 +243,7 @@ public class ControllerJmsDelegate
Destination clientIntructionQueue;
try
{
- clientIntructionQueue = _session.createQueue(clientQueueName);
+ clientIntructionQueue = _commandSession.createQueue(clientQueueName);
}
catch (JMSException e)
{
@@ -225,12 +254,12 @@ public class ControllerJmsDelegate
public void createQueues(List<QueueConfig> queues)
{
- _queueCreator.createQueues(_connection, _session, queues);
+ _queueCreator.createQueues(_connection, _commandSession, queues);
}
public void deleteQueues(List<QueueConfig> queues)
{
- _queueCreator.deleteQueues(_connection, _session, queues);
+ _queueCreator.deleteQueues(_connection, _commandSession, queues);
}
public void addCommandListener(CommandListener commandListener)
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java
new file mode 100644
index 0000000000..7f8b3caa4f
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.disttest.jms;
+
+import org.apache.qpid.disttest.DistributedTestException;
+import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import java.util.List;
+
+public class ExistingQueueDrainer implements QueueCreator
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueDrainer.class);
+ private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
+
+ @Override
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
+ {
+ }
+
+ @Override
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
+ {
+ for (QueueConfig queueConfig : configs)
+ {
+ drainQueue(connection, queueConfig.getName());
+ }
+ }
+
+ private void drainQueue(Connection connection, String queueName)
+ {
+ try
+ {
+ int counter = 0;
+ while (queueContainsMessages(connection, queueName))
+ {
+ if (counter == 0)
+ {
+ LOGGER.debug("Draining queue {}", queueName);
+ }
+ counter += drain(connection, queueName);
+ }
+ if (counter > 0)
+ {
+ LOGGER.info("Drained {} message(s) from queue {} ", counter, queueName);
+ }
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to drain queue " + queueName, e);
+ }
+ }
+
+ private int drain(Connection connection, String queueName) throws JMSException
+ {
+ int counter = 0;
+ Session session = null;
+ try
+ {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName));
+ try
+ {
+ while (messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+ }
+ finally
+ {
+ messageConsumer.close();
+ }
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ return counter;
+ }
+
+ private boolean queueContainsMessages(Connection connection, String queueName) throws JMSException
+ {
+ Session session = null;
+ try
+ {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueBrowser browser = null;
+ try
+ {
+ browser = session.createBrowser(session.createQueue(queueName));
+ return browser.getEnumeration().hasMoreElements();
+ }
+ finally
+ {
+ if (browser != null)
+ {
+ browser.close();
+ }
+ }
+ }
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ }
+}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index ef2cfb6cd4..95a4772198 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -38,7 +38,6 @@ public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
- private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
@Override
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
index a37cd7888c..16769699c1 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
@@ -27,6 +27,8 @@ import org.apache.qpid.disttest.controller.config.QueueConfig;
public interface QueueCreator
{
+ String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
+
void createQueues(Connection connection, Session session, List<QueueConfig> configs);
void deleteQueues(Connection connection, Session session, List<QueueConfig> configs);
}