diff options
author | Alex Rudyy <orudyy@apache.org> | 2014-11-18 15:36:48 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2014-11-18 15:36:48 +0000 |
commit | a33a578d175767cf0129dcdb5c78d017dc93ee63 (patch) | |
tree | 3d2855497819b36d64a1f866c0f566a7eff8a6af | |
parent | ba593511ba9c0711b6b19e1b24112a7a207e74fc (diff) | |
download | qpid-python-a33a578d175767cf0129dcdb5c78d017dc93ee63.tar.gz |
QPID-6231: [Perftests framework] Make performance framework
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1640371 13f79535-47bb-0310-9956-ffa450edef68
7 files changed, 216 insertions, 14 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java index 0d5457c992..75b1e9050a 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/Client.java @@ -31,6 +31,7 @@ import org.apache.qpid.disttest.DistributedTestException; import org.apache.qpid.disttest.Visitor; import org.apache.qpid.disttest.jms.ClientJmsDelegate; import org.apache.qpid.disttest.message.Command; +import org.apache.qpid.disttest.message.CommandType; import org.apache.qpid.disttest.message.ParticipantResult; import org.apache.qpid.disttest.message.Response; import org.slf4j.Logger; @@ -66,6 +67,7 @@ public class Client public void stop() { + _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), CommandType.STOP_CLIENT, null)); _state.set(ClientState.STOPPED); _latch.countDown(); } @@ -119,7 +121,10 @@ public class Client } finally { - _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage)); + if (_state.get() != ClientState.STOPPED) + { + _clientJmsDelegate.sendResponseMessage(new Response(_clientJmsDelegate.getClientName(), command.getType(), responseMessage)); + } } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java index 791897323e..6b81c12c22 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ClientCommandVisitor.java @@ -45,6 +45,7 @@ public class ClientCommandVisitor extends Visitor public void visit(final StopClientCommand command) { + _client.stop(); } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java index 4c223fab30..bb9d0327a8 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ClientJmsDelegate.java @@ -19,12 +19,14 @@ */ package org.apache.qpid.disttest.jms; +import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.UUID; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.ConnectionMetaData; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; @@ -63,6 +65,7 @@ public class ClientJmsDelegate private final Context _context; private final Destination _controllerQueue; private final Connection _controllerConnection; + private final Session _instructionListenerSession; private final Session _controllerSession; private final MessageProducer _controlQueueProducer; @@ -87,6 +90,7 @@ public class ClientJmsDelegate _controllerConnection = connectionFactory.createConnection(); _controllerConnection.start(); _controllerQueue = (Destination) context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME); + _instructionListenerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); _controllerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); _controlQueueProducer = _controllerSession.createProducer(_controllerQueue); _clientName = UUID.randomUUID().toString(); @@ -112,8 +116,8 @@ public class ClientJmsDelegate { try { - _instructionQueue = _controllerSession.createTemporaryQueue(); - final MessageConsumer instructionConsumer = _controllerSession.createConsumer(_instructionQueue); + _instructionQueue = _instructionListenerSession.createTemporaryQueue(); + final MessageConsumer instructionConsumer = _instructionListenerSession.createConsumer(_instructionQueue); instructionConsumer.setMessageListener(new MessageListener() { @Override @@ -170,6 +174,10 @@ public class ClientJmsDelegate .getConnectionFactoryName()); final Connection newConnection = connectionFactory.createConnection(); addConnection(command.getConnectionName(), newConnection); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Connection " + command.getConnectionName() + " is created " + metaDataToString(newConnection.getMetaData())); + } } catch (final NamingException ne) { @@ -183,6 +191,26 @@ public class ClientJmsDelegate } } + private String metaDataToString(ConnectionMetaData metaData) throws JMSException + { + StringBuilder sb = new StringBuilder("ConnectionMetaData["); + sb.append(" JMSProviderName : " + metaData.getJMSProviderName()); + sb.append(" JMSVersion : " + metaData.getJMSVersion() + " (" + metaData.getJMSMajorVersion() + "." + metaData.getJMSMinorVersion() +")"); + sb.append(" ProviderVersion : " + metaData.getProviderVersion()+ " (" + metaData.getProviderMajorVersion()+ "." + metaData.getProviderMinorVersion() +")" ); + sb.append(" JMSXPropertyNames : ["); + Enumeration en = metaData.getJMSXPropertyNames(); + while(en.hasMoreElements()) + { + sb.append(" ").append(en.nextElement()); + if( en.hasMoreElements()) + { + sb.append(","); + } + } + sb.append("]]"); + return sb.toString(); + } + public void createSession(final CreateSessionCommand command) { try @@ -312,6 +340,10 @@ public class ClientJmsDelegate // finish. _controllerConnection.stop(); + if (_instructionListenerSession != null) + { + _instructionListenerSession.close(); + } if (_controllerSession != null) { _controllerSession.close(); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java index 330407e375..2dfe1050df 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java @@ -54,7 +54,8 @@ public class ControllerJmsDelegate private final Map<String, Destination> _clientNameToQueueMap = new ConcurrentHashMap<String, Destination>(); private final Connection _connection; private final Destination _controllerQueue; - private final Session _session; + private final Session _controllerQueueListenerSession; + private final Session _commandSession; private QueueCreator _queueCreator; private List<CommandListener> _commandListeners = new CopyOnWriteArrayList<CommandListener>(); @@ -65,7 +66,8 @@ public class ControllerJmsDelegate _connection = connectionFactory.createConnection(); _connection.start(); _controllerQueue = (Destination) context.lookup("controllerqueue"); - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _controllerQueueListenerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _commandSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); createVendorSpecificQueueCreator(); } @@ -105,7 +107,7 @@ public class ControllerJmsDelegate { try { - final MessageConsumer consumer = _session.createConsumer(_controllerQueue); + final MessageConsumer consumer = _controllerQueueListenerSession.createConsumer(_controllerQueue); consumer.setMessageListener(new MessageListener() { @Override @@ -138,13 +140,25 @@ public class ControllerJmsDelegate /** ensures connections are closed, otherwise the JVM may be prevented from terminating */ public void closeConnections() { + if (_commandSession != null) + { + try + { + _commandSession.close(); + } + catch (JMSException e) + { + LOGGER.error("Unable to close command session", e); + } + } + try { - _session.close(); + _controllerQueueListenerSession.close(); } catch (JMSException e) { - LOGGER.error("Unable to close session", e); + LOGGER.error("Unable to close controller queue listener session", e); } try @@ -182,10 +196,11 @@ public class ControllerJmsDelegate + _clientNameToQueueMap.keySet()); } + MessageProducer producer = null; try { - final MessageProducer producer = _session.createProducer(clientQueue); - final Message message = JmsMessageAdaptor.commandToMessage(_session, command); + producer =_commandSession.createProducer(clientQueue); + Message message = JmsMessageAdaptor.commandToMessage(_commandSession, command); producer.send(message); } @@ -193,6 +208,20 @@ public class ControllerJmsDelegate { throw new DistributedTestException(e); } + finally + { + if (producer != null) + { + try + { + producer.close(); + } + catch (final JMSException e) + { + throw new DistributedTestException(e); + } + } + } } private void processCommandWithFirstSupportingListener(Command command) @@ -214,7 +243,7 @@ public class ControllerJmsDelegate Destination clientIntructionQueue; try { - clientIntructionQueue = _session.createQueue(clientQueueName); + clientIntructionQueue = _commandSession.createQueue(clientQueueName); } catch (JMSException e) { @@ -225,12 +254,12 @@ public class ControllerJmsDelegate public void createQueues(List<QueueConfig> queues) { - _queueCreator.createQueues(_connection, _session, queues); + _queueCreator.createQueues(_connection, _commandSession, queues); } public void deleteQueues(List<QueueConfig> queues) { - _queueCreator.deleteQueues(_connection, _session, queues); + _queueCreator.deleteQueues(_connection, _commandSession, queues); } public void addCommandListener(CommandListener commandListener) diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java new file mode 100644 index 0000000000..7f8b3caa4f --- /dev/null +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ExistingQueueDrainer.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.disttest.jms; + +import org.apache.qpid.disttest.DistributedTestException; +import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import java.util.List; + +public class ExistingQueueDrainer implements QueueCreator +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueDrainer.class); + private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500); + + @Override + public void createQueues(Connection connection, Session session, List<QueueConfig> configs) + { + } + + @Override + public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs) + { + for (QueueConfig queueConfig : configs) + { + drainQueue(connection, queueConfig.getName()); + } + } + + private void drainQueue(Connection connection, String queueName) + { + try + { + int counter = 0; + while (queueContainsMessages(connection, queueName)) + { + if (counter == 0) + { + LOGGER.debug("Draining queue {}", queueName); + } + counter += drain(connection, queueName); + } + if (counter > 0) + { + LOGGER.info("Drained {} message(s) from queue {} ", counter, queueName); + } + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to drain queue " + queueName, e); + } + } + + private int drain(Connection connection, String queueName) throws JMSException + { + int counter = 0; + Session session = null; + try + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queueName)); + try + { + while (messageConsumer.receive(_drainPollTimeout) != null) + { + counter++; + } + } + finally + { + messageConsumer.close(); + } + } + finally + { + if (session != null) + { + session.close(); + } + } + return counter; + } + + private boolean queueContainsMessages(Connection connection, String queueName) throws JMSException + { + Session session = null; + try + { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = null; + try + { + browser = session.createBrowser(session.createQueue(queueName)); + return browser.getEnumeration().hasMoreElements(); + } + finally + { + if (browser != null) + { + browser.close(); + } + } + } + finally + { + if (session != null) + { + session.close(); + } + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index ef2cfb6cd4..95a4772198 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -38,7 +38,6 @@ public class QpidQueueCreator implements QueueCreator { private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class); private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); - private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime"; private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500); @Override diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java index a37cd7888c..16769699c1 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java @@ -27,6 +27,8 @@ import org.apache.qpid.disttest.controller.config.QueueConfig; public interface QueueCreator { + String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime"; + void createQueues(Connection connection, Session session, List<QueueConfig> configs); void deleteQueues(Connection connection, Session session, List<QueueConfig> configs); } |