diff options
Diffstat (limited to 'qpid/java/integrationtests/src/main/java')
12 files changed, 2720 insertions, 0 deletions
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java new file mode 100644 index 0000000000..db17c7aacc --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase1DummyRun.java @@ -0,0 +1,135 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.clienttestcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; + +/** + * Implements tet case 1, dummy run. This test case sends no test messages, it exists to confirm that the test harness + * is interacting with the coordinator correctly. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Supply the name of the test case that this implements. + * <tr><td> Accept/Reject invites based on test parameters. + * <tr><td> Adapt to assigned roles. + * <tr><td> Perform test case actions. + * <tr><td> Generate test reports. + * </table> + */ +public class TestCase1DummyRun implements TestClientControlledTest +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase1DummyRun.class); + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC1_DummyRun"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage): called"); + + // Test parameters don't matter, accept all invites. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receivers. + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role, Message assignRoleMessage): called"); + + // Do nothing, both roles are the same. + } + + /** + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @param numMessages The number of test messages to send. + */ + public void start(int numMessages) + { + log.debug("public void start(): called"); + + // Do nothing. + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The controlSession to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session controlSession): called"); + + // Generate a dummy report, the coordinator expects a report but doesn't care what it is. + return session.createTextMessage("Dummy Run, Ok."); + } + + /** + * Handles incoming test messages. Does nothing. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Ignore any messages. + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java new file mode 100644 index 0000000000..36d3cce7f7 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase2BasicP2P.java @@ -0,0 +1,209 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.clienttestcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.distributedtesting.TestClient; +import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; + +import javax.jms.*; + +/** + * Implements test case 2, basic P2P. Sends/received a specified number of messages to a specified route on the + * default direct exchange. Produces reports on the actual number of messages sent/received. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Supply the name of the test case that this implements. + * <tr><td> Accept/Reject invites based on test parameters. + * <tr><td> Adapt to assigned roles. + * <tr><td> Send required number of test messages. + * <tr><td> Generate test reports. + * </table> + */ +public class TestCase2BasicP2P implements TestClientControlledTest, MessageListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase2BasicP2P.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The connection to send the test messages on. */ + private Connection connection; + + /** The controlSession to send the test messages on. */ + private Session session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC2_BasicP2P"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receivers. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Create a new connection to pass the test messages on. + connection = TestUtils.createConnection(TestClient.testContextProperties); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES"); + Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); + + log.debug("numMessages = " + numMessages); + log.debug("sendDestination = " + sendDestination); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a message producer if so. + case SENDER: + producer = session.createProducer(sendDestination); + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages. + case RECEIVER: + MessageConsumer consumer = session.createConsumer(sendDestination); + consumer.setMessageListener(this); + break; + } + + connection.start(); + } + + /** + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @param numMessages The number of test messages to send. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = session.createTextMessage("test"); + + for (int i = 0; i < this.numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The controlSession to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session controlSession): called"); + + // Close the test connection. + connection.close(); + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java new file mode 100644 index 0000000000..205472716b --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase3BasicPubSub.java @@ -0,0 +1,239 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.clienttestcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.distributedtesting.TestClient; +import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; + +import javax.jms.*; + +/** + * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the + * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of + * messages sent/received. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Supply the name of the test case that this implements. + * <tr><td> Accept/Reject invites based on test parameters. + * <tr><td> Adapt to assigned roles. + * <tr><td> Send required number of test messages using pub/sub. + * <tr><td> Generate test reports. + * </table> + */ +public class TestCase3BasicPubSub implements TestClientControlledTest, MessageListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase3BasicPubSub.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC3_BasicPubSub"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. + * + * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receivers. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES"); + int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); + String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY"); + + log.debug("numMessages = " + numMessages); + log.debug("numReceivers = " + numReceivers); + log.debug("sendKey = " + sendKey); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = TestUtils.createConnection(TestClient.testContextProperties); + session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + Destination sendDestination = session[0].createTopic(sendKey); + + producer = session[0].createProducer(sendDestination); + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number + // of receivers connections. + case RECEIVER: + // Create the required number of receivers connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = TestUtils.createConnection(TestClient.testContextProperties); + session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + sendDestination = session[i].createTopic(sendKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + consumer.setMessageListener(this); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (Connection conn : connection) + { + conn.start(); + } + } + + /** + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @param numMessages The number of test messages to send. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = session[0].createTextMessage("test"); + + for (int i = 0; i < this.numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The controlSession to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session controlSession): called"); + + // Close the test connections. + for (Connection conn : connection) + { + conn.close(); + } + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase4P2PMessageSize.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase4P2PMessageSize.java new file mode 100644 index 0000000000..3730233264 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase4P2PMessageSize.java @@ -0,0 +1,214 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.clienttestcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.distributedtesting.TestClient; +import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; + +import javax.jms.*; + +/** + * Implements test case 4, P2P messages with message size. Sends/received a specified number of messages to a specified + * route on the default direct exchange, of a specified size. Produces reports on the actual number of messages + * sent/received. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Supply the name of the test case that this implements. + * <tr><td> Accept/Reject invites based on test parameters. + * <tr><td> Adapt to assigned roles. + * <tr><td> Send required number of test messages. + * <tr><td> Generate test reports. + * </table> + */ +public class TestCase4P2PMessageSize implements TestClientControlledTest, MessageListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase4P2PMessageSize.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The size of the test messages to send. */ + private int messageSize; + + /** The connection to send the test messages on. */ + private Connection connection; + + /** The controlSession to send the test messages on. */ + private Session session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC4_P2PMessageSize"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receivers. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Create a new connection to pass the test messages on. + connection = TestUtils.createConnection(TestClient.testContextProperties); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("P2P_NUM_MESSAGES"); + messageSize = assignRoleMessage.getIntProperty("messageSize"); + Destination sendDestination = session.createQueue(assignRoleMessage.getStringProperty("P2P_QUEUE_AND_KEY_NAME")); + + log.debug("numMessages = " + numMessages); + log.debug("sendDestination = " + sendDestination); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a message producer if so. + case SENDER: + producer = session.createProducer(sendDestination); + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages. + case RECEIVER: + MessageConsumer consumer = session.createConsumer(sendDestination); + consumer.setMessageListener(this); + break; + } + + connection.start(); + } + + /** + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @param numMessages The number of test messages to send. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = TestUtils.createTestMessageOfSize(session, messageSize); + + for (int i = 0; i < this.numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The controlSession to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session controlSession): called"); + + // Close the test connection. + connection.close(); + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase5PubSubMessageSize.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase5PubSubMessageSize.java new file mode 100644 index 0000000000..f601712bc9 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/clienttestcases/TestCase5PubSubMessageSize.java @@ -0,0 +1,243 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.clienttestcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.TestUtils; +import org.apache.qpid.test.framework.distributedtesting.TestClient; +import org.apache.qpid.test.framework.distributedtesting.TestClientControlledTest; + +import javax.jms.*; + +/** + * Implements test case 5, pub/sub with message size. Sends/received a specified number of messages to a specified route + * on the default topic exchange, using the specified number of receivers connections, and the specified message size. + * Produces reports on the actual number of messages sent/received. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Supply the name of the test case that this implements. + * <tr><td> Accept/Reject invites based on test parameters. + * <tr><td> Adapt to assigned roles. + * <tr><td> Send required number of test messages using pub/sub. + * <tr><td> Generate test reports. + * </table> + */ +public class TestCase5PubSubMessageSize implements TestClientControlledTest, MessageListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(TestCase5PubSubMessageSize.class); + + /** Holds the count of test messages received. */ + private int messageCount; + + /** The role to be played by the test. */ + private Roles role; + + /** The number of test messages to send. */ + private int numMessages; + + /** The size of the test messages to send. */ + private int messageSize; + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the + * interop testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "TC5_PubSubMessageSize"; + } + + /** + * Determines whether the test invite that matched this test case is acceptable. + * + * @param inviteMessage The invitation to accept or reject. + * + * @return <tt>true</tt> to accept the invitation, <tt>false</tt> to reject it. + * + * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public boolean acceptInvite(Message inviteMessage) throws JMSException + { + log.debug("public boolean acceptInvite(Message inviteMessage = " + inviteMessage + "): called"); + + // All invites are acceptable. + return true; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the + * assignment message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receivers. + * + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws javax.jms.JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Reset the message count for a new test. + messageCount = 0; + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numMessages = assignRoleMessage.getIntProperty("PUBSUB_NUM_MESSAGES"); + messageSize = assignRoleMessage.getIntProperty("messageSize"); + int numReceivers = assignRoleMessage.getIntProperty("PUBSUB_NUM_RECEIVERS"); + String sendKey = assignRoleMessage.getStringProperty("PUBSUB_KEY"); + + log.debug("numMessages = " + numMessages); + log.debug("numReceivers = " + numReceivers); + log.debug("sendKey = " + sendKey); + log.debug("role = " + role); + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = TestUtils.createConnection(TestClient.testContextProperties); + session[0] = connection[0].createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Extract and retain the test parameters. + Destination sendDestination = session[0].createTopic(sendKey); + + producer = session[0].createProducer(sendDestination); + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number + // of receivers connections. + case RECEIVER: + // Create the required number of receivers connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = TestUtils.createConnection(TestClient.testContextProperties); + session[i] = connection[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + sendDestination = session[i].createTopic(sendKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + consumer.setMessageListener(this); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (Connection conn : connection) + { + conn.start(); + } + } + + /** + * Performs the test case actions. Returning from here, indicates that the sending role has completed its test. + * + * @param numMessages The number of test messages to send. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + if (role.equals(Roles.SENDER)) + { + Message testMessage = TestUtils.createTestMessageOfSize(session[0], messageSize); + + for (int i = 0; i < this.numMessages; i++) + { + producer.send(testMessage); + + // Increment the message count. + messageCount++; + } + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The controlSession to create the report message in. + * + * @return The report message. + * + * @throws javax.jms.JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session controlSession): called"); + + // Close the test connections. + for (Connection conn : connection) + { + conn.close(); + } + + // Generate a report message containing the count of the number of messages passed. + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + report.setIntProperty("MESSAGE_COUNT", messageCount); + + return report; + } + + /** + * Counts incoming test messages. + * + * @param message The incoming test message. + */ + public void onMessage(Message message) + { + log.debug("public void onMessage(Message message = " + message + "): called"); + + // Increment the message count. + messageCount++; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java new file mode 100644 index 0000000000..a2e4a00aa6 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase1DummyRun.java @@ -0,0 +1,84 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import java.util.Properties; + +/** + * Coordinates test case 1, from the interop test specification. This test connects up the sender and receivers roles, + * and gets some dummy test reports from them, in order to check that the test framework itself is operational. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Exercises the interop testing framework without actually sending any test messages. + * <td> {@link FrameworkBaseCase} + * </table> + */ +public class InteropTestCase1DummyRun extends FrameworkBaseCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase1DummyRun.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase1DummyRun(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testDummyRun() throws Exception + { + log.debug("public void testDummyRun(): called"); + + Properties testConfig = new Properties(); + testConfig.put("TEST_NAME", "TC1_DummyRun"); + + /*Message[] reports =*/ getCircuitFactory().sequenceTest(null, null, testConfig); + + // Compare sender and receivers reports. + // Assert.assertEquals("Expected to get 2 dummy reports.", 2, reports.length); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC1_DummyRun"; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java new file mode 100644 index 0000000000..6d6515f1fd --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase2BasicP2P.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import java.util.Properties; + +/** + * Implements test case 2, from the interop test specification. This test sets up the TC2_BasicP2P test for 50 + * messages. It checks that the sender and receivers reports both indicate that all the test messages were transmitted + * successfully. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link FrameworkBaseCase} + * </table> + */ +public class InteropTestCase2BasicP2P extends FrameworkBaseCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase2BasicP2P.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase2BasicP2P(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testBasicP2P() throws Exception + { + log.debug("public void testBasicP2P(): called"); + + Properties testConfig = new Properties(); + testConfig.setProperty("TEST_NAME", "TC2_BasicP2P"); + testConfig.setProperty("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); + testConfig.put("P2P_NUM_MESSAGES", 50); + + /*Message[] reports =*/ getCircuitFactory().sequenceTest(null, null, testConfig); + + // Compare sender and receivers reports. + /*int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); + Assert.assertEquals("Sender and receivers messages sent did not match up.", messagesSent, messagesReceived);*/ + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC2_BasicP2P"; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java new file mode 100644 index 0000000000..2faca91e73 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase3BasicPubSub.java @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import java.util.Properties; + +/** + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link FrameworkBaseCase} + * </table> + */ +public class InteropTestCase3BasicPubSub extends FrameworkBaseCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase3BasicPubSub.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase3BasicPubSub(String name) + { + super(name); + } + + /** + * Performs the basic P2P test case, "Test Case 2" in the specification. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testBasicPubSub(): called"); + + Properties testConfig = new Properties(); + testConfig.put("TEST_NAME", "TC3_BasicPubSub"); + testConfig.put("PUBSUB_KEY", "tc3route"); + testConfig.put("PUBSUB_NUM_MESSAGES", 10); + testConfig.put("PUBSUB_NUM_RECEIVERS", 5); + + /*Message[] reports =*/ getCircuitFactory().sequenceTest(null, null, testConfig); + + // Compare sender and receivers reports. + /*int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 10, messagesSent); + Assert.assertEquals("Received messages did not match up to num sent * num receivers.", messagesSent * 5, + messagesReceived);*/ + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC3_BasicPubSub"; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase4P2PMessageSize.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase4P2PMessageSize.java new file mode 100644 index 0000000000..2d64f2b805 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase4P2PMessageSize.java @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import java.util.Properties; + +/** + * Implements test case 4, from the interop test specification. This test sets up the TC2_P2PMessageSize test for 50 + * messages, and a variety of message sizes. It checks that the sender and receivers reports both indicate that all + * the test messages were transmitted successfully. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Setup p2p test parameters and compare with test output. <td> {@link FrameworkBaseCase} + * </table> + */ +public class InteropTestCase4P2PMessageSize extends FrameworkBaseCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase4P2PMessageSize.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase4P2PMessageSize(String name) + { + super(name); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 0K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize0K() throws Exception + { + runTestForMessagesOfSize(0); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 63K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize63K() throws Exception + { + runTestForMessagesOfSize(63 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 64K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize64K() throws Exception + { + runTestForMessagesOfSize(64 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 65K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize65K() throws Exception + { + runTestForMessagesOfSize(65 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 127K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize127K() throws Exception + { + runTestForMessagesOfSize(127 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 128K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize128K() throws Exception + { + runTestForMessagesOfSize(128 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 129K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize129K() throws Exception + { + runTestForMessagesOfSize(129 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 255K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize255K() throws Exception + { + runTestForMessagesOfSize(255 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 256K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize256K() throws Exception + { + runTestForMessagesOfSize(256 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 257K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testP2PMessageSize257K() throws Exception + { + runTestForMessagesOfSize(257 * 1024); + } + + /** + * Sends 50 test messages of the specified size, and asserts that all were received. + * + * @param size The size of the messages to send in bytes. + */ + private void runTestForMessagesOfSize(int size) + { + Properties testConfig = new Properties(); + testConfig.setProperty("TEST_NAME", "TC4_P2PMessageSize"); + testConfig.setProperty("P2P_QUEUE_AND_KEY_NAME", "tc2queue"); + testConfig.put("P2P_NUM_MESSAGES", 50); + testConfig.put("messageSize", size); + + /*Message[] reports =*/ + getCircuitFactory().sequenceTest(null, null, testConfig); + + // Compare sender and receivers reports. + /* + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); + Assert.assertEquals("Sender and receivers messages sent did not match up.", messagesSent, messagesReceived); + */ + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC4_P2PMessageSize"; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase5PubSubMessageSize.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase5PubSubMessageSize.java new file mode 100644 index 0000000000..23d33fc115 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testcases/InteropTestCase5PubSubMessageSize.java @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.interop.testcases; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import java.util.Properties; + +/** + * Implements test case 5, from the interop test specification. This test sets up the TC2_PubSubMessageSize test for 10 + * messages, sent to 5 consumers, and a variety of message sizes. It checks that the sender and receivers reports both + * indicate that all the test messages were transmitted successfully. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Setup pub/sub test parameters and compare with test output. <td> {@link FrameworkBaseCase} + * </table> + */ +public class InteropTestCase5PubSubMessageSize extends FrameworkBaseCase +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(InteropTestCase5PubSubMessageSize.class); + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public InteropTestCase5PubSubMessageSize(String name) + { + super(name); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 0K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize0K() throws Exception + { + runTestForMessagesOfSize(0); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 63K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize63K() throws Exception + { + runTestForMessagesOfSize(63 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 64K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize64K() throws Exception + { + runTestForMessagesOfSize(64 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 65K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize65K() throws Exception + { + runTestForMessagesOfSize(65 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 127K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize127K() throws Exception + { + runTestForMessagesOfSize(127 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 128K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize128K() throws Exception + { + runTestForMessagesOfSize(128 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 129K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize129K() throws Exception + { + runTestForMessagesOfSize(129 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 255K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize255K() throws Exception + { + runTestForMessagesOfSize(255 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 256K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize256K() throws Exception + { + runTestForMessagesOfSize(256 * 1024); + } + + /** + * Performs the P2P message test case, "Test Case 4" in the specification, for messages 257K in size. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testPubSubMessageSize257K() throws Exception + { + runTestForMessagesOfSize(257 * 1024); + } + + /** + * Sends 50 test messages of the specified size, and asserts that all were received. + * + * @param size The size of the messages to send in bytes. + */ + private void runTestForMessagesOfSize(int size) + { + Properties testConfig = new Properties(); + testConfig.put("TEST_NAME", "TC5_PubSubMessageSize"); + testConfig.put("PUBSUB_KEY", "tc3route"); + testConfig.put("PUBSUB_NUM_MESSAGES", 10); + testConfig.put("PUBSUB_NUM_RECEIVERS", 5); + testConfig.put("messageSize", size); + + /*Message[] reports =*/ + getCircuitFactory().sequenceTest(null, null, testConfig); + + // Compare sender and receivers reports. + /* + int messagesSent = reports[0].getIntProperty("MESSAGE_COUNT"); + int messagesReceived = reports[1].getIntProperty("MESSAGE_COUNT"); + + Assert.assertEquals("The requested number of messages were not sent.", 50, messagesSent); + Assert.assertEquals("Sender and receivers messages sent did not match up.", messagesSent, messagesReceived); + */ + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as defined in the + * interop testing specification. For example the method "testP2P" might map onto the interop test case name + * "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "TC5_PubSubMessageSize"; + } +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java new file mode 100644 index 0000000000..63e2c75509 --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java @@ -0,0 +1,906 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQNoConsumersException; +import org.apache.qpid.client.AMQNoRouteException; +import org.apache.qpid.test.framework.distributedtesting.TestClient; +import org.apache.qpid.interop.clienttestcases.TestCase3BasicPubSub; +import org.apache.qpid.test.framework.TestUtils; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the + * default topic exchange, using the specified number of receivers connections. Produces reports on the actual number of + * messages sent/received. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Supply the name of the test case that this implements. + * <tr><td> Accept/Reject invites based on test parameters. + * <tr><td> Adapt to assigned roles. + * <tr><td> Send required number of test messages using pub/sub. <tr><td> Generate test reports. + * </table> + */ +public class SustainedClientTestCase extends TestCase3BasicPubSub implements ExceptionListener, MessageListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(SustainedClientTestCase.class); + + /** Used to log to the console. */ + private static final Logger console = Logger.getLogger("SustainedTest"); + + /** The role to be played by the test. */ + private Roles role; + + /** The number of receivers connection to use. */ + private int numReceivers; + + /** The routing key to send them to on the default direct exchange. */ + private Destination sendDestination; + + /** The routing key to send updates to on the default direct exchange. */ + private Destination sendUpdateDestination; + + /** The connections to send/receive the test messages on. */ + private Connection[] connection; + + /** The sessions to send/receive the test messages on. */ + private Session[] session; + + /** The producer to send the test messages with. */ + MessageProducer producer; + + /** Adapter that adjusts the send rate based on the updates from clients. */ + SustainedRateAdapter _rateAdapter; + + /** */ + int _batchSize; + + private static final long TEN_MILLI_SEC = 10000000; + private static final int DEBUG_LOG_UPATE_INTERVAL = 10; + private static final int LOG_UPATE_INTERVAL = 10; + private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage"); + + /** + * Should provide the name of the test case that this class implements. The exact names are defined in the interop + * testing spec. + * + * @return The name of the test case that this implements. + */ + public String getName() + { + log.debug("public String getName(): called"); + + return "Perf_SustainedPubSub"; + } + + /** + * Assigns the role to be played by this test case. The test parameters are fully specified in the assignment + * message. When this method return the test case will be ready to execute. + * + * @param role The role to be played; sender or receivers. + * @param assignRoleMessage The role assingment message, contains the full test parameters. + * + * @throws JMSException Any JMSException resulting from reading the message are allowed to fall through. + */ + public void assignRole(Roles role, Message assignRoleMessage) throws JMSException + { + log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage + + "): called"); + + // Take note of the role to be played. + this.role = role; + + // Extract and retain the test parameters. + numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS"); + _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL"); + String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY"); + String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY"); + int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE"); + String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME"); + + if (log.isDebugEnabled()) + { + log.debug("numReceivers = " + numReceivers); + log.debug("_batchSize = " + _batchSize); + log.debug("ackMode = " + ackMode); + log.debug("sendKey = " + sendKey); + log.debug("sendUpdateKey = " + sendUpdateKey); + log.debug("role = " + role); + } + + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + console.info("Creating Sender"); + // Create a new connection to pass the test messages on. + connection = new Connection[1]; + session = new Session[1]; + + connection[0] = TestUtils.createConnection(TestClient.testContextProperties); + session[0] = connection[0].createSession(false, ackMode); + + // Extract and retain the test parameters. + sendDestination = session[0].createTopic(sendKey); + + connection[0].setExceptionListener(this); + + producer = session[0].createProducer(sendDestination); + + sendUpdateDestination = session[0].createTopic(sendUpdateKey); + MessageConsumer updateConsumer = session[0].createConsumer(sendUpdateDestination); + + _rateAdapter = new SustainedRateAdapter(this); + updateConsumer.setMessageListener(_rateAdapter); + + break; + + // Otherwise the receivers role is being assigned, so set this up to listen for messages on the required number + // of receivers connections. + case RECEIVER: + console.info("Creating Receiver"); + // Create the required number of receivers connections. + connection = new Connection[numReceivers]; + session = new Session[numReceivers]; + + for (int i = 0; i < numReceivers; i++) + { + connection[i] = TestUtils.createConnection(TestClient.testContextProperties); + session[i] = connection[i].createSession(false, ackMode); + + sendDestination = session[i].createTopic(sendKey); + + sendUpdateDestination = session[i].createTopic(sendUpdateKey); + + MessageConsumer consumer = session[i].createConsumer(sendDestination); + + consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], + sendUpdateDestination)); + } + + break; + } + + // Start all the connection dispatcher threads running. + for (int i = 0; i < connection.length; i++) + { + connection[i].start(); + } + } + + /** Performs the test case actions. + * @param numMessages*/ + public void start(int numMessages) throws JMSException + { + log.debug("public void start(): called"); + + // Check that the sender role is being performed. + switch (role) + { + // Check if the sender role is being assigned, and set up a single message producer if so. + case SENDER: + _rateAdapter.run(); + break; + case RECEIVER: + + } + + // return from here when you have finished the test.. this will signal the controller and + } + + public void terminate() throws JMSException, InterruptedException + { + if (_rateAdapter != null) + { + _rateAdapter.stop(); + } + } + + /** + * Gets a report on the actions performed by the test case in its assigned role. + * + * @param session The controlSession to create the report message in. + * + * @return The report message. + * + * @throws JMSException Any JMSExceptions resulting from creating the report are allowed to fall through. + */ + public Message getReport(Session session) throws JMSException + { + log.debug("public Message getReport(Session controlSession): called"); + + // Close the test connections. + for (int i = 0; i < connection.length; i++) + { + connection[i].close(); + } + + Message report = session.createMessage(); + report.setStringProperty("CONTROL_TYPE", "REPORT"); + + return report; + } + + public void onException(JMSException jmsException) + { + Exception linked = jmsException.getLinkedException(); + + if (linked != null) + { + if (log.isDebugEnabled()) + { + log.debug("Linked Exception:" + linked); + } + + if ((linked instanceof AMQNoRouteException) || (linked instanceof AMQNoConsumersException)) + { + if (log.isDebugEnabled()) + { + if (linked instanceof AMQNoConsumersException) + { + log.warn("No clients currently available for message:" + + ((AMQNoConsumersException) linked).getUndeliveredMessage()); + } + else + { + log.warn("No route for message"); + } + } + + // Tell the rate adapter that there are no clients ready yet + _rateAdapter.NO_CLIENTS = true; + } + } + else + { + log.warn("Exception:" + linked); + } + } + + /** + * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and + * 'end' messages. + */ + class SustainedListener implements MessageListener + { + /** Number of messages received */ + private long _received = 0; + /** The number of messages in the batch */ + private int _batchSize = 0; + /** Record of the when the 'start' messagse was sen */ + private Long _startTime; + /** Message producer to use to send reports */ + MessageProducer _updater; + /** Session to create the report message on */ + Session _session; + /** Record of the client ID used for this SustainedListnener */ + String _client; + + /** + * Main Constructor + * + * @param clientname The _client id used to identify this connection. + * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to + * control the interval between sending reports. + * @param session The controlSession used for communication. + * @param sendDestination The destination that update reports should be sent to. + * + * @throws JMSException My occur if creatingthe Producer fails + */ + public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) + throws JMSException + { + _batchSize = batchSize; + _client = clientname; + _session = session; + _updater = session.createProducer(sendDestination); + } + + public void onMessage(Message message) + { + if (log.isDebugEnabled()) + { + log.debug("Message " + _received + "received in listener"); + } + + if (message instanceof TextMessage) + { + try + { + _received++; + if (((TextMessage) message).getText().equals("start")) + { + log.debug("Starting Batch"); + _startTime = System.nanoTime(); + } + else if (((TextMessage) message).getText().equals("end")) + { + if (_startTime != null) + { + long currentTime = System.nanoTime(); + sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH")); + log.debug("End Batch"); + } + } + } + catch (JMSException e) + { + // ignore error + } + } + + } + + /** + * sendStatus creates and sends the report back to the publisher + * + * @param time taken for the the last batch + * @param received Total number of messages received. + * @param batchNumber the batch number + * @throws JMSException if an error occurs during the send + */ + private void sendStatus(long time, long received, int batchNumber) throws JMSException + { + Message updateMessage = _session.createTextMessage("update"); + updateMessage.setStringProperty("CLIENT_ID", ":" + _client); + updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE"); + updateMessage.setLongProperty("RECEIVED", received); + updateMessage.setIntProperty("BATCH", batchNumber); + updateMessage.setLongProperty("DURATION", time); + + if (log.isInfoEnabled()) + { + log.info("**** SENDING [" + batchNumber + "]**** " + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); + } + + // Output on the main console.info the details of this batch + if ((batchNumber % 10) == 0) + { + console.info("Sending Report [" + batchNumber + "] " + "CLIENT_ID:" + _client + " RECEIVED:" + received + + " BATCH:" + batchNumber + " DURATION:" + time); + } + + _updater.send(updateMessage); + } + } + + /** + * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second + * that are sent through the test system. + * + * By keeping a record of the messages recevied and the average time taken to process the batch size can be + * calculated and so the delay can be adjusted to maintain that rate. + * + * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no + * messages in the batch. Otherwise the delay is used at the end of the batch. + */ + class SustainedRateAdapter implements MessageListener, Runnable + { + private SustainedClientTestCase _client; + private long _batchVariance = Integer.getInteger("batchVariance", 3); // no. batches to allow drifting + private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms) + private volatile long _delay; // in nanos + private long _sent; + private Map<String, Long> _slowClients = new HashMap<String, Long>(); + private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms + private static final long NO_CLIENT_SLEEP = 1000; // 1s + private volatile boolean NO_CLIENTS = true; + private int _delayShifting; + private final int REPORTS_WITHOUT_CHANGE = Integer.getInteger("stableReportCount", 5); + private boolean _warmedup = false; + private static final long EXPECTED_TIME_PER_BATCH = 100000L; + private int _warmUpBatches = Integer.getInteger("warmUpBatches", 10); + + SustainedRateAdapter(SustainedClientTestCase client) + { + _client = client; + } + + public void onMessage(Message message) + { + if (log.isDebugEnabled()) + { + log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called"); + } + + try + { + String controlType = message.getStringProperty("CONTROL_TYPE"); + + // Check if the message is a test invite. + if ("UPDATE".equals(controlType)) + { + NO_CLIENTS = false; + long duration = message.getLongProperty("DURATION"); + long totalReceived = message.getLongProperty("RECEIVED"); + String client = message.getStringProperty("CLIENT_ID"); + int batchNumber = message.getIntProperty("BATCH"); + + if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0)) + { + log.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " Recevied BATCH:" + + batchNumber + " DURATION:" + duration); + } + + recordSlow(client, totalReceived, batchNumber); + + adjustDelay(client, batchNumber, duration); + + // Warm up completes when: + // we haven't warmed up + // and the number of batches sent to each client is at least half of the required warmup batches + if (!_warmedup && (batchNumber >= _warmUpBatches)) + { + _warmedup = true; + _warmup.countDown(); + + } + } + } + catch (JMSException e) + { + // + } + } + + CountDownLatch _warmup = new CountDownLatch(1); + + int _numBatches = 10000; + + // long[] _timings = new long[_numBatches]; + private boolean _running = true; + + public void run() + { + console.info("Warming up"); + + doBatch(_warmUpBatches); + + try + { + // wait for warmup to complete. + _warmup.await(); + + // set delay to the average length of the batches + _delay = _totalDuration / _warmUpBatches / delays.size(); + + console.info("Warmup complete delay set : " + _delay + " based on _totalDuration: " + _totalDuration + + " over no. batches: " + _warmUpBatches + " with client count: " + delays.size()); + + _totalDuration = 0L; + _totalReceived = 0L; + _sent = 0L; + } + catch (InterruptedException e) + { + // + } + + doBatch(_numBatches); + + } + + private void doBatch(int batchSize) // long[] timings, + { + TextMessage testMessage = null; + try + { + testMessage = _client.session[0].createTextMessage("start"); + + for (int batch = 0; batch <= batchSize; batch++) + // while (_running) + { + long start = System.nanoTime(); + + testMessage.setText("start"); + testMessage.setIntProperty("BATCH", batch); + + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + testMessage.setText("test"); + // start at 2 so start and end count as part of batch + for (int m = 2; m < _batchSize; m++) + { + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + } + + testMessage.setText("end"); + _client.producer.send(testMessage); + _rateAdapter.sentMessage(); + + long end = System.nanoTime(); + + long sendtime = end - start; + + if (log.isDebugEnabled()) + { + log.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime); // timings[batch]); + } + + if ((batch % LOG_UPATE_INTERVAL) == 0) + { + console.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status()); + } + + _rateAdapter.sleepBatch(); + + } + } + catch (JMSException e) + { + console.error("Runner ended"); + } + } + + private String status() + { + return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers" + " Delay is " + _delay + + " resulting in " + + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch")); + } + + private void sleepBatch() + { + if (checkForSlowClients()) + { // if there werwe slow clients we have already slept so don't sleep anymore again. + return; + } + + if (!SLEEP_PER_MESSAGE) + { + // per batch sleep.. if sleep is to small to spread over the batch. + if (_delay <= (TEN_MILLI_SEC * _batchSize)) + { + sleepLong(_delay); + } + else + { + log.info("Not sleeping _delay > ten*batch is:" + _delay); + } + } + } + + public void stop() + { + _running = false; + } + + Map<String, Long> delays = new HashMap<String, Long>(); + Long _totalReceived = 0L; + Long _totalDuration = 0L; + int _skipUpdate = 0; + + /** + * Adjust the delay for sending messages based on this update from the client + * + * @param client The client that send this update + * @param duration The time taken for the last batch of messagse + * @param batchNumber The reported batchnumber from the client + */ + private void adjustDelay(String client, int batchNumber, long duration) + { + // Retrieve the current total time taken for this client. + Long currentTime = delays.get(client); + + // Add the new duration time to this client + if (currentTime == null) + { + currentTime = duration; + } + else + { + currentTime += duration; + } + + delays.put(client, currentTime); + + long batchesSent = _sent / _batchSize; + + // ensure we don't divide by zero + if (batchesSent == 0) + { + batchesSent = 1L; + } + + _totalReceived += _batchSize; + _totalDuration += duration; + + // calculate average duration accross clients per batch + long averageDuration = _totalDuration / delays.size() / batchesSent; + + // calculate the difference between current send delay and average report delay + long diff = (duration) - averageDuration; + + if (log.isInfoEnabled() && ((batchNumber % DEBUG_LOG_UPATE_INTERVAL) == 0)) + { + log.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers." + " on batch: " + + batchesSent + " received batch: " + batchNumber + " Batch Duration: " + duration + " Average: " + + averageDuration + " so diff: " + diff + " for : " + client + " Delay is " + _delay + " resulting in " + + ((_delay > (TEN_MILLI_SEC * _batchSize)) ? ((_delay / _batchSize) + "/msg") : (_delay + "/batch"))); + } + + // if the averageDuration differs from the current by more than the specified variane then adjust delay. + if (Math.abs(diff) > _timeVariance) + { + + // if the the _delay is larger than the required duration to send report + // speed up + if (diff > TEN_MILLI_SEC) + { + _delay -= TEN_MILLI_SEC; + + if (_delay < 0) + { + _delay = 0; + log.info("Reset _delay to 0"); + delayStable(); + } + else + { + delayChanged(); + } + + } + else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance + { + // the report took longer + _delay += TEN_MILLI_SEC; + delayChanged(); + } + } + else + { + delayStable(); + } + + // If we have a consumer that is behind with the batches. + if ((batchesSent - batchNumber) > _batchVariance) + { + log.debug("Increasing _delay as sending more than receiving"); + + _delay += 2 * TEN_MILLI_SEC; + delayChanged(); + } + + } + + /** Reset the number of iterations before we say the delay has stabilised. */ + private void delayChanged() + { + _delayShifting = REPORTS_WITHOUT_CHANGE; + } + + /** + * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will + * output Delay stabilised + */ + private void delayStable() + { + _delayShifting--; + + if (_delayShifting < 0) + { + _delayShifting = 0; + console.debug("Delay stabilised:" + _delay); + } + } + + /** + * Checks that the client has received enough messages. If the client has fallen behind then they are put in the + * _slowClients lists which will increase the delay. + * + * @param client The client identifier to check + * @param received the number of messages received by that client + * @param batchNumber + */ + private void recordSlow(String client, long received, int batchNumber) + { + if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance) + { + _slowClients.put(client, received); + } + else + { + _slowClients.remove(client); + } + } + + /** Incrment the number of sent messages and then sleep, if required. */ + public void sentMessage() + { + + _sent++; + + if (_delay > (TEN_MILLI_SEC * _batchSize)) + { + long batchDelay = _delay / _batchSize; + // less than 10ms sleep doesn't always work. + // _delay is in nano seconds + // if (batchDelay < (TEN_MILLI_SEC)) + // { + // sleep(0, (int) batchDelay); + // } + // else + { + // if (batchDelay < 30000000000L) + { + sleepLong(batchDelay); + } + } + } + else + { + if (SLEEP_PER_MESSAGE && (_delay > 0)) + { + sleepLong(_delay / _batchSize); + } + } + } + + /** + * Check at the end of each batch and pause sending messages to allow slow clients to catch up. + * + * @return true if there were slow clients that caught up. + */ + private boolean checkForSlowClients() + { + // This will allways be true as we are running this at the end of each batchSize + // if (_sent % _batchSize == 0) + { + // Cause test to pause when we have slow + if (!_slowClients.isEmpty() || NO_CLIENTS) + { + + while (!_slowClients.isEmpty()) + { + if (log.isInfoEnabled() && ((_sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL) == 0)) + { + String clients = ""; + Iterator it = _slowClients.keySet().iterator(); + while (it.hasNext()) + { + clients += it.next(); + if (it.hasNext()) + { + clients += ", "; + } + } + + log.info("Pausing for slow clients:" + clients); + } + + if (console.isDebugEnabled() && ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0)) + { + console.debug(_slowClients.size() + " slow clients."); + } + + sleep(PAUSE_SLEEP); + } + + if (NO_CLIENTS) + { + sleep(NO_CLIENT_SLEEP); + } + + log.debug("Continuing"); + + return true; + } + else + { + if ((_sent / _batchSize % LOG_UPATE_INTERVAL) == 0) + { + console.info("Total Delay :" + _delay + " " + + ((_delayShifting == 0) ? "Stablised" : ("Not Stablised(" + _delayShifting + ")"))); + } + } + + } + + return false; + } + + /** + * Sleep normally takes micro-seconds this allows the use of a nano-second value. + * + * @param delay nanoseconds to sleep for. + */ + private void sleepLong(long delay) + { + sleep(delay / 1000000, (int) (delay % 1000000)); + } + + /** + * Sleep for the specified micro-seconds. + * @param sleep microseconds to sleep for. + */ + private void sleep(long sleep) + { + sleep(sleep, 0); + } + + /** + * Perform the sleep , swallowing any InteruptException. + * + * NOTE: If a sleep request is > 10s then reset only sleep for 5s + * + * @param milli to sleep for + * @param nano sub miliseconds to sleep for + */ + private void sleep(long milli, int nano) + { + try + { + log.debug("Sleep:" + milli + ":" + nano); + if (milli > 10000) + { + + if (_delay == milli) + { + _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH; + log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + + "s. Reset _totalDuration:" + _totalDuration); + } + else + { + log.error("Sleeping for more than 10 seconds adjusted to 5s!:" + (milli / 1000) + "s"); + } + + milli = 5000; + } + + Thread.sleep(milli, nano); + } + catch (InterruptedException e) + { + // + } + } + + public void setClient(SustainedClientTestCase client) + { + _client = client; + } + } + +} diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java new file mode 100644 index 0000000000..0077b4727a --- /dev/null +++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCase.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.sustained; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.test.framework.DropInTest; +import org.apache.qpid.test.framework.FrameworkBaseCase; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Properties; + +/** + * SustainedTestCase is a {@link FrameworkBaseCase} that runs the "Perf_SustainedPubSub" test case. This consists of one + * test client sending, and several receiving, and attempts to find the highest rate at which messages can be broadcast + * to the receivers. It is also a {@link DropInTest} to which more test clients may be added during a test run. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> + * </table> + */ +public class SustainedTestCase extends FrameworkBaseCase implements DropInTest +{ + /** Used for debugging. */ + Logger log = Logger.getLogger(SustainedTestCase.class); + + /** Holds the root name of the topic on which to send the test messages. */ + private static final String SUSTAINED_KEY = "Perf_SustainedPubSub"; + + /** + * Creates a new coordinating test case with the specified name. + * + * @param name The test case name. + */ + public SustainedTestCase(String name) + { + super(name); + } + + /** + * Performs a single test run of the sustained test. + * + * @throws Exception Any exceptions are allowed to fall through and fail the test. + */ + public void testBasicPubSub() throws Exception + { + log.debug("public void testSinglePubSubCycle(): called"); + + Properties testConfig = new Properties(); + testConfig.put("TEST_NAME", "Perf_SustainedPubSub"); + testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY); + testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2)); + testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000)); + testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE"); + testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE)); + + log.info("Created Config: " + testConfig.entrySet().toArray()); + + getCircuitFactory().sequenceTest(null, null, testConfig); + } + + /** + * Accepts a late joining client into this test case. The client will be enlisted with a control message + * with the 'CONTROL_TYPE' field set to the value 'LATEJOIN'. It should also provide values for the fields: + * + * <p/><table> + * <tr><td> CLIENT_NAME <td> A unique name for the new client. + * <tr><td> CLIENT_PRIVATE_CONTROL_KEY <td> The key for the route on which the client receives its control messages. + * </table> + * + * @param message The late joiners join message. + * + * @throws JMSException Any JMS Exception are allowed to fall through, indicating that the join failed. + */ + public void lateJoin(Message message) throws JMSException + { + throw new RuntimeException("Not implemented."); + /* + // Extract the joining clients details from its join request message. + TestClientDetails clientDetails = new TestClientDetails(); + clientDetails.clientName = message.getStringProperty("CLIENT_NAME"); + clientDetails.privateControlKey = message.getStringProperty("CLIENT_PRIVATE_CONTROL_KEY"); + + // Register the joining client, but do block for confirmation as cannot do a synchronous receivers during this + // method call, as it may have been called from an 'onMessage' method. + assignReceiverRole(clientDetails, new Properties(), false); + */ + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + return "Perf_SustainedPubSub"; + } +} |