diff options
Diffstat (limited to 'java/integrationtests')
-rw-r--r-- | java/integrationtests/README.txt | 13 | ||||
-rw-r--r-- | java/integrationtests/pom.xml | 127 | ||||
-rw-r--r-- | java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java | 291 | ||||
-rw-r--r-- | java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java | 244 |
4 files changed, 675 insertions, 0 deletions
diff --git a/java/integrationtests/README.txt b/java/integrationtests/README.txt new file mode 100644 index 0000000000..00a21883a9 --- /dev/null +++ b/java/integrationtests/README.txt @@ -0,0 +1,13 @@ +This module contains integration tests, for testing a java client againt *any* broker
+implementation or against other clients. These tests must not rely on starting the
+Java broker in-vm but must depend on a broker being started independantly before running
+the tests in this module. By default tests in this module will expect the broker to be
+started on localhost on the default port, but this can be overridden by passing in a
+sys property to maven. Interop tests are in this module. Java broker specific tests that
+use an in-vm broker should go in the systests module.
+
+Don't set the tests in this module to run by default as part of the maven build, until
+there is a script to start and stop the broker; needed to fully automate these tests.
+Interop tests will always be run using a seperate script (not from maven) but it might
+be worthwile to script into the maven build starting of the Java broker, and running
+these tests against it.
\ No newline at end of file diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml new file mode 100644 index 0000000000..79d587f360 --- /dev/null +++ b/java/integrationtests/pom.xml @@ -0,0 +1,127 @@ +<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-integrationtests</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid Integration Tests</name>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+
+ <!-- These tests depend on the client API only. -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ </dependency>
+
+ <!-- Test dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+
+ <!-- Backports the module to Java 1.4. This is done during the packaging phase as a transformation of the Jar. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>retrotranslator-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>retro-client</id>
+ <phase>package</phase>
+ <goals>
+ <goal>translate</goal>
+ </goals>
+ <configuration>
+ <!--<destdir>${project.build.directory}/retro-classes</destdir>-->
+ <destjar>${project.build.directory}/${project.build.finalName}-java14.jar</destjar>
+ <verify>${retrotranslator.verify}</verify>
+ <verifyClasspath>
+ <element>${retrotranslator.1.4-rt-path}</element>
+ <element>${retrotranslator.1.4-jce-path}</element>
+ <element>${retrotranslator.1.4-jsse-path}</element>
+ <element>${retrotranslator.1.4-sasl-path}</element>
+ </verifyClasspath>
+ <failonwarning>false</failonwarning>
+ <includes>
+ <include>
+ <directory>${project.build.directory}</directory>
+ <pattern>${project.build.finalName}.jar</pattern>
+ </include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- This identifies the backported java 1.4 jars and attaches them as jar (classified as java14) build artifacts. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${project.build.directory}/${project.build.finalName}-java14.jar</file>
+ <type>jar</type>
+ <classifier>java14</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ <resources>
+ </resources>
+
+ </build>
+
+</project>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java new file mode 100644 index 0000000000..dbd07958fd --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java @@ -0,0 +1,291 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop;
+
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Listener implements the listening end of the Qpid interop tests. It is capable of being run as a standalone listener
+ * that responds to the test messages send by the publishing end of the tests implemented by {@link Publisher}.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Count messages received on a topic. <td> {@link Publisher}
+ * <tr><td> Send reports on messages received, when requested to. <td> {@link Publisher}
+ * <tr><td> Shutdown, when requested to. <td> {@link Publisher}
+ * <tr><td>
+ *
+ * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with
+ * interop spec in mind.
+ *
+ * @todo I've added lots of field table types in the report message, just to check if the other end can decode them
+ * correctly. Not really the right place to test this, so remove them from {@link #sendReport()} once a better
+ * test exists.
+ */
+public class Listener implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Listener.class);
+
+ /** The default AMQ connection URL to use for tests. */
+ public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /** Holds the name of (routing key for) the topic to receive test messages on. */
+ public static final String CONTROL_TOPIC = "topic_control";
+
+ /** Holds the name of (routing key for) the queue to send reports to. */
+ public static final String RESPONSE_QUEUE = "response";
+
+ /** Holds the JMS Topic to receive test messages on. */
+ private final Topic _topic;
+
+ /** Holds the JMS Queue to send reports to. */
+ private final Queue _response;
+
+ /** Holds the connection to listen on. */
+ private final Connection _connection;
+
+ /** Holds the producer to send control messages on. */
+ private final MessageProducer _controller;
+
+ /** Holds the JMS session. */
+ private final javax.jms.Session _session;
+
+ /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
+ private boolean init;
+
+ /** Holds the count of messages received by this listener. */
+ private int count;
+
+ /** Used to hold the start time of the first message. */
+ private long start;
+
+ /**
+ * Creates a topic listener using the specified broker URL.
+ *
+ * @param connectionUrl The broker URL to listen on.
+ *
+ * @throws AMQException If the broker connection cannot be established.
+ * @throws URLSyntaxException If the broker URL syntax is not correct.
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ Listener(String connectionUrl) throws AMQException, JMSException, URLSyntaxException
+ {
+ log.debug("Listener(String connectionUrl = " + connectionUrl + "): called");
+
+ // Create a connection to the broker.
+ _connection = new AMQConnection(connectionUrl);
+
+ // Establish a session on the broker.
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up the destinations to listen for test and control messages on.
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+
+ // Set this listener up to listen for incoming messages on the test topic.
+ _session.createConsumer(_topic).setMessageListener(this);
+
+ // Set up this listener with a producer to send the reports on.
+ _controller = _session.createProducer(_response);
+
+ _connection.start();
+ System.out.println("Waiting for messages...");
+ }
+
+ /**
+ * Starts a test subscriber. The broker URL must be specified as the first command line argument.
+ *
+ * @param argv The command line arguments, ignored.
+ *
+ * @todo Add command line arguments to configure all aspects of the test.
+ */
+ public static void main(String[] argv)
+ {
+ try
+ {
+ new Listener(DEFAULT_URI);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
+ * shutdown messages result in this listener being terminated.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ // Take the start time of the first message if this is the first message.
+ if (!init)
+ {
+ start = System.nanoTime() / 1000000;
+ count = 0;
+ init = true;
+ }
+
+ try
+ {
+ // Check if the message is a control message telling this listener to shut down.
+ if (isShutdown(message))
+ {
+ log.debug("Got a shutdown message.");
+ shutdown();
+ }
+ // Check if the message is a report request message asking this listener to respond with the message count.
+ else if (isReport(message))
+ {
+ log.debug("Got a report request message.");
+
+ // Send the message count report.
+ sendReport();
+
+ // Reset the initialization flag so that the next message is considered to be the first.
+ init = false;
+ }
+ // Otherwise it is an ordinary test message, so increment the message count.
+ else
+ {
+ count++;
+ }
+ }
+ catch (JMSException e)
+ {
+ log.warn("There was a JMSException during onMessage.", e);
+ }
+ }
+
+ /**
+ * Checks a message to see if it is a termination request control message.
+ *
+ * @param m The message to check.
+ *
+ * @return <tt>true</tt> if it is a termination request control message, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ boolean isShutdown(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ return result;
+ }
+
+ /**
+ * Checks a message to see if it is a report request control message.
+ *
+ * @param m The message to check.
+ *
+ * @return <tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ boolean isReport(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+ return result;
+ }
+
+ /**
+ * Checks whether or not a text field on a message has the specified value.
+ *
+ * @param m The message to check.
+ * @param fieldName The name of the field to check.
+ * @param value The expected value of the field to compare with.
+ *
+ * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
+ {
+ //log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ // + ", String value = " + value + "): called");
+
+ String comp = m.getStringProperty(fieldName);
+ //log.debug("comp = " + comp);
+
+ boolean result = (comp != null) && comp.equals(value);
+ //log.debug("result = " + result);
+
+ return result;
+ }
+
+ /**
+ * Closes down the connection to the broker.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void shutdown() throws JMSException
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+
+ /**
+ * Send the report message to the response queue.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void sendReport() throws JMSException
+ {
+ log.debug("private void report(): called");
+
+ // Create the report message.
+ long time = ((System.nanoTime() / 1000000) - start);
+ String msg = "Received " + count + " in " + time + "ms";
+ Message message = _session.createTextMessage(msg);
+
+ // Shove some more field table types in the message just to see if the other end can handle it.
+ message.setBooleanProperty("BOOLEAN", true);
+ //message.setByteProperty("BYTE", (byte) 5);
+ message.setDoubleProperty("DOUBLE", Math.PI);
+ message.setFloatProperty("FLOAT", 1.0f);
+ message.setIntProperty("INT", 1);
+ message.setShortProperty("SHORT", (short) 1);
+ message.setLongProperty("LONG", (long) 1827361278);
+ message.setStringProperty("STRING", "hello");
+
+ // Send the report message.
+ _controller.send(message);
+ log.debug("Sent report: " + msg);
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java new file mode 100644 index 0000000000..cab679876f --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java @@ -0,0 +1,244 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Publisher is the sending end of Qpid interop tests. It is capable of being run as a standalone publisher
+ * that sends test messages to the listening end of the tests implemented by {@link Listener}.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ *
+ * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with
+ * interop spec in mind.
+ *
+ * @todo I've added lots of field table types in the report request message, just to check if the other end can decode
+ * them correctly. Not really the right place to test this, so remove them from {@link #doTest()} once a better
+ * test exists.
+ */
+public class Publisher implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Publisher.class);
+
+ /** The default AMQ connection URL to use for tests. */
+ public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /** Holds the default test timeout for broker communications before tests give up. */
+ public static final int TIMEOUT = 3000;
+
+ /** Holds the routing key for the topic to send test messages on. */
+ public static final String CONTROL_TOPIC = "topic_control";
+
+ /** Holds the routing key for the queue to receive reports on. */
+ public static final String RESPONSE_QUEUE = "response";
+
+ /** Holds the JMS Topic to send test messages on. */
+ private final Topic _topic;
+
+ /** Holds the JMS Queue to receive reports on. */
+ private final Queue _response;
+
+ /** Holds the number of messages to send in each test run. */
+ private int numMessages;
+
+ /** A monitor used to wait for all reports to arrive back from consumers on. */
+ private CountDownLatch allReportsReceivedEvt;
+
+ /** Holds the connection to listen on. */
+ private Connection _connection;
+
+ /** Holds the channel for all test messages.*/
+ private Session _session;
+
+ /** Holds the producer to send test messages on. */
+ private MessageProducer publisher;
+
+ /**
+ * Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test
+ * subscribers.
+ *
+ * @param connectionUri The broker URL.
+ * @param numMessages The number of messages to send in each test.
+ * @param numSubscribers The number of subscribes that are expected to reply with a report.
+ */
+ Publisher(String connectionUri, int numMessages, int numSubscribers)
+ throws AMQException, JMSException, URLSyntaxException
+ {
+ log.debug("Publisher(String connectionUri = " + connectionUri + ", int numMessages = " + numMessages
+ + ", int numSubscribers = " + numSubscribers + "): called");
+
+ // Create a connection to the broker.
+ _connection = new AMQConnection(connectionUri);
+
+ // Establish a session on the broker.
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up the destinations to send test messages and listen for reports on.
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+
+ // Set this listener up to listen for reports on the response queue.
+ _session.createConsumer(_response).setMessageListener(this);
+
+ // Set up this listener with a producer to send the test messages and report requests on.
+ publisher = _session.createProducer(_topic);
+
+ // Keep the test parameters.
+ this.numMessages = numMessages;
+
+ // Set up a countdown to count all subscribers sending their reports.
+ allReportsReceivedEvt = new CountDownLatch(numSubscribers);
+
+ _connection.start();
+ System.out.println("Sending messages and waiting for reports...");
+ }
+
+ /**
+ * Start a test publisher. The broker URL must be specified as the first command line argument.
+ *
+ * @param argv The command line arguments, ignored.
+ *
+ * @todo Add command line arguments to configure all aspects of the test.
+ */
+ public static void main(String[] argv)
+ {
+ try
+ {
+ // Create an instance of this publisher with the command line parameters.
+ Publisher publisher = new Publisher(DEFAULT_URI, 1, 1);
+
+ // Publish the test messages.
+ publisher.doTest();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Sends the test messages and waits for all subscribers to reply with a report.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void doTest() throws JMSException
+ {
+ log.debug("public void DoTest(): called");
+
+ // Create a test message to send.
+ Message testMessage = _session.createTextMessage("test");
+
+ // Send the desired number of test messages.
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.send(testMessage);
+ }
+
+ log.debug("Sent " + numMessages + " test messages.");
+
+ // Send the report request.
+ Message reportRequestMessage = _session.createTextMessage("Report request message.");
+ reportRequestMessage.setStringProperty("TYPE", "REPORT_REQUEST");
+
+ reportRequestMessage.setBooleanProperty("BOOLEAN", false);
+ //reportRequestMessage.Headers.SetByte("BYTE", 5);
+ reportRequestMessage.setDoubleProperty("DOUBLE", 3.141);
+ reportRequestMessage.setFloatProperty("FLOAT", 1.0f);
+ reportRequestMessage.setIntProperty("INT", 1);
+ reportRequestMessage.setLongProperty("LONG", 1);
+ reportRequestMessage.setStringProperty("STRING", "hello");
+ reportRequestMessage.setShortProperty("SHORT", (short) 2);
+
+ publisher.send(reportRequestMessage);
+
+ log.debug("Sent the report request message, waiting for all replies...");
+
+ // Wait until all the reports come in.
+ try
+ {
+ allReportsReceivedEvt.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Check if all reports were really received or if the timeout occurred.
+ if (allReportsReceivedEvt.getCount() == 0)
+ {
+ log.debug("Got all reports.");
+ }
+ else
+ {
+ log.debug("Waiting for reports timed out, still waiting for " + allReportsReceivedEvt.getCount() + ".");
+ }
+
+ // Send the termination request.
+ Message terminationRequestMessage = _session.createTextMessage("Termination request message.");
+ terminationRequestMessage.setStringProperty("TYPE", "TERMINATION_REQUEST");
+ publisher.send(terminationRequestMessage);
+
+ log.debug("Sent the termination request message.");
+
+ // Close all message producers and consumers and the connection to the broker.
+ shutdown();
+ }
+
+ /**
+ * Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes
+ * zero, at which time waiting threads are notified of this event.
+ *
+ * @param message The received report message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void OnMessage(Message message = " + message + "): called");
+
+ // Decrement the count of expected messages and release the wait monitor when this becomes zero.
+ allReportsReceivedEvt.countDown();
+
+ if (allReportsReceivedEvt.getCount() == 0)
+ {
+ log.debug("Got reports from all subscribers.");
+ }
+ }
+
+ /**
+ * Stops the message consumers and closes the connection.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void shutdown() throws JMSException
+ {
+ _session.close();
+ _connection.close();
+ }
+}
|