diff options
author | Robert Greig <rgreig@apache.org> | 2007-04-05 13:36:04 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-04-05 13:36:04 +0000 |
commit | 2a1e4c9663ff0725c061248a96ebab763678fdd6 (patch) | |
tree | c59fff417b426a559806edbf2c010412d57032b6 | |
parent | be0ad1041a449196a328260d2210c5f7c27fa0a1 (diff) | |
download | qpid-python-2a1e4c9663ff0725c061248a96ebab763678fdd6.tar.gz |
Merged revisions 525531-525536 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r525531 | rgreig | 2007-04-04 16:18:44 +0100 (Wed, 04 Apr 2007) | 1 line
Added standard command line handline
........
r525533 | rgreig | 2007-04-04 16:19:38 +0100 (Wed, 04 Apr 2007) | 1 line
Added simeple file copy function.
........
r525535 | rgreig | 2007-04-04 16:20:30 +0100 (Wed, 04 Apr 2007) | 1 line
Added comments and logging to track down bug.
........
r525536 | rgreig | 2007-04-04 16:21:43 +0100 (Wed, 04 Apr 2007) | 1 line
Fixed dangling transaction problem by correctly binding queue.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525825 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 308 insertions, 73 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java index 21988d97a8..2a83d9b649 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -21,73 +21,241 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MessageMetaData; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.exchange.Exchange; +/** + * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages, queues + * and exchanges in a transactional manner. + * + * <p/>All message store, remove, enqueue and dequeue operations are carried out against a {@link StoreContext} which + * encapsulates the transactional context they are performed in. Many such operations can be carried out in a single + * transaction. + * + * <p/>The storage and removal of queues and exchanges, are not carried out in a transactional context. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities + * <tr><td> Accept transaction boundary demarcations: Begin, Commit, Abort. + * <tr><td> Store and remove queues. + * <tr><td> Store and remove exchanges. + * <tr><td> Store and remove messages. + * <tr><td> Bind and unbind queues to exchanges. + * <tr><td> Enqueue and dequeue messages to queues. + * <tr><td> Generate message identifiers. + * </table> + */ public interface MessageStore { /** * Called after instantiation in order to configure the message store. A particular implementation can define * whatever parameters it wants. - * @param virtualHost the virtual host using by this store - * @param base the base element identifier from which all configuration items are relative. For example, if the base - * element is "store", the all elements used by concrete classes will be "store.foo" etc. - * @param config the apache commons configuration object - * @throws Exception if an error occurs that means the store is unable to configure itself + * + * @param virtualHost The virtual host using by this store + * @param base The base element identifier from which all configuration items are relative. For example, if + * the base element is "store", the all elements used by concrete classes will be "store.foo" etc. + * @param config The apache commons configuration object. + * + * @throws Exception If any error occurs that means the store is unable to configure itself. */ void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception; /** * Called to close and cleanup any resources used by the message store. - * @throws Exception if close fails + * + * @throws Exception If the close fails. */ void close() throws Exception; + /** + * Removes the specified message from the store in the given transactional store context. + * + * @param storeContext The transactional context to remove the message in. + * @param messageId Identifies the message to remove. + * + * @throws AMQException If the operation fails for any reason. + */ void removeMessage(StoreContext storeContext, Long messageId) throws AMQException; + /** + * Makes the specified exchange persistent. + * + * @param exchange The exchange to persist. + * + * @throws AMQException If the operation fails for any reason. + */ void createExchange(Exchange exchange) throws AMQException; + /** + * Removes the specified persistent exchange. + * + * @param exchange The exchange to remove. + * + * @throws AMQException If the operation fails for any reason. + */ void removeExchange(Exchange exchange) throws AMQException; + /** + * Binds the specified queue to an exchange with a routing key. + * + * @param exchange The exchange to bind to. + * @param routingKey The routing key to bind by. + * @param queue The queue to bind. + * @param args Additional parameters. + * + * @throws AMQException If the operation fails for any reason. + */ void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; + /** + * Unbinds the specified from an exchange under a particular routing key. + * + * @param exchange The exchange to unbind from. + * @param routingKey The routing key to unbind. + * @param queue The queue to unbind. + * @param args Additonal parameters. + * + * @throws AMQException If the operation fails for any reason. + */ void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException; - + /** + * Makes the specified queue persistent. + * + * @param queue The queue to store. + * + * @throws AMQException If the operation fails for any reason. + */ void createQueue(AMQQueue queue) throws AMQException; + /** + * Removes the specified queue from the persistent store. + * + * @param name The queue to remove. + * + * @throws AMQException If the operation fails for any reason. + */ void removeQueue(AMQShortString name) throws AMQException; + /** + * Places a message onto a specified queue, in a given transactional context. + * + * @param context The transactional context for the operation. + * @param name The name of the queue to place the message on. + * @param messageId The message to enqueue. + * + * @throws AMQException If the operation fails for any reason. + */ void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; + /** + * Extracts a message from a specified queue, in a given transactional context. + * + * @param context The transactional context for the operation. + * @param name The name of the queue to take the message from. + * @param messageId The message to dequeue. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException; + /** + * Begins a transactional context. + * + * @param context The transactional context to begin. + * + * @throws AMQException If the operation fails for any reason. + */ void beginTran(StoreContext context) throws AMQException; + /** + * Commits all operations performed within a given transactional context. + * + * @param context The transactional context to commit all operations for. + * + * @throws AMQException If the operation fails for any reason. + */ void commitTran(StoreContext context) throws AMQException; + /** + * Abandons all operations performed within a given transactional context. + * + * @param context The transactional context to abandon. + * + * @throws AMQException If the operation fails for any reason. + */ void abortTran(StoreContext context) throws AMQException; + /** + * Tests a transactional context to see if it has been begun but not yet committed or aborted. + * + * @param context The transactional context to test. + * + * @return <tt>true</tt> if the transactional context is live, <tt>false</tt> otherwise. + */ boolean inTran(StoreContext context); /** * Return a valid, currently unused message id. - * @return a message id + * + * @return A fresh message id. */ Long getNewMessageId(); - void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException; + /** + * Stores a chunk of message data. + * + * @param context The transactional context for the operation. + * @param messageId The message to store the data for. + * @param index The index of the data chunk. + * @param contentBody The content of the data chunk. + * @param lastContentBody Flag to indicate that this is the last such chunk for the message. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ + void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, + boolean lastContentBody) throws AMQException; + /** + * Stores message meta-data. + * + * @param context The transactional context for the operation. + * @param messageId The message to store the data for. + * @param messageMetaData The message meta data to store. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException; + /** + * Retrieves message meta-data. + * + * @param context The transactional context for the operation. + * @param messageId The message to get the meta-data for. + * + * @return The message meta data. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException; + /** + * Retrieves a chunk of message data. + * + * @param context The transactional context for the operation. + * @param messageId The message to get the data chunk for. + * @param index The offset index of the data chunk within the message. + * + * @return A chunk of message data. + * + * @throws AMQException If the operation fails for any reason, or if the specified message does not exist. + */ ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException; - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java index 2e2f2ba7d6..3ee49d58cf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java @@ -22,16 +22,14 @@ package org.apache.qpid.server.store; import org.apache.log4j.Logger; - /** * A context that the store can use to associate with a transactional context. For example, it could store * some kind of txn id. - * + * * @author Apache Software Foundation */ public class StoreContext { - private static final Logger _logger = Logger.getLogger(StoreContext.class); private String _name; @@ -54,7 +52,17 @@ public class StoreContext public void setPayload(Object payload) { - _logger.debug("["+_name+"] Setting payload: " + payload); + _logger.debug("public void setPayload(Object payload = " + payload + "): called"); _payload = payload; } + + /** + * Prints out the transactional context as a string, mainly for debugging purposes. + * + * @return The transactional context as a string. + */ + public String toString() + { + return "<_name = " + _name + ", _payload = " + _payload + ">"; + } } diff --git a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java index 6173780aa7..9051d6b470 100644 --- a/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java +++ b/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java @@ -143,8 +143,8 @@ public class CommandLineParser String[] nextOptionSpec = config[i];
addOption(nextOptionSpec[0], nextOptionSpec[1], (nextOptionSpec.length > 2) ? nextOptionSpec[2] : null,
- (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false,
- (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
+ (nextOptionSpec.length > 3) ? ("true".equals(nextOptionSpec[3]) ? true : false) : false,
+ (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
}
}
@@ -209,8 +209,9 @@ public class CommandLineParser // Print usage on each of the command line options.
for (CommandLineOption optionInfo : optionMap.values())
{
- result += optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "")
- + optionInfo.comment + "\n";
+ result +=
+ optionInfo.option + " " + ((optionInfo.argument != null) ? (optionInfo.argument + " ") : "")
+ + optionInfo.comment + "\n";
}
return result;
@@ -604,6 +605,37 @@ public class CommandLineParser }
/**
+ * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
+ * a map of properties containing them.
+ *
+ * @param args The command line.
+ *
+ * @return A set of properties containing all name=value pairs from the command line.
+ */
+ public static Properties processCommandLine(String[] args, CommandLineParser commandLine)
+ {
+ // Capture the command line arguments or display errors and correct usage and then exit.
+ Properties options = null;
+
+ try
+ {
+ options = commandLine.parseCommandLine(args);
+
+ // Add all the trailing command line options (name=value pairs) to system properties. They may be picked up
+ // from there.
+ commandLine.addCommandLineToSysProperties();
+ }
+ catch (IllegalArgumentException e)
+ {
+ System.out.println(commandLine.getErrors());
+ System.out.println(commandLine.getUsage());
+ System.exit(1);
+ }
+
+ return options;
+ }
+
+ /**
* Holds information about a command line options. This includes what its name is, whether or not it is a flag,
* whether or not it is mandatory, what its user comment is, what its argument reminder text is and what its
* regular expression format is.
@@ -646,7 +678,7 @@ public class CommandLineParser * @param formatRegexp The regular expression that the argument to this option must meet to be valid.
*/
public CommandLineOption(String option, boolean expectsArgs, String comment, String argument, boolean mandatory,
- String formatRegexp)
+ String formatRegexp)
{
this.option = option;
this.expectsArgs = expectsArgs;
diff --git a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java index ba79a6e8d4..3c8d3f916b 100644 --- a/java/common/src/main/java/org/apache/qpid/util/FileUtils.java +++ b/java/common/src/main/java/org/apache/qpid/util/FileUtils.java @@ -158,4 +158,40 @@ public class FileUtils return is;
}
+
+ /**
+ * Copies the specified source file to the specified destintaion file. If the destinationst file does not exist,
+ * it is created.
+ *
+ * @param src The source file name.
+ * @param dst The destination file name.
+ */
+ public static void copy(File src, File dst)
+ {
+ try
+ {
+ InputStream in = new FileInputStream(src);
+ if (!dst.exists())
+ {
+ dst.createNewFile();
+ }
+
+ OutputStream out = new FileOutputStream(dst);
+
+ // Transfer bytes from in to out
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = in.read(buf)) > 0)
+ {
+ out.write(buf, 0, len);
+ }
+
+ in.close();
+ out.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index 77526141d6..9439604acd 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -35,6 +35,7 @@ import javax.jms.Message; import org.apache.log4j.Logger;
import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.MathUtils;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
@@ -71,6 +72,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
* <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
* <tr><td> persistent <td> true <td> Only makes sense to test persistent.
+ * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages.
* <tr><td> commitBatchSize <td> 10
* <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
* </table>
@@ -108,6 +110,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
defaults.setProperty(RATE_PROPNAME, "20");
+ defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
}
/** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
@@ -150,7 +153,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList try
{
// Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options = processCommandLine(args);
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
PingDurableClient pingProducer = new PingDurableClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 44f7083bb5..913685bca2 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -35,13 +35,10 @@ import javax.jms.*; import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.*;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.url.URLSyntaxException;
@@ -90,6 +87,7 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
* <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
* 0 - SESSION_TRANSACTED
* 1 - AUTO_ACKNOWLEDGE
@@ -257,6 +255,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Defines the default value for the unique destinations property. */
public static final boolean UNIQUE_DESTS_DEFAULT = true;
+ public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+ public static final boolean DURABLE_DESTS_DEFAULT = false;
+
/** Holds the name of the proeprty to get the message acknowledgement mode from. */
public static final String ACK_MODE_PROPNAME = "ackMode";
@@ -299,6 +300,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
@@ -337,6 +339,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Flag used to indicate if the destinations should be unique client. */
protected boolean _isUnique;
+ /** Flag used to indicate that durable destination should be used. */
+ protected boolean _isDurable;
+
/** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
protected boolean _failBeforeCommit;
@@ -424,6 +429,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+ private String _clientID;
/**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
@@ -463,6 +469,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+ _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
_pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
@@ -498,10 +505,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
+ _clientID = address.getHostName() + System.currentTimeMillis();
// Create a connection to the broker.
- createConnection(clientID);
+ createConnection(_clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
_producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
@@ -509,7 +516,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create the destinations to send pings to and receive replies from.
_replyDestination = _consumerSession.createTemporaryQueue();
- createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique);
+ createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
if (producer)
@@ -548,7 +555,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
try
{
- Properties options = processCommandLine(args);
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
@@ -577,43 +584,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
/**
- * Extracts all name=value pairs from the command line, sets them all as system properties and also returns
- * a map of properties containing them.
- *
- * @param args The command line.
- *
- * @return A set of properties containing all name=value pairs from the command line.
- *
- * @todo This is a commonly used piece of code. Make it accept a command line definition and move it into the
- * CommandLineParser class.
- */
- protected static Properties processCommandLine(String[] args)
- {
- // Use the command line parser to evaluate the command line.
- CommandLineParser commandLine = new CommandLineParser(new String[][] {});
-
- // Capture the command line arguments or display errors and correct usage and then exit.
- Properties options = null;
-
- try
- {
- options = commandLine.parseCommandLine(args);
-
- // Add all the trailing command line options (name=value pairs) to system properties. Tests may pick up
- // overridden values from there.
- commandLine.addCommandLineToSysProperties();
- }
- catch (IllegalArgumentException e)
- {
- System.out.println(commandLine.getErrors());
- System.out.println(commandLine.getUsage());
- System.exit(1);
- }
-
- return options;
- }
-
- /**
* Convenience method for a short pause.
*
* @param sleepTime The time in milliseconds to pause for.
@@ -677,11 +647,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis *
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
- public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
+ boolean durable) throws JMSException, AMQException
{
log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
- + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + "): called");
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+ + durable + "): called");
_pingDestinations = new ArrayList<Destination>();
@@ -709,13 +680,30 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- log.debug("Created topic " + destination);
+ if (!durable)
+ {
+ destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+ log.debug("Created non-durable topic " + destination);
+ }
+ else
+ {
+ destination =
+ AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+ _clientID, (AMQConnection) _connection);
+ log.debug("Created durable topic " + destination);
+ }
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+ AMQShortString destinationName = new AMQShortString(rootName + id);
+ destination =
+ new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+ _isDurable);
+ ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
+ ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+
log.debug("Created queue " + destination);
}
|