summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-04-05 13:36:04 +0000
committerRobert Greig <rgreig@apache.org>2007-04-05 13:36:04 +0000
commit2a1e4c9663ff0725c061248a96ebab763678fdd6 (patch)
treec59fff417b426a559806edbf2c010412d57032b6
parentbe0ad1041a449196a328260d2210c5f7c27fa0a1 (diff)
downloadqpid-python-2a1e4c9663ff0725c061248a96ebab763678fdd6.tar.gz
Merged revisions 525531-525536 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r525531 | rgreig | 2007-04-04 16:18:44 +0100 (Wed, 04 Apr 2007) | 1 line Added standard command line handline ........ r525533 | rgreig | 2007-04-04 16:19:38 +0100 (Wed, 04 Apr 2007) | 1 line Added simeple file copy function. ........ r525535 | rgreig | 2007-04-04 16:20:30 +0100 (Wed, 04 Apr 2007) | 1 line Added comments and logging to track down bug. ........ r525536 | rgreig | 2007-04-04 16:21:43 +0100 (Wed, 04 Apr 2007) | 1 line Fixed dangling transaction problem by correctly binding queue. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525825 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java190
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java42
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/FileUtils.java36
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java5
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java92
6 files changed, 308 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 21988d97a8..2a83d9b649 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -21,73 +21,241 @@
package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
+/**
+ * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues
+ * and exchanges in a transactional manner.
+ *
+ * <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which
+ * encapsulates the transactional context they are performed in. Many such operations can be carried out in a single
+ * transaction.
+ *
+ * <p/>The storage and removal of queues and exchanges, are not carried out in a transactional context.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Accept transaction boundary demarcations: Begin, Commit, Abort.
+ * <tr><td> Store and remove queues.
+ * <tr><td> Store and remove exchanges.
+ * <tr><td> Store and remove messages.
+ * <tr><td> Bind and unbind queues to exchanges.
+ * <tr><td> Enqueue and dequeue messages to queues.
+ * <tr><td> Generate message identifiers.
+ * </table>
+ */
public interface MessageStore
{
/**
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
- * @param virtualHost the virtual host using by this store
- * @param base the base element identifier from which all configuration items are relative. For example, if the base
- * element is "store", the all elements used by concrete classes will be "store.foo" etc.
- * @param config the apache commons configuration object
- * @throws Exception if an error occurs that means the store is unable to configure itself
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param base The base element identifier from which all configuration items are relative. For example, if
+ * the base element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object.
+ *
+ * @throws Exception If any error occurs that means the store is unable to configure itself.
*/
void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
- * @throws Exception if close fails
+ *
+ * @throws Exception If the close fails.
*/
void close() throws Exception;
+ /**
+ * Removes the specified message from the store in the given transactional store context.
+ *
+ * @param storeContext The transactional context to remove the message in.
+ * @param messageId Identifies the message to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
+ /**
+ * Makes the specified exchange persistent.
+ *
+ * @param exchange The exchange to persist.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void createExchange(Exchange exchange) throws AMQException;
+ /**
+ * Removes the specified persistent exchange.
+ *
+ * @param exchange The exchange to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void removeExchange(Exchange exchange) throws AMQException;
+ /**
+ * Binds the specified queue to an exchange with a routing key.
+ *
+ * @param exchange The exchange to bind to.
+ * @param routingKey The routing key to bind by.
+ * @param queue The queue to bind.
+ * @param args Additional parameters.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+ /**
+ * Unbinds the specified from an exchange under a particular routing key.
+ *
+ * @param exchange The exchange to unbind from.
+ * @param routingKey The routing key to unbind.
+ * @param queue The queue to unbind.
+ * @param args Additonal parameters.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
-
+ /**
+ * Makes the specified queue persistent.
+ *
+ * @param queue The queue to store.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void createQueue(AMQQueue queue) throws AMQException;
+ /**
+ * Removes the specified queue from the persistent store.
+ *
+ * @param name The queue to remove.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void removeQueue(AMQShortString name) throws AMQException;
+ /**
+ * Places a message onto a specified queue, in a given transactional context.
+ *
+ * @param context The transactional context for the operation.
+ * @param name The name of the queue to place the message on.
+ * @param messageId The message to enqueue.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+ /**
+ * Extracts a message from a specified queue, in a given transactional context.
+ *
+ * @param context The transactional context for the operation.
+ * @param name The name of the queue to take the message from.
+ * @param messageId The message to dequeue.
+ *
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ */
void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
+ /**
+ * Begins a transactional context.
+ *
+ * @param context The transactional context to begin.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void beginTran(StoreContext context) throws AMQException;
+ /**
+ * Commits all operations performed within a given transactional context.
+ *
+ * @param context The transactional context to commit all operations for.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void commitTran(StoreContext context) throws AMQException;
+ /**
+ * Abandons all operations performed within a given transactional context.
+ *
+ * @param context The transactional context to abandon.
+ *
+ * @throws AMQException If the operation fails for any reason.
+ */
void abortTran(StoreContext context) throws AMQException;
+ /**
+ * Tests a transactional context to see if it has been begun but not yet committed or aborted.
+ *
+ * @param context The transactional context to test.
+ *
+ * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise.
+ */
boolean inTran(StoreContext context);
/**
* Return a valid, currently unused message id.
- * @return a message id
+ *
+ * @return A fresh message id.
*/
Long getNewMessageId();
- void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException;
+ /**
+ * Stores a chunk of message data.
+ *
+ * @param context The transactional context for the operation.
+ * @param messageId The message to store the data for.
+ * @param index The index of the data chunk.
+ * @param contentBody The content of the data chunk.
+ * @param lastContentBody Flag to indicate that this is the last such chunk for the message.
+ *
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ */
+ void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody,
+ boolean lastContentBody) throws AMQException;
+ /**
+ * Stores message meta-data.
+ *
+ * @param context The transactional context for the operation.
+ * @param messageId The message to store the data for.
+ * @param messageMetaData The message meta data to store.
+ *
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ */
void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
+ /**
+ * Retrieves message meta-data.
+ *
+ * @param context The transactional context for the operation.
+ * @param messageId The message to get the meta-data for.
+ *
+ * @return The message meta data.
+ *
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ */
MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+ /**
+ * Retrieves a chunk of message data.
+ *
+ * @param context The transactional context for the operation.
+ * @param messageId The message to get the data chunk for.
+ * @param index The offset index of the data chunk within the message.
+ *
+ * @return A chunk of message data.
+ *
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ */
ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
index 2e2f2ba7d6..3ee49d58cf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
@@ -22,16 +22,14 @@ package org.apache.qpid.server.store;
import org.apache.log4j.Logger;
-
/**
* A context that the store can use to associate with a transactional context. For example, it could store
* some kind of txn id.
- *
+ *
* @author Apache Software Foundation
*/
public class StoreContext
{
-
private static final Logger _logger = Logger.getLogger(StoreContext.class);
private String _name;
@@ -54,7 +52,17 @@ public class StoreContext
public void setPayload(Object payload)
{
- _logger.debug("["+_name+"] Setting payload: " + payload);
+ _logger.debug("public void setPayload(Object payload = " + payload + "): called");
_payload = payload;
}
+
+ /**
+ * Prints out the transactional context as a string, mainly for debugging purposes.
+ *
+ * @return The transactional context as a string.
+ */
+ public String toString()
+ {
+ return "<_name = " + _name + ", _payload = " + _payload + ">";
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
index 6173780aa7..9051d6b470 100644
--- a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
+++ b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
@@ -143,8 +143,8 @@ public class CommandLineParser
String[] nextOptionSpec = config[i];
addOption(nextOptionSpec[0], nextOptionSpec[1], (nextOptionSpec.length > 2) ? nextOptionSpec[2] : null,
- (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false,
- (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
+ (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false,
+ (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
}
}
@@ -209,8 +209,9 @@ public class CommandLineParser
// Print usage on each of the command line options.
for (CommandLineOption optionInfo : optionMap.values())
{
- result += optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "")
- + optionInfo.comment + "\n";
+ result +=
+ optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "")
+ + optionInfo.comment + "\n";
}
return result;
@@ -604,6 +605,37 @@ public class CommandLineParser
}
/**
+ * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+ * a map of properties containing them.
+ *
+ * @param args The command line.
+ *
+ * @return A set of properties containing all name=value pairs from the command line.
+ */
+ public static Properties processCommandLine(String[] args, CommandLineParser commandLine)
+ {
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
+
+ try
+ {
+ options = commandLine.parseCommandLine(args);
+
+ // Add all the trailing command line options (name=value pairs) to system properties. They may be picked up
+ // from there.
+ commandLine.addCommandLineToSysProperties();
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
+
+ return options;
+ }
+
+ /**
* Holds information about a command line options. This includes what its name is, whether or not it is a flag,
* whether or not it is mandatory, what its user comment is, what its argument reminder text is and what its
* regular expression format is.
@@ -646,7 +678,7 @@ public class CommandLineParser
* @param formatRegexp The regular expression that the argument to this option must meet to be valid.
*/
public CommandLineOption(String option, boolean expectsArgs, String comment, String argument, boolean mandatory,
- String formatRegexp)
+ String formatRegexp)
{
this.option = option;
this.expectsArgs = expectsArgs;
diff --git a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
index ba79a6e8d4..3c8d3f916b 100644
--- a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
+++ b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
@@ -158,4 +158,40 @@ public class FileUtils
return is;
}
+
+ /**
+ * Copies the specified source file to the specified destintaion file. If the destinationst file does not exist,
+ * it is created.
+ *
+ * @param src The source file name.
+ * @param dst The destination file name.
+ */
+ public static void copy(File src, File dst)
+ {
+ try
+ {
+ InputStream in = new FileInputStream(src);
+ if (!dst.exists())
+ {
+ dst.createNewFile();
+ }
+
+ OutputStream out = new FileOutputStream(dst);
+
+ // Transfer bytes from in to out
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = in.read(buf)) > 0)
+ {
+ out.write(buf, 0, len);
+ }
+
+ in.close();
+ out.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
index 77526141d6..9439604acd 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
@@ -35,6 +35,7 @@ import javax.jms.Message;
import org.apache.log4j.Logger;
import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.MathUtils;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
@@ -71,6 +72,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
* <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
* <tr><td> persistent <td> true <td> Only makes sense to test persistent.
+ * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages.
* <tr><td> commitBatchSize <td> 10
* <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
* </table>
@@ -108,6 +110,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
defaults.setProperty(RATE_PROPNAME, "20");
+ defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
}
/** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
@@ -150,7 +153,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
try
{
// Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options = processCommandLine(args);
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
PingDurableClient pingProducer = new PingDurableClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 44f7083bb5..913685bca2 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -35,13 +35,10 @@ import javax.jms.*;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.*;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.url.URLSyntaxException;
@@ -90,6 +87,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
* <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
* 0 - SESSION_TRANSACTED
* 1 - AUTO_ACKNOWLEDGE
@@ -257,6 +255,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Defines the default value for the unique destinations property. */
public static final boolean UNIQUE_DESTS_DEFAULT = true;
+ public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+ public static final boolean DURABLE_DESTS_DEFAULT = false;
+
/** Holds the name of the proeprty to get the message acknowledgement mode from. */
public static final String ACK_MODE_PROPNAME = "ackMode";
@@ -299,6 +300,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
@@ -337,6 +339,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Flag used to indicate if the destinations should be unique client. */
protected boolean _isUnique;
+ /** Flag used to indicate that durable destination should be used. */
+ protected boolean _isDurable;
+
/** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
protected boolean _failBeforeCommit;
@@ -424,6 +429,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+ private String _clientID;
/**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
@@ -463,6 +469,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_rate = properties.getPropertyAsInteger(RATE_PROPNAME);
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+ _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
_pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
@@ -498,10 +505,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
+ _clientID = address.getHostName() + System.currentTimeMillis();
// Create a connection to the broker.
- createConnection(clientID);
+ createConnection(_clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
_producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
@@ -509,7 +516,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Create the destinations to send pings to and receive replies from.
_replyDestination = _consumerSession.createTemporaryQueue();
- createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
if (producer)
@@ -548,7 +555,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
try
{
- Properties options = processCommandLine(args);
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
@@ -577,43 +584,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
- * a map of properties containing them.
- *
- * @param args The command line.
- *
- * @return A set of properties containing all name=value pairs from the command line.
- *
- * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
- * CommandLineParser class.
- */
- protected static Properties processCommandLine(String[] args)
- {
- // Use the command line parser to evaluate the command line.
- CommandLineParser commandLine = new CommandLineParser(new String[][] {});
-
- // Capture the command line arguments or display errors and correct usage and then exit.
- Properties options = null;
-
- try
- {
- options = commandLine.parseCommandLine(args);
-
- // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
- // overridden values from there.
- commandLine.addCommandLineToSysProperties();
- }
- catch (IllegalArgumentException e)
- {
- System.out.println(commandLine.getErrors());
- System.out.println(commandLine.getUsage());
- System.exit(1);
- }
-
- return options;
- }
-
- /**
* Convenience method for a short pause.
*
* @param sleepTime The time in milliseconds to pause for.
@@ -677,11 +647,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
- public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
+ boolean durable) throws JMSException, AMQException
{
log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
- + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+ + durable + "): called");
_pingDestinations = new ArrayList<Destination>();
@@ -709,13 +680,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- log.debug("Created topic " + destination);
+ if (!durable)
+ {
+ destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+ log.debug("Created non-durable topic " + destination);
+ }
+ else
+ {
+ destination =
+ AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+ _clientID, (AMQConnection) _connection);
+ log.debug("Created durable topic " + destination);
+ }
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+ AMQShortString destinationName = new AMQShortString(rootName + id);
+ destination =
+ new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+ _isDurable);
+ ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
+ ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+
log.debug("Created queue " + destination);
}