summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-25 12:46:22 +0000
committerRobert Greig <rgreig@apache.org>2007-01-25 12:46:22 +0000
commit4e1da8706dc8302caa7f97e29ac7cc33a60b5bb3 (patch)
tree07057da081931306c080d902c4351a8439b08b71
parent805c3d03b5e7226f3e3e02bcc94c83bf87c905e0 (diff)
downloadqpid-python-4e1da8706dc8302caa7f97e29ac7cc33a60b5bb3.tar.gz
(Submitted by Rupert Smith) Class has been documented.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499764 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java10
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java256
2 files changed, 191 insertions, 75 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
index c1dd5b18ad..97b411323e 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
@@ -45,8 +45,8 @@ import org.apache.qpid.jms.Session;
* <tr><td> Keep track of p2p or topic ping type.
* </table>
*
- * @todo This base class does not seem particularly usefull and some methods are duplicated in {@link AbstractPingProducer},
- * consider merging it into that class.
+ * @todo This base class does not seem particularly usefull and all functionality is duplicated in {@link AbstractPingProducer}.
+ * Merge it into that class.
*/
public abstract class AbstractPingClient
{
@@ -175,11 +175,11 @@ public abstract class AbstractPingClient
/**
* Sets the connection that this ping client is using.
*
- * @param _connection The ping connection.
+ * @param connection The ping connection.
*/
- public void setConnection(AMQConnection _connection)
+ public void setConnection(AMQConnection connection)
{
- this._connection = _connection;
+ this._connection = connection;
}
/**
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
index debaa0d785..091a865473 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.ping;
import java.io.IOException;
@@ -13,6 +33,7 @@ import javax.jms.Message;
import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
@@ -21,84 +42,105 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
/**
- * This abstract class captures functionality that is common to all ping producers. It provides functionality to
- * manage a session, and a convenience method to commit a transaction on the session. It also provides a framework
- * for running a ping loop, and terminating that loop on exceptions or a shutdown handler.
- * <p/>
+ * Provides common functionality that ping producers (the senders of ping messages) can use. This base class keeps
+ * track of the connection used to send pings; provides a convenience method to commit a transaction only when a session
+ * to commit on is transactional; keeps track of whether the ping client is pinging to a queue or a topic; provides
+ * prompts to the console to terminate brokers before and after commits, in order to test failover functionality;
+ * requires sub-classes to implement a ping loop, that this provides a run loop to repeatedly call; provides a
+ * default shutdown hook to cleanly terminate the run loop; keeps track of the destinations to send pings to;
+ * provides a convenience method to generate short pauses; and provides a convience formatter for outputing readable
+ * timestamps for pings.
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Manage the connection.
- * <tr><td> Provide clean shutdown on exception or shutdown hook.
- * <tr><td> Provide useable shutdown hook implementation.
- * <tr><td> Run a ping loop.
+ * <tr><td> Commit the current transcation on a session.
+ * <tr><td> Generate failover promts.
+ * <tr><td> Keep track the connection.
+ * <tr><td> Keep track of p2p or topic ping type.
+ * <tr><td> Call ping loop to repeatedly send pings.
+ * <tr><td> Provide a shutdown hook.
+ * <tr><td> Generate short pauses.
* </table>
*
- * @author Rupert Smith
+ * @todo Destination count versus list of desintations is redundant. Use _destinions.size() to get the count and
+ * use a list of 1 destination when only 1 is needed. It is only important to distinguish when 1 destination
+ * is shared between multiple ping producers on the same JVM or if each ping producer has its own single
+ * destination.
+ *
+ * @todo Timestamp messages in nanos, not millis. Millis seems to have bad resolution, at least on windows.
*/
public abstract class AbstractPingProducer implements Runnable, ExceptionListener
{
private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
- /** tells if the test is being done for pubsub or p2p */
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
private boolean _isPubSub = false;
- /**
- * Used to format time stamping output.
- */
- protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /** This id generator is used to generate ids to append to the queue name to ensure that queues are unique. */
- private static AtomicInteger _queueSequenceID = new AtomicInteger();
+ /** A convenient formatter to use when time stamping output. */
+ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/**
- * Used to tell the ping loop when to terminate, it only runs while this is true.
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
*/
+ private static AtomicInteger _queueSequenceID = new AtomicInteger();
+
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
protected boolean _publish = true;
- /**
- * Holds the connection handle to the broker.
- */
+ /** Holds the connection to the broker. */
private Connection _connection;
- /**
- * Holds the producer session, need to create test messages.
- */
+ /** Holds the producer session, needed to create ping messages. */
private Session _producerSession;
- /**
- * Holds the number of destinations for multiple-destination test. By default it will be 1
- */
+ /** Holds the number of destinations that this ping producer will send pings to, defaulting to a single destination. */
protected int _destinationCount = 1;
- /** list of all the destinations for multiple-destinations test */
+ /** Holds the set of destiniations that this ping producer pings. */
private List<Destination> _destinations = new ArrayList<Destination>();
- /**
- * Holds the message producer to send the pings through.
- */
+ /** Holds the message producer to send the pings through. */
protected org.apache.qpid.jms.MessageProducer _producer;
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
protected boolean _failBeforeCommit = false;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
protected boolean _failAfterCommit = false;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
protected boolean _failBeforeSend = false;
+
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
protected boolean _failAfterSend = false;
+
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
protected boolean _failOnce = true;
/** Holds the number of sends that should be performed in every transaction when using transactions. */
protected int _txBatchSize = 1;
/**
- * Sets the test for pubsub or p2p.
- * @param value
+ * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
+ *
+ * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
*/
- public void setPubSub(boolean value)
+ public void setPubSub(boolean pubsub)
{
- _isPubSub = value;
+ _isPubSub = pubsub;
}
+ /**
+ * Checks whether this client is a p2p or pub/sub ping client.
+ *
+ * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
+ */
public boolean isPubSub()
{
return _isPubSub;
}
+
/**
* Convenience method for a short pause.
*
@@ -124,10 +166,11 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
public abstract void pingLoop();
/**
- * Generates a test message of the specified size.
+ * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
*
* @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
+ * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
*
* @return A freshly generated test message.
*
@@ -138,6 +181,7 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
// Timestamp the message.
msg.setLongProperty("timestamp", System.currentTimeMillis());
+
return msg;
}
@@ -191,85 +235,119 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
});
}
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
public Connection getConnection()
{
return _connection;
}
+ /**
+ * Sets the connection that this ping client is using.
+ *
+ * @param connection The ping connection.
+ */
public void setConnection(Connection connection)
{
this._connection = connection;
}
+ /**
+ * Gets the producer session that the ping client is using to send pings on.
+ *
+ * @return The producer session that the ping client is using to send pings on.
+ */
public Session getProducerSession()
{
return _producerSession;
}
+ /**
+ * Keeps track of the producer session that the ping client is using to send pings on.
+ *
+ * @param session The producer session that the ping client is using to send pings on.
+ */
public void setProducerSession(Session session)
{
this._producerSession = session;
}
+ /**
+ * Gets the number of destinations that this ping client is sending to.
+ *
+ * @return The number of destinations that this ping client is sending to.
+ */
public int getDestinationsCount()
{
return _destinationCount;
}
+ /**
+ * Sets the number of destination that this ping client should send to.
+ *
+ * @param count The number of destination that this ping client should send to.
+ *
+ * @deprectaed Use _destinations.size() instead.
+ */
public void setDestinationsCount(int count)
{
this._destinationCount = count;
}
+ /**
+ * Commits the transaction on the producer session.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ *
+ * @deprecated Use the commitTx(Session session) method instead, to explicitly specify which session is being
+ * committed. This makes it more obvious what is going on.
+ */
protected void commitTx() throws JMSException
{
commitTx(getProducerSession());
}
/**
- * Creates destinations dynamically and adds to the destinations list for multiple-destinations test
- * @param count
+ * Creates the specified number of destinations to send pings to. Topics or Queues will be created depending on
+ * the value of the {@link #_isPubSub} flag.
+ *
+ * @param count The number of ping destinations to create.
*/
protected void createDestinations(int count)
{
- if (isPubSub())
- {
- createTopics(count);
- }
- else
- {
- createQueues(count);
- }
- }
-
- private void createQueues(int count)
- {
+ // Create the desired number of ping destinations.
for (int i = 0; i < count; i++)
{
- AMQShortString name =
- new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
- AMQQueue queue = new AMQQueue(name, name, false, false, false);
-
- _destinations.add(queue);
- }
- }
+ AMQDestination destination = null;
- private void createTopics(int count)
- {
- for (int i = 0; i < count; i++)
- {
- AMQShortString name =
- new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
- AMQTopic topic = new AMQTopic(name);
+ // Check if this is a pub/sub pinger, in which case create topics.
+ if (isPubSub())
+ {
+ AMQShortString name =
+ new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+ destination = new AMQTopic(name);
+ }
+ // Otherwise this is a p2p pinger, in which case create queues.
+ else
+ {
+ AMQShortString name =
+ new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
+ destination = new AMQQueue(name, name, false, false, false);
+ }
- _destinations.add(topic);
+ _destinations.add(destination);
}
}
/**
- * Returns the destination from the destinations list with given index. This is for multiple-destinations test
- * @param index
- * @return Destination with given index
+ * Returns the destination from the destinations list with the given index.
+ *
+ * @param index The index of the destination to get.
+ *
+ * @return Destination with the given index.
*/
protected Destination getDestination(int index)
{
@@ -277,8 +355,21 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
}
/**
- * Convenience method to commit the transaction on the session associated with this pinger.
+ * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+ * a transactional session, this method does nothing (unless the failover after send flag is set).
+ *
+ * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
+ * is applied. This flag applies whether the pinger is transactional or not.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+ * after the commit is applied. These flags will only apply if using a transactional pinger.
+ *
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ *
+ * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional
+ * and non-transactional alike.
*/
protected void commitTx(Session session) throws JMSException
{
@@ -351,11 +442,27 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
}
}
+ /**
+ * Sends the specified message to the default destination of the ping producer.
+ *
+ * @param message The message to send.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
protected void sendMessage(Message message) throws JMSException
{
sendMessage(null, message);
}
+ /**
+ * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination
+ * of the ping producer. If an explicit destination is set, this overrides the default.
+ *
+ * @param destination The destination to send to.
+ * @param message The message to send.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
protected void sendMessage(Destination destination, Message message) throws JMSException
{
if (_failBeforeSend)
@@ -376,9 +483,15 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
else
{
_producer.send(destination, message);
- }
+ }
}
+ /**
+ * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ *
+ * @param broker The name of the broker to terminate.
+ */
protected void doFailover(String broker)
{
System.out.println("Kill Broker " + broker + " now then press return");
@@ -392,6 +505,10 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
System.out.println("Continuing.");
}
+ /**
+ * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ */
protected void doFailover()
{
System.out.println("Kill Broker now then press return");
@@ -403,6 +520,5 @@ public abstract class AbstractPingProducer implements Runnable, ExceptionListene
{ }
System.out.println("Continuing.");
-
}
}