diff options
author | Robert Greig <rgreig@apache.org> | 2007-02-28 11:05:20 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-02-28 11:05:20 +0000 |
commit | a707cad8e7bf0e7e5dc9725d2d08e63456dcb82d (patch) | |
tree | b893be439771ef8aa64cd6fc30539c8cb5340a01 /java/integrationtests | |
parent | a20abbb7c6c6ac4f7105cb57544375e7058f3904 (diff) | |
download | qpid-python-a707cad8e7bf0e7e5dc9725d2d08e63456dcb82d.tar.gz |
(Patch submitted by Rupert Smith). java_iop.diff
This adds some interop tests (which are just copies of the existing topic tests) in a new integrationtests module. The java 1.4. client build has been updated to be able to run these tests. They succesfully talk to each other and the .net code. This is intended to be a starting point of a more in depth interop test as per the spec, but does not implement what is in the draft spec yet.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@512699 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/integrationtests')
-rw-r--r-- | java/integrationtests/README.txt | 13 | ||||
-rw-r--r-- | java/integrationtests/pom.xml | 127 | ||||
-rw-r--r-- | java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java | 291 | ||||
-rw-r--r-- | java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java | 244 |
4 files changed, 675 insertions, 0 deletions
diff --git a/java/integrationtests/README.txt b/java/integrationtests/README.txt new file mode 100644 index 0000000000..00a21883a9 --- /dev/null +++ b/java/integrationtests/README.txt @@ -0,0 +1,13 @@ +This module contains integration tests, for testing a java client againt *any* broker
+implementation or against other clients. These tests must not rely on starting the
+Java broker in-vm but must depend on a broker being started independantly before running
+the tests in this module. By default tests in this module will expect the broker to be
+started on localhost on the default port, but this can be overridden by passing in a
+sys property to maven. Interop tests are in this module. Java broker specific tests that
+use an in-vm broker should go in the systests module.
+
+Don't set the tests in this module to run by default as part of the maven build, until
+there is a script to start and stop the broker; needed to fully automate these tests.
+Interop tests will always be run using a seperate script (not from maven) but it might
+be worthwile to script into the maven build starting of the Java broker, and running
+these tests against it.
\ No newline at end of file diff --git a/java/integrationtests/pom.xml b/java/integrationtests/pom.xml new file mode 100644 index 0000000000..79d587f360 --- /dev/null +++ b/java/integrationtests/pom.xml @@ -0,0 +1,127 @@ +<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-integrationtests</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ <name>Qpid Integration Tests</name>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid</artifactId>
+ <version>1.0-incubating-M2-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <topDirectoryLocation>..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+
+ <!-- These tests depend on the client API only. -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-client</artifactId>
+ </dependency>
+
+ <!-- Test dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ </plugin>
+
+ <!-- Backports the module to Java 1.4. This is done during the packaging phase as a transformation of the Jar. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>retrotranslator-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>retro-client</id>
+ <phase>package</phase>
+ <goals>
+ <goal>translate</goal>
+ </goals>
+ <configuration>
+ <!--<destdir>${project.build.directory}/retro-classes</destdir>-->
+ <destjar>${project.build.directory}/${project.build.finalName}-java14.jar</destjar>
+ <verify>${retrotranslator.verify}</verify>
+ <verifyClasspath>
+ <element>${retrotranslator.1.4-rt-path}</element>
+ <element>${retrotranslator.1.4-jce-path}</element>
+ <element>${retrotranslator.1.4-jsse-path}</element>
+ <element>${retrotranslator.1.4-sasl-path}</element>
+ </verifyClasspath>
+ <failonwarning>false</failonwarning>
+ <includes>
+ <include>
+ <directory>${project.build.directory}</directory>
+ <pattern>${project.build.finalName}.jar</pattern>
+ </include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- This identifies the backported java 1.4 jars and attaches them as jar (classified as java14) build artifacts. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-artifacts</id>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>${project.build.directory}/${project.build.finalName}-java14.jar</file>
+ <type>jar</type>
+ <classifier>java14</classifier>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+
+ <resources>
+ </resources>
+
+ </build>
+
+</project>
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java new file mode 100644 index 0000000000..dbd07958fd --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/Listener.java @@ -0,0 +1,291 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop;
+
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Listener implements the listening end of the Qpid interop tests. It is capable of being run as a standalone listener
+ * that responds to the test messages send by the publishing end of the tests implemented by {@link Publisher}.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Count messages received on a topic. <td> {@link Publisher}
+ * <tr><td> Send reports on messages received, when requested to. <td> {@link Publisher}
+ * <tr><td> Shutdown, when requested to. <td> {@link Publisher}
+ * <tr><td>
+ *
+ * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with
+ * interop spec in mind.
+ *
+ * @todo I've added lots of field table types in the report message, just to check if the other end can decode them
+ * correctly. Not really the right place to test this, so remove them from {@link #sendReport()} once a better
+ * test exists.
+ */
+public class Listener implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Listener.class);
+
+ /** The default AMQ connection URL to use for tests. */
+ public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /** Holds the name of (routing key for) the topic to receive test messages on. */
+ public static final String CONTROL_TOPIC = "topic_control";
+
+ /** Holds the name of (routing key for) the queue to send reports to. */
+ public static final String RESPONSE_QUEUE = "response";
+
+ /** Holds the JMS Topic to receive test messages on. */
+ private final Topic _topic;
+
+ /** Holds the JMS Queue to send reports to. */
+ private final Queue _response;
+
+ /** Holds the connection to listen on. */
+ private final Connection _connection;
+
+ /** Holds the producer to send control messages on. */
+ private final MessageProducer _controller;
+
+ /** Holds the JMS session. */
+ private final javax.jms.Session _session;
+
+ /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
+ private boolean init;
+
+ /** Holds the count of messages received by this listener. */
+ private int count;
+
+ /** Used to hold the start time of the first message. */
+ private long start;
+
+ /**
+ * Creates a topic listener using the specified broker URL.
+ *
+ * @param connectionUrl The broker URL to listen on.
+ *
+ * @throws AMQException If the broker connection cannot be established.
+ * @throws URLSyntaxException If the broker URL syntax is not correct.
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ Listener(String connectionUrl) throws AMQException, JMSException, URLSyntaxException
+ {
+ log.debug("Listener(String connectionUrl = " + connectionUrl + "): called");
+
+ // Create a connection to the broker.
+ _connection = new AMQConnection(connectionUrl);
+
+ // Establish a session on the broker.
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up the destinations to listen for test and control messages on.
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+
+ // Set this listener up to listen for incoming messages on the test topic.
+ _session.createConsumer(_topic).setMessageListener(this);
+
+ // Set up this listener with a producer to send the reports on.
+ _controller = _session.createProducer(_response);
+
+ _connection.start();
+ System.out.println("Waiting for messages...");
+ }
+
+ /**
+ * Starts a test subscriber. The broker URL must be specified as the first command line argument.
+ *
+ * @param argv The command line arguments, ignored.
+ *
+ * @todo Add command line arguments to configure all aspects of the test.
+ */
+ public static void main(String[] argv)
+ {
+ try
+ {
+ new Listener(DEFAULT_URI);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Handles all message received by this listener. Test messages are counted, report messages result in a report being sent and
+ * shutdown messages result in this listener being terminated.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void onMessage(Message message = " + message + "): called");
+
+ // Take the start time of the first message if this is the first message.
+ if (!init)
+ {
+ start = System.nanoTime() / 1000000;
+ count = 0;
+ init = true;
+ }
+
+ try
+ {
+ // Check if the message is a control message telling this listener to shut down.
+ if (isShutdown(message))
+ {
+ log.debug("Got a shutdown message.");
+ shutdown();
+ }
+ // Check if the message is a report request message asking this listener to respond with the message count.
+ else if (isReport(message))
+ {
+ log.debug("Got a report request message.");
+
+ // Send the message count report.
+ sendReport();
+
+ // Reset the initialization flag so that the next message is considered to be the first.
+ init = false;
+ }
+ // Otherwise it is an ordinary test message, so increment the message count.
+ else
+ {
+ count++;
+ }
+ }
+ catch (JMSException e)
+ {
+ log.warn("There was a JMSException during onMessage.", e);
+ }
+ }
+
+ /**
+ * Checks a message to see if it is a termination request control message.
+ *
+ * @param m The message to check.
+ *
+ * @return <tt>true</tt> if it is a termination request control message, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ boolean isShutdown(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ return result;
+ }
+
+ /**
+ * Checks a message to see if it is a report request control message.
+ *
+ * @param m The message to check.
+ *
+ * @return <tt>true</tt> if it is a report request control message, <tt>false</tt> otherwise.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ boolean isReport(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+ return result;
+ }
+
+ /**
+ * Checks whether or not a text field on a message has the specified value.
+ *
+ * @param m The message to check.
+ * @param fieldName The name of the field to check.
+ * @param value The expected value of the field to compare with.
+ *
+ * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
+ {
+ //log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ // + ", String value = " + value + "): called");
+
+ String comp = m.getStringProperty(fieldName);
+ //log.debug("comp = " + comp);
+
+ boolean result = (comp != null) && comp.equals(value);
+ //log.debug("result = " + result);
+
+ return result;
+ }
+
+ /**
+ * Closes down the connection to the broker.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void shutdown() throws JMSException
+ {
+ _session.close();
+ _connection.stop();
+ _connection.close();
+ }
+
+ /**
+ * Send the report message to the response queue.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void sendReport() throws JMSException
+ {
+ log.debug("private void report(): called");
+
+ // Create the report message.
+ long time = ((System.nanoTime() / 1000000) - start);
+ String msg = "Received " + count + " in " + time + "ms";
+ Message message = _session.createTextMessage(msg);
+
+ // Shove some more field table types in the message just to see if the other end can handle it.
+ message.setBooleanProperty("BOOLEAN", true);
+ //message.setByteProperty("BYTE", (byte) 5);
+ message.setDoubleProperty("DOUBLE", Math.PI);
+ message.setFloatProperty("FLOAT", 1.0f);
+ message.setIntProperty("INT", 1);
+ message.setShortProperty("SHORT", (short) 1);
+ message.setLongProperty("LONG", (long) 1827361278);
+ message.setStringProperty("STRING", "hello");
+
+ // Send the report message.
+ _controller.send(message);
+ log.debug("Sent report: " + msg);
+ }
+}
diff --git a/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java b/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java new file mode 100644 index 0000000000..cab679876f --- /dev/null +++ b/java/integrationtests/src/main/java/org/apache/qpid/interop/Publisher.java @@ -0,0 +1,244 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.interop;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Publisher is the sending end of Qpid interop tests. It is capable of being run as a standalone publisher
+ * that sends test messages to the listening end of the tests implemented by {@link Listener}.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td>
+ *
+ * @todo This doesn't implement the interop test spec yet. Its a port of the old topic tests but has been adapted with
+ * interop spec in mind.
+ *
+ * @todo I've added lots of field table types in the report request message, just to check if the other end can decode
+ * them correctly. Not really the right place to test this, so remove them from {@link #doTest()} once a better
+ * test exists.
+ */
+public class Publisher implements MessageListener
+{
+ private static Logger log = Logger.getLogger(Publisher.class);
+
+ /** The default AMQ connection URL to use for tests. */
+ public static final String DEFAULT_URI = "amqp://guest:guest@default/test?brokerlist='tcp://localhost:5672'";
+
+ /** Holds the default test timeout for broker communications before tests give up. */
+ public static final int TIMEOUT = 3000;
+
+ /** Holds the routing key for the topic to send test messages on. */
+ public static final String CONTROL_TOPIC = "topic_control";
+
+ /** Holds the routing key for the queue to receive reports on. */
+ public static final String RESPONSE_QUEUE = "response";
+
+ /** Holds the JMS Topic to send test messages on. */
+ private final Topic _topic;
+
+ /** Holds the JMS Queue to receive reports on. */
+ private final Queue _response;
+
+ /** Holds the number of messages to send in each test run. */
+ private int numMessages;
+
+ /** A monitor used to wait for all reports to arrive back from consumers on. */
+ private CountDownLatch allReportsReceivedEvt;
+
+ /** Holds the connection to listen on. */
+ private Connection _connection;
+
+ /** Holds the channel for all test messages.*/
+ private Session _session;
+
+ /** Holds the producer to send test messages on. */
+ private MessageProducer publisher;
+
+ /**
+ * Creates a topic publisher that will send the specifed number of messages and expect the specifed number of report back from test
+ * subscribers.
+ *
+ * @param connectionUri The broker URL.
+ * @param numMessages The number of messages to send in each test.
+ * @param numSubscribers The number of subscribes that are expected to reply with a report.
+ */
+ Publisher(String connectionUri, int numMessages, int numSubscribers)
+ throws AMQException, JMSException, URLSyntaxException
+ {
+ log.debug("Publisher(String connectionUri = " + connectionUri + ", int numMessages = " + numMessages
+ + ", int numSubscribers = " + numSubscribers + "): called");
+
+ // Create a connection to the broker.
+ _connection = new AMQConnection(connectionUri);
+
+ // Establish a session on the broker.
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Set up the destinations to send test messages and listen for reports on.
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
+
+ // Set this listener up to listen for reports on the response queue.
+ _session.createConsumer(_response).setMessageListener(this);
+
+ // Set up this listener with a producer to send the test messages and report requests on.
+ publisher = _session.createProducer(_topic);
+
+ // Keep the test parameters.
+ this.numMessages = numMessages;
+
+ // Set up a countdown to count all subscribers sending their reports.
+ allReportsReceivedEvt = new CountDownLatch(numSubscribers);
+
+ _connection.start();
+ System.out.println("Sending messages and waiting for reports...");
+ }
+
+ /**
+ * Start a test publisher. The broker URL must be specified as the first command line argument.
+ *
+ * @param argv The command line arguments, ignored.
+ *
+ * @todo Add command line arguments to configure all aspects of the test.
+ */
+ public static void main(String[] argv)
+ {
+ try
+ {
+ // Create an instance of this publisher with the command line parameters.
+ Publisher publisher = new Publisher(DEFAULT_URI, 1, 1);
+
+ // Publish the test messages.
+ publisher.doTest();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Sends the test messages and waits for all subscribers to reply with a report.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void doTest() throws JMSException
+ {
+ log.debug("public void DoTest(): called");
+
+ // Create a test message to send.
+ Message testMessage = _session.createTextMessage("test");
+
+ // Send the desired number of test messages.
+ for (int i = 0; i < numMessages; i++)
+ {
+ publisher.send(testMessage);
+ }
+
+ log.debug("Sent " + numMessages + " test messages.");
+
+ // Send the report request.
+ Message reportRequestMessage = _session.createTextMessage("Report request message.");
+ reportRequestMessage.setStringProperty("TYPE", "REPORT_REQUEST");
+
+ reportRequestMessage.setBooleanProperty("BOOLEAN", false);
+ //reportRequestMessage.Headers.SetByte("BYTE", 5);
+ reportRequestMessage.setDoubleProperty("DOUBLE", 3.141);
+ reportRequestMessage.setFloatProperty("FLOAT", 1.0f);
+ reportRequestMessage.setIntProperty("INT", 1);
+ reportRequestMessage.setLongProperty("LONG", 1);
+ reportRequestMessage.setStringProperty("STRING", "hello");
+ reportRequestMessage.setShortProperty("SHORT", (short) 2);
+
+ publisher.send(reportRequestMessage);
+
+ log.debug("Sent the report request message, waiting for all replies...");
+
+ // Wait until all the reports come in.
+ try
+ {
+ allReportsReceivedEvt.await(TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ { }
+
+ // Check if all reports were really received or if the timeout occurred.
+ if (allReportsReceivedEvt.getCount() == 0)
+ {
+ log.debug("Got all reports.");
+ }
+ else
+ {
+ log.debug("Waiting for reports timed out, still waiting for " + allReportsReceivedEvt.getCount() + ".");
+ }
+
+ // Send the termination request.
+ Message terminationRequestMessage = _session.createTextMessage("Termination request message.");
+ terminationRequestMessage.setStringProperty("TYPE", "TERMINATION_REQUEST");
+ publisher.send(terminationRequestMessage);
+
+ log.debug("Sent the termination request message.");
+
+ // Close all message producers and consumers and the connection to the broker.
+ shutdown();
+ }
+
+ /**
+ * Handles all report messages from subscribers. This decrements the count of subscribers that are still to reply, until this becomes
+ * zero, at which time waiting threads are notified of this event.
+ *
+ * @param message The received report message.
+ */
+ public void onMessage(Message message)
+ {
+ log.debug("public void OnMessage(Message message = " + message + "): called");
+
+ // Decrement the count of expected messages and release the wait monitor when this becomes zero.
+ allReportsReceivedEvt.countDown();
+
+ if (allReportsReceivedEvt.getCount() == 0)
+ {
+ log.debug("Got reports from all subscribers.");
+ }
+ }
+
+ /**
+ * Stops the message consumers and closes the connection.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ private void shutdown() throws JMSException
+ {
+ _session.close();
+ _connection.close();
+ }
+}
|