summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java344
1 files changed, 344 insertions, 0 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java
new file mode 100644
index 0000000000..e0c0b00335
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/topic/TopicWithSelectorsTransientVolumeTest.java
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.topic;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.naming.NamingException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class TopicWithSelectorsTransientVolumeTest extends QpidBrokerTestCase
+{
+ private static final int NUM_MSG_PER_ITERATION = 50;//must be a multiple of 10
+ private static final int NUM_ITERATIONS = 1000;
+
+ private static final int NUM_CONSUMERS = 50;
+ private static final int MSG_SIZE = 1024;
+ private static final byte[] BYTE_ARRAY = new byte[MSG_SIZE];
+
+ ArrayList<MyMessageSubscriber> _subscribers = new ArrayList<MyMessageSubscriber>();
+ HashMap<String,Long> _queueMsgCounts = new HashMap<String,Long>();
+
+ private final static Object _lock=new Object();
+ private boolean _producerFailed;
+ private static int _finishedCount;
+ private static int _failedCount;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ init();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private void init()
+ {
+ _finishedCount = 0;
+ _failedCount = 0;
+ _producerFailed = false;
+ _subscribers.clear();
+ _queueMsgCounts.clear();
+ }
+
+
+ private Message createMessage(Session session) throws JMSException
+ {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(BYTE_ARRAY);
+
+ return message;
+ }
+
+ /**
+ * 1 Topic with 50 subscribers using a selector, and 1 producer sending 50,000 1K messages with 90% selector success ratio.
+ */
+ public void test50SubscribersWith90PercentMatched() throws Exception
+ {
+ Topic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "test50ConsumersWith10PercentUnmatched");
+
+ System.out.println("Creating consumers");
+
+ MyMessageSubscriber sub;
+
+ for(int i=1; i <= NUM_CONSUMERS; i++)
+ {
+ sub = new MyMessageSubscriber(topic, "consumer" + i, ((9 * NUM_MSG_PER_ITERATION * NUM_ITERATIONS) / 10));
+ _subscribers.add(sub);
+ }
+
+ System.out.println("Starting consumers");
+ for(MyMessageSubscriber s: _subscribers)
+ {
+ Thread consumer = new Thread(s);
+ consumer.start();
+ }
+
+ System.out.println("Creating producer");
+ MyMessageProducer prod = new MyMessageProducer(topic);
+
+ long startTime = System.currentTimeMillis();
+
+ System.out.println("Starting producer");
+ Thread producer = new Thread(prod);
+ producer.start();
+
+
+ // Wait for all the messageConsumers to have finished or failed
+ synchronized (_lock)
+ {
+ while (_finishedCount + _failedCount < NUM_CONSUMERS)
+ {
+ try
+ {
+ _lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+ }
+
+ long endTime = System.currentTimeMillis();
+ System.out.println("Elapsed time for messaging: " + (endTime-startTime) + "ms");
+
+ assertFalse("Producer failed to send all messages", _producerFailed);
+
+ //check if all messages received by consumers, or if there were failures
+ if (_finishedCount != NUM_CONSUMERS)
+ {
+ fail(_failedCount + " consumers did not recieve all their expected messages");
+ }
+
+ //check if all queue depths were 0
+ for(String consumer: _queueMsgCounts.keySet())
+ {
+ long depth = _queueMsgCounts.get(consumer);
+ assertEquals(consumer + " subscription queue msg count was not 0", 0, depth);
+ }
+
+ }
+
+ private class MyMessageProducer implements Runnable
+ {
+ private TopicConnection _connection;
+ private TopicSession _session;
+ private TopicPublisher _messagePublisher;
+
+ public MyMessageProducer(Topic topic) throws JMSException, NamingException
+ {
+ _connection = (TopicConnection) getConnection();
+ _session = (TopicSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _messagePublisher = _session.createPublisher(topic);
+ }
+
+ public void run()
+ {
+ try
+ {
+ for(int iter = 0; iter < NUM_ITERATIONS; iter++)
+ {
+ int i = 0;
+
+ //send 90% matching messages
+ for (; i < (9 * NUM_MSG_PER_ITERATION)/10; i++)
+ {
+ Message message = createMessage(_session);
+ message.setStringProperty("testprop", "true");
+
+ _messagePublisher.publish(message, DeliveryMode.NON_PERSISTENT,
+ Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ Thread.yield();
+ }
+
+ //send remaining 10% non-matching messages
+ for (; i < NUM_MSG_PER_ITERATION; i++)
+ {
+ Message message = _session.createMessage();
+ message.setStringProperty("testprop", "false");
+
+ _messagePublisher.publish(message, DeliveryMode.NON_PERSISTENT,
+ Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ Thread.yield();
+ }
+ }
+
+ }
+ catch (Exception exp)
+ {
+ System.out.println("producer: caught an exception, probably exiting before all messages sent");
+ exp.printStackTrace();
+ synchronized (_lock)
+ {
+ _producerFailed=true;
+ _lock.notifyAll();
+ }
+ }
+ }
+ }
+
+
+ private class MyMessageSubscriber implements Runnable
+ {
+ /* The topic this subscriber is subscribing to */
+ private Topic _topic;
+ private String _consumerName;
+ private int _outstandingMsgCount;
+ private TopicConnection _connection;
+ private TopicSession _session;
+ private TopicSubscriber _durSub;
+
+ public MyMessageSubscriber(Topic topic, String consumerName, int messageCount) throws JMSException, NamingException
+ {
+ _outstandingMsgCount = messageCount;
+ _topic=topic;
+ _consumerName = consumerName;
+ _connection = (TopicConnection) getConnection();
+ _session = (TopicSession) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _durSub = _session.createDurableSubscriber(_topic, _consumerName,"testprop='true'", false);
+ _connection.start();
+ }
+
+ public void run()
+ {
+
+ boolean failed = false;
+ do
+ {
+ Message m = null;
+ try
+ {
+ m = _durSub.receive(10000);
+ }
+ catch (JMSException exp)
+ {
+ System.out.println(_consumerName + ": caught an exception handling a received message");
+ exp.printStackTrace();
+
+ failed = true;
+ break;
+ }
+
+ Thread.yield();
+
+ _outstandingMsgCount--;
+
+ if(_outstandingMsgCount % 500 == 0)
+ {
+ System.out.println(_consumerName + ": outstanding message count: " + _outstandingMsgCount);
+ }
+
+ if(m == null)
+ {
+ if(_outstandingMsgCount != 0)
+ {
+ failed = true;
+ }
+ break;
+ }
+ }
+ while(_outstandingMsgCount > 0);
+
+ System.out.println(_consumerName + ": outstanding message count: " + _outstandingMsgCount);
+
+ try
+ {
+ AMQQueue subcriptionQueue = new AMQQueue(ExchangeDefaults.TOPIC_EXCHANGE_NAME,"clientid" + ":" + _consumerName);
+
+ ((AMQSession)_session).sync();
+ Long depth = ((AMQSession)_session).getQueueDepth(subcriptionQueue);
+ _queueMsgCounts.put(_consumerName, depth);
+
+ System.out.println(_consumerName + ": completion queue msg count: " + depth);
+ }
+ catch (AMQException exp)
+ {
+ System.out.println(_consumerName + ": caught an exception determining completion queue depth");
+ exp.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ _session.unsubscribe(_consumerName);
+ }
+ catch (JMSException e)
+ {
+ System.out.println(_consumerName + ": caught an exception whilst unsubscribing");
+ e.printStackTrace();
+ }
+ }
+
+ synchronized (_lock)
+ {
+ if (_outstandingMsgCount == 0 && !failed)
+ {
+ _finishedCount++;
+ System.out.println(_consumerName + ": finished");
+ }
+ else
+ {
+ _failedCount++;
+ System.out.println(_consumerName + ": failed");
+ }
+ _lock.notifyAll();
+ }
+
+ }
+ }
+
+ //helper method to allow easily running against an external standalone broker
+// public static void main(String[] args) throws Exception
+// {
+// System.setProperty("broker.config", "/dev/null");
+// System.setProperty("broker", "external");
+// System.setProperty("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+// System.setProperty("java.naming.provider.url", "test-profiles/test-provider.properties");
+//
+// TopicWithSelectorsTransientVolumeTest test = new TopicWithSelectorsTransientVolumeTest();
+// test.init();
+// test.test50SubscribersWith90PercentMatched();
+// test.tearDown();
+// }
+}