summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-06-27 15:34:57 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-06-27 15:34:57 +0000
commit810d80bbafb2b4826710a3e9388df89adfa220af (patch)
tree496d88aa1d2330e2bc5549ed9c7011cfe77fb012
parenta5f95fd26f628f6ae45bd276505aaeb5177a1949 (diff)
downloadqpid-python-810d80bbafb2b4826710a3e9388df89adfa220af.tar.gz
Merged revisions 549530-550509 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r549530 | rupertlssmith | 2007-06-21 17:14:03 +0100 (Thu, 21 Jun 2007) | 1 line Added minimal checkstyle to project reports. Fixed some problems with site generation. ........ r549849 | rupertlssmith | 2007-06-22 16:39:27 +0100 (Fri, 22 Jun 2007) | 1 line Added Immediate and Mandatory message tests. ........ r550509 | ritchiem | 2007-06-25 15:16:30 +0100 (Mon, 25 Jun 2007) | 1 line Update to provide a SustainedTestCase, this sends batches of messages to the broker. The rate of publication is regulated by the average consume rate advertised by all connected clients. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@551199 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/pom.xml1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java51
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java82
-rw-r--r--qpid/java/client-java14/pom.xml1
-rw-r--r--qpid/java/client/example/pom.xml1
-rw-r--r--qpid/java/common/pom.xml1
-rw-r--r--qpid/java/distribution/pom.xml1
-rw-r--r--qpid/java/etc/coding_standards.xml117
-rw-r--r--qpid/java/etc/license_header.txt20
-rw-r--r--qpid/java/integrationtests/pom.xml7
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java9
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java9
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java62
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java5
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java5
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java5
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java582
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java9
-rw-r--r--qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java10
-rw-r--r--qpid/java/management/eclipse-plugin/pom.xml46
-rw-r--r--qpid/java/perftests/pom.xml1
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java37
-rw-r--r--qpid/java/pom.xml67
-rw-r--r--qpid/java/systests/distribution/pom.xml1
-rw-r--r--qpid/java/systests/pom.xml6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java686
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java135
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java282
28 files changed, 1929 insertions, 310 deletions
diff --git a/qpid/java/broker/pom.xml b/qpid/java/broker/pom.xml
index bad0d8a52d..81c5d22b22 100644
--- a/qpid/java/broker/pom.xml
+++ b/qpid/java/broker/pom.xml
@@ -30,6 +30,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
index 609a85c22f..988f589339 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
@@ -1,31 +1,35 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.txn;
-import java.util.List;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.StoreContext;
+import java.util.List;
+
/**
* @author Apache Software Foundation
*/
@@ -44,33 +48,26 @@ public class CleanupMessageOperation implements TxnOp
}
public void prepare(StoreContext context) throws AMQException
- {
- }
+ { }
public void undoPrepare()
{
- //don't need to do anything here, if the store's txn failed
- //when processing prepare then the message was not stored
- //or enqueued on any queues and can be discarded
+ // don't need to do anything here, if the store's txn failed
+ // when processing prepare then the message was not stored
+ // or enqueued on any queues and can be discarded
}
public void commit(StoreContext context)
{
-
- try
+ // No-op can't be done here has this is before the message has been attempted to be delivered.
+ /*try
{
_msg.checkDeliveredToConsumer();
}
catch (NoConsumersException e)
{
- //TODO: store this for delivery after the commit-ok
_returns.add(e);
- }
- catch (AMQException e)
- {
- _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " +
- e, e);
- }
+ }*/
}
public void rollback(StoreContext context)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
index 93459beb45..4e684098d0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
@@ -1,26 +1,27 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.txn;
-import java.util.LinkedList;
-import java.util.List;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.TxAck;
@@ -28,9 +29,13 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import java.util.LinkedList;
+import java.util.List;
+
/** A transactional context that only supports local transactions. */
public class LocalTransactionalContext implements TransactionalContext
{
@@ -54,6 +59,7 @@ public class LocalTransactionalContext implements TransactionalContext
private boolean _inTran = false;
+ /** Are there messages to deliver. NOT Has the message been delivered */
private boolean _messageDelivered = false;
private static class DeliveryDetails
@@ -62,7 +68,6 @@ public class LocalTransactionalContext implements TransactionalContext
public AMQQueue queue;
private boolean deliverFirst;
-
public DeliveryDetails(AMQMessage message, AMQQueue queue, boolean deliverFirst)
{
this.message = message;
@@ -72,15 +77,14 @@ public class LocalTransactionalContext implements TransactionalContext
}
public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
- List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException> returnMessages)
{
_messageStore = messageStore;
_storeContext = storeContext;
_returnMessages = returnMessages;
- //_txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ // _txnBuffer.enlist(new StoreMessageOperation(messageStore));
}
-
public StoreContext getStoreContext()
{
return _storeContext;
@@ -90,11 +94,12 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.rollback(_storeContext);
// Hack to deal with uncommitted non-transactional writes
- if(_messageStore.inTran(_storeContext))
+ if (_messageStore.inTran(_storeContext))
{
_messageStore.abortTran(_storeContext);
_inTran = false;
}
+
_postCommitDeliveryList.clear();
}
@@ -106,7 +111,7 @@ public class LocalTransactionalContext implements TransactionalContext
// be added for every queue onto which the message is
// enqueued. Finally a cleanup op will be added to decrement
// the reference associated with the routing.
-// message.incrementReference();
+ // message.incrementReference();
_postCommitDeliveryList.add(new DeliveryDetails(message, queue, deliverFirst));
_messageDelivered = true;
_txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
@@ -119,7 +124,7 @@ public class LocalTransactionalContext implements TransactionalContext
message.incrementReference();
_messageDelivered = true;
- */
+ */
}
private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
@@ -131,16 +136,16 @@ public class LocalTransactionalContext implements TransactionalContext
}
public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
- UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
{
- //check that the tag exists to give early failure
- if (!multiple || deliveryTag > 0)
+ // check that the tag exists to give early failure
+ if (!multiple || (deliveryTag > 0))
{
checkAck(deliveryTag, unacknowledgedMessageMap);
}
- //we use a single txn op for all acks and update this op
- //as new acks come in. If this is the first ack in the txn
- //we will need to create and enlist the op.
+ // we use a single txn op for all acks and update this op
+ // as new acks come in. If this is the first ack in the txn
+ // we will need to create and enlist the op.
if (_ackOp == null)
{
beginTranIfNecessary();
@@ -148,7 +153,7 @@ public class LocalTransactionalContext implements TransactionalContext
_txnBuffer.enlist(_ackOp);
}
// update the op to include this ack request
- if (multiple && deliveryTag == 0)
+ if (multiple && (deliveryTag == 0))
{
// if have signalled to ack all, that refers only
// to all at this time
@@ -178,6 +183,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
_log.debug("Starting transaction on message store: " + this);
}
+
_messageStore.beginTran(_storeContext);
_inTran = true;
}
@@ -189,12 +195,13 @@ public class LocalTransactionalContext implements TransactionalContext
{
_log.debug("Committing transactional context: " + this);
}
+
if (_ackOp != null)
{
_messageDelivered = true;
_ackOp.consolidate();
- //already enlisted, after commit will reset regardless of outcome
+ // already enlisted, after commit will reset regardless of outcome
_ackOp = null;
}
@@ -202,7 +209,7 @@ public class LocalTransactionalContext implements TransactionalContext
{
_txnBuffer.enlist(new StoreMessageOperation(_messageStore));
}
- //fixme fail commit here ... QPID-440
+ // fixme fail commit here ... QPID-440
try
{
_txnBuffer.commit(_storeContext);
@@ -215,7 +222,7 @@ public class LocalTransactionalContext implements TransactionalContext
try
{
- postCommitDelivery();
+ postCommitDelivery(_returnMessages);
}
catch (AMQException e)
{
@@ -224,23 +231,32 @@ public class LocalTransactionalContext implements TransactionalContext
}
}
- private void postCommitDelivery() throws AMQException
+ private void postCommitDelivery(List<RequiredDeliveryException> returnMessages) throws AMQException
{
if (_log.isDebugEnabled())
{
_log.debug("Performing post commit delivery");
}
+
try
{
for (DeliveryDetails dd : _postCommitDeliveryList)
{
dd.queue.process(_storeContext, dd.message, dd.deliverFirst);
+
+ try
+ {
+ dd.message.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException nce)
+ {
+ returnMessages.add(nce);
+ }
}
}
finally
{
_postCommitDeliveryList.clear();
}
-
}
}
diff --git a/qpid/java/client-java14/pom.xml b/qpid/java/client-java14/pom.xml
index 90a49e7d6c..db1644c5b5 100644
--- a/qpid/java/client-java14/pom.xml
+++ b/qpid/java/client-java14/pom.xml
@@ -32,6 +32,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/client/example/pom.xml b/qpid/java/client/example/pom.xml
index 50680666e1..1d48b3afbe 100644
--- a/qpid/java/client/example/pom.xml
+++ b/qpid/java/client/example/pom.xml
@@ -31,6 +31,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/common/pom.xml b/qpid/java/common/pom.xml
index aaa9a556e8..a16573e066 100644
--- a/qpid/java/common/pom.xml
+++ b/qpid/java/common/pom.xml
@@ -31,6 +31,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/distribution/pom.xml b/qpid/java/distribution/pom.xml
index 366b478687..8774b04c18 100644
--- a/qpid/java/distribution/pom.xml
+++ b/qpid/java/distribution/pom.xml
@@ -31,6 +31,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/etc/coding_standards.xml b/qpid/java/etc/coding_standards.xml
new file mode 100644
index 0000000000..00b1a9516a
--- /dev/null
+++ b/qpid/java/etc/coding_standards.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.1//EN" "http://www.puppycrawl.com/dtds/configuration_1_1.dtd">
+<module name="Checker">
+ <!-- Checks package.html defined for all packages. -->
+ <module name="PackageHtml"/>
+
+ <module name="TreeWalker">
+
+ <!-- Whitespace conventions. -->
+ <module name="TabCharacter"/>
+
+ <!-- License conventions. Checks that the license is included in every file. -->
+ <module name="Header">
+ <property name="headerFile" value="${checkstyle.header.file}"/>
+ </module>
+
+ <!-- Coding style conventions. -->
+ <module name="com.puppycrawl.tools.checkstyle.checks.coding.PackageDeclarationCheck">
+ <property name="severity" value="error"/>
+ </module>
+
+ <!-- These rules ensure that everything is javadoc'ed. -->
+ <!--
+ <module name="RequiredRegexp">
+ <property name="format" value="&lt;table id=&quot;crc&quot;&gt;&lt;caption&gt;CRC Card&lt;/caption&gt;"/>
+ </module>
+ -->
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocTypeCheck">
+ <property name="excludeScope" value="nothing"/>
+ <property name="scope" value="private"/>
+ <property name="severity" value="error"/>
+ <property name="tokens" value="CLASS_DEF, INTERFACE_DEF"/>
+ <property name="allowMissingParamTags" value="true"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocVariableCheck">
+ <property name="excludeScope" value="nothing"/>
+ <property name="scope" value="private"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.javadoc.JavadocMethodCheck">
+ <property name="allowMissingParamTags" value="false"/>
+ <property name="allowMissingReturnTag" value="false"/>
+ <property name="allowMissingThrowsTags" value="false"/>
+ <property name="allowThrowsTagsForSubclasses" value="false"/>
+ <property name="allowUndeclaredRTE" value="true"/>
+ <property name="allowMissingJavadoc" value="false"/>
+ <property name="allowMissingPropertyJavadoc" value="true"/>
+ <property name="excludeScope" value="nothing"/>
+ <property name="scope" value="private"/>
+ <property name="severity" value="error"/>
+ <property name="tokens" value="METHOD_DEF, CTOR_DEF"/>
+ </module>
+
+ <module name="JavadocStyle">
+ <property name="scope" value="private"/>
+ <property name="checkHtml" value="false"/>
+ <property name="checkFirstSentence" value="true"/>
+ <property name="checkEmptyJavadoc" value="true"/>
+ </module>
+
+ <!-- These rules enforce the conventions for the naming of variables. -->
+ <!--
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.ConstantNameCheck">
+ <property name="format" value="^log$|^[A-Z](_?[A-Z0-9]+)*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.LocalFinalVariableNameCheck">
+ <property name="format" value="^[a-z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.LocalVariableNameCheck">
+ <property name="format" value="^[a-z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.MemberNameCheck">
+ <property name="applyToPackage" value="true"/>
+ <property name="applyToPrivate" value="true"/>
+ <property name="applyToProtected" value="true"/>
+ <property name="applyToPublic" value="true"/>
+ <property name="format" value="^[a-z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.MethodNameCheck">
+ <property name="format" value="^[a-z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.PackageNameCheck">
+ <property name="format" value="^[a-z]+(\.[a-zA-Z_][a-zA-Z0-9_]*)*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.ParameterNameCheck">
+ <property name="format" value="^[a-z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.StaticVariableNameCheck">
+ <property name="format" value="^[a-z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ </module>
+
+ <module name="com.puppycrawl.tools.checkstyle.checks.naming.TypeNameCheck">
+ <property name="format" value="^[A-Z][a-zA-Z0-9_]*$"/>
+ <property name="severity" value="error"/>
+ <property name="tokens" value="CLASS_DEF, INTERFACE_DEF"/>
+ </module>
+ -->
+ </module>
+</module>
diff --git a/qpid/java/etc/license_header.txt b/qpid/java/etc/license_header.txt
new file mode 100644
index 0000000000..02ee6e8f98
--- /dev/null
+++ b/qpid/java/etc/license_header.txt
@@ -0,0 +1,20 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */ \ No newline at end of file
diff --git a/qpid/java/integrationtests/pom.xml b/qpid/java/integrationtests/pom.xml
index 3afdf48204..9ccd153f54 100644
--- a/qpid/java/integrationtests/pom.xml
+++ b/qpid/java/integrationtests/pom.xml
@@ -31,6 +31,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
@@ -45,6 +46,12 @@
<artifactId>qpid-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
index 31de84e630..d2042be741 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/coordinator/CoordinatingTestCase.java
@@ -1,4 +1,3 @@
-/* Copyright Rupert Smith, 2005 to 2006, all rights reserved. */
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,16 +20,16 @@
*/
package org.apache.qpid.interop.coordinator;
-import java.util.Map;
-
-import javax.jms.*;
-
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.util.ConversationFactory;
+import javax.jms.*;
+
+import java.util.Map;
+
/**
* A CoordinatingTestCase is a JUnit test case extension that knows how to coordinate test clients that take part in a
* test case as defined in the interop testing specification
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
index 9f769822de..37952d08c8 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/InteropClientTestCase.java
@@ -79,12 +79,19 @@ public interface InteropClientTestCase extends MessageListener
/**
* Performs the test case actions.
- *
+ * return from here when you have finished the test.. this will signal the controller that the test has ended.
* @throws JMSException Any JMSException resulting from reading the message are allowed to fall through.
*/
public void start() throws JMSException;
/**
+ * Gives notice of termination of the test case actions.
+ *
+ * @throws JMSException Any JMSException resulting from allowed to fall through.
+ */
+ public void terminate() throws JMSException, InterruptedException;
+
+ /**
* Gets a report on the actions performed by the test case in its assigned role.
*
* @param session The session to create the report message in.
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
index 6cca23446f..a82b05e20f 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
@@ -20,23 +20,31 @@
*/
package org.apache.qpid.interop.testclient;
-import java.io.IOException;
-import java.util.*;
-
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.interop.testclient.testcases.TestCase1DummyRun;
import org.apache.qpid.interop.testclient.testcases.TestCase2BasicP2P;
-import org.apache.qpid.interop.testclient.testcases.TestCase3BasicPubSub;
-import org.apache.qpid.util.ClasspathScanner;
import org.apache.qpid.util.CommandLineParser;
import org.apache.qpid.util.PropertiesUtils;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
/**
* Implements a test client as described in the interop testing spec
* (http://cwiki.apache.org/confluence/display/qpid/Interop+Testing+Specification). A test client is an agent that
@@ -201,7 +209,7 @@ public class TestClient implements MessageListener
}
// Open a connection to communicate with the coordinator on.
- _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, brokerUrl, virtualHost);
+ _connection = createConnection(DEFAULT_CONNECTION_PROPS_RESOURCE, clientName, brokerUrl, virtualHost);
session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -219,17 +227,21 @@ public class TestClient implements MessageListener
_connection.start();
}
+
+ public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ {
+ return createConnection(connectionPropsResource, "clientID", brokerUrl, virtualHost);
+ }
+
/**
* Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
- * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
- * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
- * handler.
- *
- * @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it
- * to a Utils library class.
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate that
+ * the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure handler.
*
* @param connectionPropsResource The name of the connection properties file.
- * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the properties.
+ * @param clientID
+ * @param brokerUrl The broker url to connect to, <tt>null</tt> to use the default from the
+ * properties.
* @param virtualHost The virtual host to connectio to, <tt>null</tt> to use the default.
*
* @return A JMS conneciton.
@@ -237,7 +249,7 @@ public class TestClient implements MessageListener
* @todo Make username/password configurable. Allow multiple urls for fail over. Once it feels right, move it to a
* Utils library class.
*/
- public static Connection createConnection(String connectionPropsResource, String brokerUrl, String virtualHost)
+ public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
{
log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
+ ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
@@ -251,7 +263,7 @@ public class TestClient implements MessageListener
if (brokerUrl != null)
{
String connectionString =
- "amqp://guest:guest/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
+ "amqp://guest:guest@" + clientID + "/" + ((virtualHost != null) ? virtualHost : "") + "?brokerlist='" + brokerUrl + "'";
connectionProps.setProperty(CONNECTION_PROPERTY, connectionString);
}
@@ -381,6 +393,14 @@ public class TestClient implements MessageListener
{
log.info("Received termination instruction from coordinator.");
+// try
+// {
+// currentTestCase.terminate();
+// }
+// catch (InterruptedException e)
+// {
+// //
+// }
// Is a cleaner shutdown needed?
_connection.close();
System.exit(0);
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
index 85b89172bb..5f257c0b36 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase1DummyRun.java
@@ -74,6 +74,11 @@ public class TestCase1DummyRun implements InteropClientTestCase
// Do nothing.
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
public Message getReport(Session session) throws JMSException
{
log.debug("public Message getReport(Session session): called");
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
index ea62b46451..ff56ee9b93 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase2BasicP2P.java
@@ -170,6 +170,11 @@ public class TestCase2BasicP2P implements InteropClientTestCase
}
}
+ public void terminate() throws JMSException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
index 223c4916bf..7b35142c82 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/testcases/TestCase3BasicPubSub.java
@@ -202,6 +202,11 @@ public class TestCase3BasicPubSub implements InteropClientTestCase
}
}
+ public void terminate() throws JMSException, InterruptedException
+ {
+ //todo
+ }
+
/**
* Gets a report on the actions performed by the test case in its assigned role.
*
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
index cabe73e331..1597da6dba 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
@@ -38,6 +38,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
/**
* Implements test case 3, basic pub/sub. Sends/received a specified number of messages to a specified route on the
@@ -52,7 +53,10 @@ import java.util.Map;
public class SustainedTestClient extends TestCase3BasicPubSub implements ExceptionListener
{
/** Used for debugging. */
- private static final Logger log = Logger.getLogger(SustainedTestClient.class);
+ private static final Logger debugLog = Logger.getLogger(SustainedTestClient.class);
+
+ private static final Logger log = Logger.getLogger("SustainedTest");
+
/** The role to be played by the test. */
private Roles role;
@@ -83,9 +87,11 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
SustainedRateAdapter _rateAdapter;
/** */
- int updateInterval;
+ int _batchSize;
+
- private boolean _running = true;
+ private static final long TEN_MILLI_SEC = 10000000;
+ private static final long FIVE_MILLI_SEC = 5000000;
/**
* Should provide the name of the test case that this class implements. The exact names are defined in the interop
@@ -95,7 +101,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
public String getName()
{
- log.debug("public String getName(): called");
+ debugLog.debug("public String getName(): called");
return "Perf_SustainedPubSub";
}
@@ -111,31 +117,34 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
public void assignRole(Roles role, Message assignRoleMessage) throws JMSException
{
- log.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
- + "): called");
+ debugLog.debug("public void assignRole(Roles role = " + role + ", Message assignRoleMessage = " + assignRoleMessage
+ + "): called");
// Take note of the role to be played.
this.role = role;
// Extract and retain the test parameters.
numReceivers = assignRoleMessage.getIntProperty("SUSTAINED_NUM_RECEIVERS");
- updateInterval = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
+ _batchSize = assignRoleMessage.getIntProperty("SUSTAINED_UPDATE_INTERVAL");
String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
- log.debug("numReceivers = " + numReceivers);
- log.debug("updateInterval = " + updateInterval);
- log.debug("ackMode = " + ackMode);
- log.debug("sendKey = " + sendKey);
- log.debug("sendUpdateKey = " + sendUpdateKey);
- log.debug("role = " + role);
+ if (debugLog.isDebugEnabled())
+ {
+ debugLog.debug("numReceivers = " + numReceivers);
+ debugLog.debug("_batchSize = " + _batchSize);
+ debugLog.debug("ackMode = " + ackMode);
+ debugLog.debug("sendKey = " + sendKey);
+ debugLog.debug("sendUpdateKey = " + sendUpdateKey);
+ debugLog.debug("role = " + role);
+ }
switch (role)
{
// Check if the sender role is being assigned, and set up a single message producer if so.
case SENDER:
- log.info("*********** Creating SENDER");
+ log.info("Creating Sender");
// Create a new connection to pass the test messages on.
connection = new Connection[1];
session = new Session[1];
@@ -164,7 +173,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
// Otherwise the receiver role is being assigned, so set this up to listen for messages on the required number
// of receiver connections.
case RECEIVER:
- log.info("*********** Creating RECEIVER");
+ log.info("Creating Receiver");
// Create the required number of receiver connections.
connection = new Connection[numReceivers];
session = new Session[numReceivers];
@@ -183,7 +192,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
MessageConsumer consumer = session[i].createConsumer(sendDestination);
- consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, updateInterval, session[i], sendUpdateDestination));
+ consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination));
}
break;
@@ -196,29 +205,32 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
+
/** Performs the test case actions. */
public void start() throws JMSException
{
- log.debug("public void start(): called");
+ debugLog.debug("public void start(): called");
// Check that the sender role is being performed.
switch (role)
{
// Check if the sender role is being assigned, and set up a single message producer if so.
case SENDER:
- Message testMessage = session[0].createTextMessage("test");
-
-// for (int i = 0; i < numMessages; i++)
- while (_running)
- {
- producer.send(testMessage);
-
- _rateAdapter.sentMessage();
- }
+ _rateAdapter.run();
break;
case RECEIVER:
}
+
+ //return from here when you have finished the test.. this will signal the controller and
+ }
+
+ public void terminate() throws JMSException, InterruptedException
+ {
+ if (_rateAdapter != null)
+ {
+ _rateAdapter.stop();
+ }
}
/**
@@ -232,7 +244,7 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
*/
public Message getReport(Session session) throws JMSException
{
- log.debug("public Message getReport(Session session): called");
+ debugLog.debug("public Message getReport(Session session): called");
// Close the test connections.
for (int i = 0; i < connection.length; i++)
@@ -252,89 +264,100 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
if (linked != null)
{
- if (linked instanceof AMQNoRouteException)
+ if (debugLog.isDebugEnabled())
{
- log.warn("No route .");
+ debugLog.debug("Linked Exception:" + linked);
}
- else if (linked instanceof AMQNoConsumersException)
- {
- log.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage());
- }
- else
+ if ((linked instanceof AMQNoRouteException)
+ || (linked instanceof AMQNoConsumersException))
{
+ if (debugLog.isDebugEnabled())
+ {
+ if (linked instanceof AMQNoConsumersException)
+ {
+ debugLog.warn("No clients currently available for message:" + ((AMQNoConsumersException) linked).getUndeliveredMessage());
+ }
+ else
+ {
+ debugLog.warn("No route for message");
+ }
+ }
- log.warn("LinkedException:" + linked);
+ // Tell the rate adapter that there are no clients ready yet
+ _rateAdapter.NO_CLIENTS = true;
}
-
- _rateAdapter.NO_CLIENTS = true;
}
else
{
- log.warn("Exception:" + linked);
+ debugLog.warn("Exception:" + linked);
}
}
+ /**
+ * Inner class that listens for messages and sends a report for the time taken between receiving the 'start' and
+ * 'end' messages.
+ */
class SustainedListener implements MessageListener
{
- private int _received = 0;
- private int _updateInterval = 0;
- private Long _time;
+ /** Number of messages received */
+ private long _received = 0;
+ /** The number of messages in the batch */
+ private int _batchSize = 0;
+ /** Record of the when the 'start' messagse was sen */
+ private Long _startTime;
+ /** Message producer to use to send reports */
MessageProducer _updater;
+ /** Session to create the report message on */
Session _session;
+ /** Record of the client ID used for this SustainedListnener */
String _client;
- public SustainedListener(String clientname, int updateInterval, Session session, Destination sendDestination) throws JMSException
+ /**
+ * Main Constructor
+ *
+ * @param clientname The _client id used to identify this connection.
+ * @param batchSize The number of messages that are to be sent per batch. Note: This is not used to
+ * control the interval between sending reports.
+ * @param session The session used for communication.
+ * @param sendDestination The destination that update reports should be sent to.
+ *
+ * @throws JMSException My occur if creatingthe Producer fails
+ */
+ public SustainedListener(String clientname, int batchSize, Session session, Destination sendDestination) throws JMSException
{
- _updateInterval = updateInterval;
+ _batchSize = batchSize;
_client = clientname;
_session = session;
_updater = session.createProducer(sendDestination);
}
- public void setReportInterval(int reportInterval)
- {
- _updateInterval = reportInterval;
- _received = 0;
- }
-
public void onMessage(Message message)
{
- if (log.isDebugEnabled())
+ if (debugLog.isTraceEnabled())
{
- log.debug("Message " + _received + "received in listener");
+ debugLog.trace("Message " + _received + "received in listener");
}
+
if (message instanceof TextMessage)
{
-
try
{
- if (((TextMessage) message).getText().equals("test"))
+ _received++;
+ if (((TextMessage) message).getText().equals("start"))
{
- if (_received == 0)
- {
- _time = System.nanoTime();
- sendStatus(0, _received);
- }
-
- _received++;
-
- if (_received % _updateInterval == 0)
+ debugLog.info("Starting Batch");
+ _startTime = System.nanoTime();
+ }
+ else if (((TextMessage) message).getText().equals("end"))
+ {
+ if (_startTime != null)
{
- Long currentTime = System.nanoTime();
-
- try
- {
- sendStatus(currentTime - _time, _received);
- _time = currentTime;
- }
- catch (JMSException e)
- {
- log.error("Unable to send update.");
- }
+ long currentTime = System.nanoTime();
+ sendStatus(currentTime - _startTime, _received);
+ debugLog.info("End Batch");
}
-
}
}
catch (JMSException e)
@@ -342,37 +365,68 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
//ignore error
}
}
+
}
- private void sendStatus(long time, int received) throws JMSException
+ /**
+ * sendStatus creates and sends the report back to the publisher
+ *
+ * @param time taken for the the last batch
+ * @param received Total number of messages received.
+ *
+ * @throws JMSException if an error occurs during the send
+ */
+ private void sendStatus(long time, long received) throws JMSException
{
Message updateMessage = _session.createTextMessage("update");
- updateMessage.setStringProperty("CLIENT_ID", _client);
+ updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
updateMessage.setLongProperty("RECEIVED", received);
updateMessage.setLongProperty("DURATION", time);
- log.info("**** SENDING **** CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ if (debugLog.isInfoEnabled())
+ {
+ debugLog.info("**** SENDING [" + received / _batchSize + "]**** "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ }
+
+ // Output on the main log.info the details of this batch
+ if (received / _batchSize % 10 == 0)
+ {
+ log.info("Sending Report [" + received / _batchSize + "] "
+ + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+ }
_updater.send(updateMessage);
}
-
}
- class SustainedRateAdapter implements MessageListener
+
+ /**
+ * This class is used here to adjust the _delay value which in turn is used to control the number of messages/second
+ * that are sent through the test system.
+ *
+ * By keeping a record of the messages recevied and the average time taken to process the batch size can be
+ * calculated and so the delay can be adjusted to maintain that rate.
+ *
+ * Given that delays of < 10ms can be rounded up the delay is only used between messages if the _delay > 10ms * no
+ * messages in the batch. Otherwise the delay is used at the end of the batch.
+ */
+ class SustainedRateAdapter implements MessageListener, Runnable
{
private SustainedTestClient _client;
- private long _variance = 250; //no. messages to allow drifting
+ private long _messageVariance = 500; //no. messages to allow drifting
+ private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
private volatile long _delay; //in nanos
private long _sent;
private Map<String, Long> _slowClients = new HashMap<String, Long>();
- private static final long PAUSE_SLEEP = 10; // 10 ms
- private static final long NO_CLIENT_SLEEP = 1000; // 1s
- private static final long MAX_MESSAGE_DRIFT = 1000; // no messages drifted from producer
+ private static final long PAUSE_SLEEP = TEN_MILLI_SEC / 1000; // 10 ms
+ private static final long NO_CLIENT_SLEEP = 1000; // 1s
private volatile boolean NO_CLIENTS = true;
private int _delayShifting;
- private static final int REPORTS_WITHOUT_CHANGE = 10;
- private static final double MAXIMUM_DELAY_SHIFT = .02; //2%
+ private static final int REPORTS_WITHOUT_CHANGE = 5;
+ private boolean _warmedup = false;
+ private static final long EXPECTED_TIME_PER_BATCH = 100000L;
SustainedRateAdapter(SustainedTestClient client)
{
@@ -381,9 +435,9 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
public void onMessage(Message message)
{
- if (log.isDebugEnabled())
+ if (debugLog.isDebugEnabled())
{
- log.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
+ debugLog.debug("SustainedRateAdapter onMessage(Message message = " + message + "): called");
}
try
@@ -395,15 +449,25 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
{
NO_CLIENTS = false;
long duration = message.getLongProperty("DURATION");
- long received = message.getLongProperty("RECEIVED");
+ long totalReceived = message.getLongProperty("RECEIVED");
String client = message.getStringProperty("CLIENT_ID");
- log.info("**** SENDING **** CLIENT_ID:" + client + " RECEIVED:" + received + " DURATION:" + duration);
+ if (debugLog.isInfoEnabled())
+ {
+ debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration);
+ }
+
+ recordSlow(client, totalReceived);
+ adjustDelay(client, totalReceived, duration);
- recordSlow(client, received);
- adjustDelay(client, received, duration);
+ if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2)
+ {
+ _warmedup = true;
+ _warmup.countDown();
+
+ }
}
}
catch (JMSException e)
@@ -412,72 +476,220 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
- class Pair<X, Y>
+ CountDownLatch _warmup = new CountDownLatch(1);
+
+ int _warmUpBatches = 20;
+
+ int _numBatches = 10000;
+
+ // long[] _timings = new long[_numBatches];
+ private boolean _running = true;
+
+
+ public void run()
+ {
+ log.info("Warming up");
+
+ doBatch(_warmUpBatches);
+
+ try
+ {
+ //wait for warmup to complete.
+ _warmup.await();
+
+ //set delay to the average length of the batches
+ _delay = _totalDuration / _warmUpBatches / delays.size();
+
+ log.info("Warmup complete delay set : " + _delay
+ + " based on _totalDuration: " + _totalDuration
+ + " over no. batches: " + _warmUpBatches
+ + " with client count: " + delays.size());
+
+ _totalDuration = 0L;
+ _totalReceived = 0L;
+ _sent = 0L;
+ }
+ catch (InterruptedException e)
+ {
+ //
+ }
+
+
+ doBatch(_numBatches);
+
+ }
+
+ private void doBatch(int batchSize) // long[] timings,
{
- X item1;
- Y item2;
+ TextMessage testMessage = null;
+ try
+ {
+ testMessage = _client.session[0].createTextMessage("start");
+
- Pair(X i1, Y i2)
+ for (int batch = 0; batch < batchSize; batch++)
+// while (_running)
+ {
+ long start = System.nanoTime();
+
+ testMessage.setText("start");
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+
+ testMessage.setText("test");
+ //start at 2 so start and end count as part of batch
+ for (int m = 2; m < _batchSize; m++)
+ {
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+ }
+
+ testMessage.setText("end");
+ _client.producer.send(testMessage);
+ _rateAdapter.sentMessage();
+
+ long end = System.nanoTime();
+
+ long sendtime = end - start;
+
+ debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+
+ if (batch % 10 == 0)
+ {
+ log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
+ }
+
+ _rateAdapter.sleepBatch();
+
+ }
+ }
+ catch (JMSException e)
{
- item1 = i1;
- item2 = i2;
+ log.error("Runner ended");
}
+ }
- X getItem1()
+ private String status()
+ {
+ return " TotalDuration: " + _totalDuration + " for " + delays.size() + " consumers"
+ + " Delay is " + _delay + " resulting in "
+ + ((_delay > TEN_MILLI_SEC * _batchSize) ? (_delay / _batchSize) + "/msg" : _delay + "/batch");
+ }
+
+ private void sleepBatch()
+ {
+ if (checkForSlowClients())
+ {//if there werwe slow clients we have already slept so don't sleep anymore again.
+ return;
+ }
+
+ //Slow down if gap between send and received is too large
+ if (_sent - _totalReceived / delays.size() > _messageVariance)
{
- return item1;
+ //pause between batches.
+ debugLog.info("Sleeping to keep sent in check with received");
+ log.debug("Increaseing _delay as sending more than receiving");
+ _delay += TEN_MILLI_SEC;
}
- Y getItem2()
+ //per batch sleep.. if sleep is to small to spread over the batch.
+ if (_delay <= TEN_MILLI_SEC * _batchSize)
{
- return item2;
+ sleepLong(_delay);
+ }
+ else
+ {
+ debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
}
}
- Map<String, Pair<Long, Long>> delays = new HashMap<String, Pair<Long, Long>>();
- Long totalReceived = 0L;
- Long totalDuration = 0L;
+ public void stop()
+ {
+ _running = false;
+ }
- private void adjustDelay(String client, long received, long duration)
+ Map<String, Long> delays = new HashMap<String, Long>();
+ Long _totalReceived = 0L;
+ Long _totalDuration = 0L;
+ int _skipUpdate = 0;
+
+ /**
+ * Adjust the delay for sending messages based on this update from the client
+ *
+ * @param client The client that send this update
+ * @param totalReceived The number of messages that this client has received.
+ * @param duration The time taken for the last batch of messagse
+ */
+ private void adjustDelay(String client, long totalReceived, long duration)
{
- Pair<Long, Long> current = delays.get(client);
+ //Retrieve the current total time taken for this client.
+ Long currentTime = delays.get(client);
- if (current == null)
+ // Add the new duration time to this client
+ if (currentTime == null)
{
- delays.put(client, new Pair<Long, Long>(received, duration));
+ currentTime = duration;
}
else
{
- //reduce totals
- totalReceived -= current.getItem1();
- totalDuration -= current.getItem2();
+ currentTime += duration;
}
- totalReceived += received;
- totalDuration += duration;
+ delays.put(client, currentTime);
+
+
+ _totalReceived += _batchSize;
+ _totalDuration += duration;
- long averageDuration = totalDuration / delays.size();
+ // Calculate the number of messages in the batch.
+ long batchCount = (_totalReceived / _batchSize);
- long diff = Math.abs(_delay - averageDuration);
+ //calculate average duration accross clients per batch
+ long averageDuration = _totalDuration / delays.size() / batchCount;
+
+ //calculate the difference between current send delay and average report delay
+ long diff = (duration) - averageDuration;
+
+ if (debugLog.isInfoEnabled())
+ {
+ debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers"
+ + " on batch: " + batchCount
+ + " Batch Duration: " + duration
+ + " Average: " + averageDuration
+ + " so diff: " + diff + " for : " + client
+ + " Delay is " + _delay + " resulting in "
+ + ((_delay > TEN_MILLI_SEC * _batchSize)
+ ? (_delay / _batchSize) + "/msg" : _delay + "/batch"));
+ }
//if the averageDuration differs from the current by more than the specified variane then adjust delay.
- if (diff > _variance)
+ if (Math.abs(diff) > _timeVariance)
{
- if (averageDuration > _delay)
+
+ // if the the _delay is larger than the required duration to send report
+ // speed up
+ if (diff > TEN_MILLI_SEC)
{
- // we can go faster
- _delay -= diff;
+ _delay -= TEN_MILLI_SEC;
+
if (_delay < 0)
{
_delay = 0;
+ debugLog.info("Reset _delay to 0");
+ delayStable();
+ }
+ else
+ {
+ delayChanged();
}
+
}
- else
+ else if (diff < 0) // diff < 0 diff cannot be 0 as it is > _timeVariance
{
- // we need to slow down
- _delay += diff;
+ // the report took longer
+ _delay += TEN_MILLI_SEC;
+ delayChanged();
}
- delayChanged();
}
else
{
@@ -486,11 +698,16 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
+ /** Reset the number of iterations before we say the delay has stabilised. */
private void delayChanged()
{
_delayShifting = REPORTS_WITHOUT_CHANGE;
}
+ /**
+ * Record the fact that delay has stabilised If delay has stablised for REPORTS_WITHOUT_CHANGE then it will
+ * output Delay stabilised
+ */
private void delayStable()
{
_delayShifting--;
@@ -498,14 +715,20 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
if (_delayShifting < 0)
{
_delayShifting = 0;
- log.info("Delay stabilised:" + _delay);
+ log.debug("Delay stabilised:" + _delay);
}
}
- // Record Slow clients
+ /**
+ * Checks that the client has received enough messages. If the client has fallen behind then they are put in the
+ * _slowClients lists which will increase the delay.
+ *
+ * @param client The client identifier to check
+ * @param received the number of messages received by that client
+ */
private void recordSlow(String client, long received)
{
- if (received < (_sent - _variance))
+ if (received < (_sent - _messageVariance))
{
_slowClients.put(client, received);
}
@@ -515,20 +738,49 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
}
}
+ /** Incrment the number of sent messages and then sleep, if required. */
public void sentMessage()
{
- if (_sent % updateInterval == 0)
+
+ _sent++;
+
+ if (_delay > TEN_MILLI_SEC * _batchSize)
{
+ long batchDelay = _delay / _batchSize;
+ // less than 10ms sleep doesn't always work.
+ // _delay is in nano seconds
+// if (batchDelay < (TEN_MILLI_SEC))
+// {
+// sleep(0, (int) batchDelay);
+// }
+// else
+ {
+// if (batchDelay < 30000000000L)
+ {
+ sleepLong(batchDelay);
+ }
+ }
+ }
+ }
+
+ /**
+ * Check at the end of each batch and pause sending messages to allow slow clients to catch up.
+ *
+ * @return true if there were slow clients that caught up.
+ */
+ private boolean checkForSlowClients()
+ {
+ if (_sent % _batchSize == 0)
+ {
// Cause test to pause when we have slow
if (!_slowClients.isEmpty() || NO_CLIENTS)
{
- log.info("Pausing for slow clients");
-
- //_delay <<= 1;
+ debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray());
while (!_slowClients.isEmpty())
{
+ debugLog.info(_slowClients.size() + " slow clients.");
sleep(PAUSE_SLEEP);
}
@@ -537,45 +789,67 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
sleep(NO_CLIENT_SLEEP);
}
- log.debug("Continuing");
- return;
+ debugLog.debug("Continuing");
+ return true;
}
else
{
- log.info("Delay:" + _delay);
+ debugLog.info("Delay:" + _delay);
}
+
}
- _sent++;
+ return false;
+ }
- if (_delay > 0)
- {
- // less than 10ms sleep doesn't work.
- // _delay is in nano seconds
- if (_delay < 1000000)
- {
- sleep(0, (int) _delay);
- }
- else
- {
- if (_delay < 30000000000L)
- {
- sleep(_delay / 1000000, (int) (_delay % 1000000));
- }
- }
- }
+ /**
+ * Sleep normally takes micro-seconds this allows the use of a nano-second value.
+ *
+ * @param delay nanoseconds to sleep for.
+ */
+ private void sleepLong(long delay)
+ {
+ sleep(delay / 1000000, (int) (delay % 1000000));
}
+ /**
+ * Sleep for the specified micro-seconds.
+ * @param sleep microseconds to sleep for.
+ */
private void sleep(long sleep)
{
sleep(sleep, 0);
}
+ /**
+ * Perform the sleep , swallowing any InteruptException.
+ *
+ * NOTE: If a sleep request is > 10s then reset only sleep for 5s
+ *
+ * @param milli to sleep for
+ * @param nano sub miliseconds to sleep for
+ */
private void sleep(long milli, int nano)
{
try
{
- log.debug("Sleep:" + milli + ":" + nano);
+ debugLog.debug("Sleep:" + milli + ":" + nano);
+ if (milli > 10000)
+ {
+
+ if (_delay == milli)
+ {
+ _totalDuration = _totalReceived / _batchSize * EXPECTED_TIME_PER_BATCH;
+ debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s. Reset _totalDuration:" + _totalDuration);
+ }
+ else
+ {
+ debugLog.error("Sleeping for more than 10 seconds adjusted to 5s!:" + milli / 1000 + "s");
+ }
+
+ milli = 5000;
+ }
+
Thread.sleep(milli, nano);
}
catch (InterruptedException e)
@@ -583,6 +857,12 @@ public class SustainedTestClient extends TestCase3BasicPubSub implements Excepti
//
}
}
+
+ public void setClient(SustainedTestClient client)
+ {
+ _client = client;
+ }
}
}
+
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
index 4081d87192..b437e165b4 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
@@ -78,11 +78,12 @@ public class SustainedTestCoordinator extends CoordinatingTestCase3BasicPubSub i
Map<String, Object> testConfig = new HashMap<String, Object>();
testConfig.put("TEST_NAME", "Perf_SustainedPubSub");
testConfig.put("SUSTAINED_KEY", SUSTAINED_KEY);
- //testConfig.put("SUSTAINED_MSG_RATE", 10);
- testConfig.put("SUSTAINED_NUM_RECEIVERS", 2);
- testConfig.put("SUSTAINED_UPDATE_INTERVAL", 25);
+ testConfig.put("SUSTAINED_NUM_RECEIVERS", Integer.getInteger("numReceives", 2));
+ testConfig.put("SUSTAINED_UPDATE_INTERVAL", Integer.getInteger("batchSize", 1000));
testConfig.put("SUSTAINED_UPDATE_KEY", SUSTAINED_KEY + ".UPDATE");
- testConfig.put("ACKNOWLEDGE_MODE", AMQSession.NO_ACKNOWLEDGE);
+ testConfig.put("ACKNOWLEDGE_MODE", Integer.getInteger("ackMode", AMQSession.AUTO_ACKNOWLEDGE));
+
+ log.info("Created Config: " + testConfig.entrySet().toArray());
sequenceTest(testConfig);
}
diff --git a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
index 4ca2fe8ff5..0090bec3d0 100644
--- a/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
+++ b/qpid/java/integrationtests/src/main/java/org/apache/qpid/util/ConversationFactory.java
@@ -20,16 +20,16 @@
*/
package org.apache.qpid.util;
+import org.apache.log4j.Logger;
+
+import javax.jms.*;
+
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.*;
-
-import org.apache.log4j.Logger;
-
/**
* A conversation helper, uses a message correlation id pattern to match up sent and received messages as a conversation
* over JMS messaging. Incoming message traffic is divided up by correlation id. Each id has a queue (behaviour dependant
@@ -153,7 +153,7 @@ public class ConversationFactory
* queue.
* @param queueClass The queue implementation class.
*
- * @throws JMSException All undelying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public ConversationFactory(Connection connection, Destination receiveDestination,
Class<? extends BlockingQueue> queueClass) throws JMSException
diff --git a/qpid/java/management/eclipse-plugin/pom.xml b/qpid/java/management/eclipse-plugin/pom.xml
index 4fbc8a0e3f..6637460822 100644
--- a/qpid/java/management/eclipse-plugin/pom.xml
+++ b/qpid/java/management/eclipse-plugin/pom.xml
@@ -15,39 +15,40 @@
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
- -->
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.qpid.management</groupId>
<artifactId>org.apache.qpid.management.ui</artifactId>
<packaging>jar</packaging>
<version>1.0-incubating-M2-SNAPSHOT</version>
<name>Qpid Management</name>
- <url>http://cwiki.apache.org/confluence/display/qpid</url>
<parent>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<properties>
- <topDirectoryLocation>../../</topDirectoryLocation>
+ <topDirectoryLocation>../..</topDirectoryLocation>
</properties>
<repositories>
- <repository>
+ <repository>
<id>repo1.maven.org</id>
<name>Maven eclipse Repository</name>
<url>http://repo1.maven.org/eclipse</url>
- </repository>
- <repository>
+ </repository>
+ <repository>
<id>apache.snapshots</id>
<name>Apache SNAPSHOT Repository</name>
<url>http://people.apache.org/repo/m2-snapshot-repository</url>
<snapshots>
- <enabled>true</enabled>
+ <enabled>true</enabled>
</snapshots>
</repository>
</repositories>
@@ -196,40 +197,42 @@
<directory>icons/</directory>
<targetPath>icons/</targetPath>
<includes>
- <include>**</include>
+ <include>**</include>
</includes>
</resource>
<resource>
<directory>icons/</directory>
<targetPath>/</targetPath>
<includes>
- <include>splash.bmp</include>
+ <include>splash.bmp</include>
</includes>
</resource>
<resource>
- <directory>${basedir}</directory>
- <targetPath>/</targetPath>
- <includes>
- <include>plugin.xml</include>
- <include>plugin.properties</include>
- </includes>
+ <directory>${basedir}</directory>
+ <targetPath>/</targetPath>
+ <includes>
+ <include>plugin.xml</include>
+ <include>plugin.properties</include>
+ </includes>
</resource>
- </resources>
+ </resources>
<plugins>
+ <!--
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
</plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
- <archive>
- <manifestFile>META-INF/MANIFEST.MF</manifestFile>
- </archive>
- <finalName>${artifactId}_${version}</finalName>
+ <archive>
+ <manifestFile>META-INF/MANIFEST.MF</manifestFile>
+ </archive>
+ <finalName>${artifactId}_${version}</finalName>
</configuration>
- </plugin>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -238,6 +241,7 @@
<skip>false</skip>
</configuration>
</plugin>
+ -->
</plugins>
</build>
diff --git a/qpid/java/perftests/pom.xml b/qpid/java/perftests/pom.xml
index d934fee7ec..77d70b7020 100644
--- a/qpid/java/perftests/pom.xml
+++ b/qpid/java/perftests/pom.xml
@@ -32,6 +32,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index 248034af9b..46333db844 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.ping;
-import javax.jms.*;
-
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestSuite;
@@ -35,6 +33,8 @@ import uk.co.thebadgerset.junit.extensions.TestThreadAware;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+import javax.jms.*;
+
/**
* PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
* simultaneously to simluate many clients/producers/connections.
@@ -72,39 +72,6 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
super(name);
_logger.debug("testParameters = " + testParameters);
-
- // Sets up the test parameters with defaults.
- /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, PingPongProducer.MESSAGE_SIZE_DEAFULT);
- testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- PingPongProducer.PERSISTENT_MODE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, PingPongProducer.TRANSACTED_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, PingPongProducer.VERBOSE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, PingPongProducer.RATE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, PingPongProducer.PUBSUB_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, PingPongProducer.TX_BATCH_SIZE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, PingPongProducer.TIMEOUT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- PingPongProducer.DESTINATION_COUNT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, PingPongProducer.UNIQUE_DESTS_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, PingPongProducer.ACK_MODE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
}
/**
diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml
index c86206d90e..2e1a792c49 100644
--- a/qpid/java/pom.xml
+++ b/qpid/java/pom.xml
@@ -31,9 +31,9 @@ under the License.
<packaging>pom</packaging>
<scm>
- <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/trunk</connection>
- <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/trunk</developerConnection>
- <url>http://svn.apache.org/viewvc/incubator/qpid/trunk/</url>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/branches/M2/java</connection>
+ <developerConnection>scm:svn:http://svn.apache.org/repos/asf/incubator/qpid/branches/M2/java</developerConnection>
+ <url>http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java</url>
</scm>
<prerequisites>
@@ -103,7 +103,7 @@ under the License.
<surefire.fork.mode>never</surefire.fork.mode>
<surefire.format>brief</surefire.format>
- <surefire.usefile>false</surefire.usefile>
+ <surefire.usefile>true</surefire.usefile>
<compile.forked>false</compile.forked>
<java.source.version>1.5</java.source.version>
<compile.flags>-Xlint:fallthrough,finally</compile.flags>
@@ -554,30 +554,83 @@ under the License.
<reporting>
<plugins>
+ <!--
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId>
<version>${cobertura.version}</version>
</plugin>
+ -->
+
+ <!-- Run the javadoc report. -->
+ <!--
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <tags>
+ <tag>
+ <name>todo</name>
+ <placement>a</placement>
+ <head>To do:</head>
+ </tag>
+ </tags>
+ </configuration>
+ </plugin>
+ -->
+
+ <!-- Generate the clover coverage report. -->
+ <!--
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clover-plugin</artifactId>
+ </plugin>
+ -->
+
+ <!-- Standard Maven project info reports. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>${mprojectinfo.version}</version>
</plugin>
+
+ <!-- Generate the surefire test report. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId>
<version>${surefire-report.version}</version>
</plugin>
+
+ <!-- Generate the TODO lists. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>taglist-maven-plugin</artifactId>
+ </plugin>
+
+ <!-- Generate the source code cross reference. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>${javadoc.version}</version>
+ <artifactId>maven-jxr-plugin</artifactId>
+ </plugin>
+
+ <!-- Minimal checkstyle rules. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <configLocation>${basedir}/${topDirectoryLocation}/etc/coding_standards.xml</configLocation>
+ <headerLocation>${basedir}/${topDirectoryLocation}/etc/license_header.txt</headerLocation>
+ </configuration>
</plugin>
+
+ <!-- Generate report on changed files. -->
+ <!--
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clover-plugin</artifactId>
+ <artifactId>maven-changelog-plugin</artifactId>
</plugin>
+ -->
+
</plugins>
</reporting>
diff --git a/qpid/java/systests/distribution/pom.xml b/qpid/java/systests/distribution/pom.xml
index bff1e0d9e5..70a6a18cce 100644
--- a/qpid/java/systests/distribution/pom.xml
+++ b/qpid/java/systests/distribution/pom.xml
@@ -31,6 +31,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
</parent>
<properties>
diff --git a/qpid/java/systests/pom.xml b/qpid/java/systests/pom.xml
index 8a245b73a9..f845b9fb44 100644
--- a/qpid/java/systests/pom.xml
+++ b/qpid/java/systests/pom.xml
@@ -30,6 +30,7 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid</artifactId>
<version>1.0-incubating-M2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<properties>
@@ -55,6 +56,11 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>uk.co.thebadgerset</groupId>
+ <artifactId>junit-toolkit</artifactId>
+ </dependency>
+
<!-- Test Dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
new file mode 100644
index 0000000000..05fbceca20
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
@@ -0,0 +1,686 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ImmediateMessageTest tests for the desired behaviour of immediate messages. Immediate messages are a non-JMS
+ * feature. A message may be marked with an immediate delivery flag, which means that a consumer must be connected
+ * to receive the message, through a valid route, when it is sent, or when its transaction is committed in the case
+ * of transactional messaging. If this is not the case, the broker should return the message with a NO_CONSUMERS code.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Check that an immediate message is sent succesfully not using transactions when a consumer is connected.
+ * <tr><td> Check that an immediate message is committed succesfully in a transaction when a consumer is connected.
+ * <tr><td> Check that an immediate message results in no consumers code, not using transactions, when no consumer is
+ * connected.
+ * <tr><td> Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is
+ * connected.
+ * </table>
+ *
+ * @todo Write a test decorator, the sole function of which is to populate test context properties, from sys properties,
+ * from trailing prop=value pairs on the command line, from test properties files or other sources. This should
+ * run through stanard JUnit without the JUnit toolkit extensions, and through Maven surefire, and also through
+ * the JUnit toolkit extended test runners.
+ *
+ * @todo Veto test topologies using bounce back. Or else the bounce back client will act as an immediate consumer.
+ */
+public class ImmediateMessageTest extends TestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = LoggerFactory.getLogger(ImmediateMessageTest.class);
+
+ /** Used to read the tests configurable properties through. */
+ ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the immediate flag on. */
+ private boolean immediateFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+
+ /** Check that an immediate message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Send one message with no errors.
+ PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_517_ImmediateOkTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Send one message with no errors.
+ PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an immediate message results in no consumers code, not using transactions, when no consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoConsumerNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ /** Check that an immediate message results in no consumers code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_517_ImmediateFailsNoConsumerTx() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ PublisherReceiverImpl.testWithAssertions(testProps, AMQNoConsumersException.class);
+ }
+
+ protected void setUp() throws Exception
+ {
+ NDC.push(getName());
+
+ // Ensure that the in-vm broker is created.
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ // Ensure that the in-vm broker is cleaned up so that the next test starts afresh.
+ TransportConnection.killVMBroker(1);
+ ApplicationRegistry.remove(1);
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+
+ /*
+ * Stuff below:
+ *
+ * This will get tidied into some sort on JMS convenience framework, through which practically any usefull test
+ * topology can be created. This will become a replacement for PingPongProducer.
+ *
+ * Base everything on standard connection properties defined in PingPongProducer. Split JMS and AMQP-only properties.
+ *
+ * Integrate with ConversationFactory, so that it will work with prod/con pairs.
+ *
+ * Support pub/rec pairs.
+ * Support m*n pub/rec setups. All pubs/recs on one machine.
+ *
+ * Support bounce back clients, with configurable bounce back behavior. All, one in X, round robin one in m, etc.
+ *
+ * Support pairing of m*n pub/rec setups with bounce back clients. JVM running a test, can simulate m publishers,
+ * will receive (a known subset of) all messages sent, bounced back to n receivers. Co-location of pub/rec will be
+ * the normal model to allow accurate timings to be taken.
+ *
+ * Support creation of pub or rec only.
+ * Support clock synching of pub/rec on different JVMs, by calculating clock offsets. Must also provide an accuracy
+ * estimate to +- the results.
+ *
+ * Augment the interop Coordinator, to become a full distributed test coordinator. Capable of querying available
+ * tests machines, looking at test parameters and farming out tests onto the test machines, passing all test
+ * parameters, standard naming of pub/rec config parameters used to set up m*n test topologies, run test cases,
+ * report results, tear down m*n topologies. Need to split the re-usable general purpose distributed test coordinator
+ * from the Qpid specific test framework for creating test-topoloigies and passing Qpid specific parameters.
+ *
+ * Write all tests against pub/rec pairs, without coding to the fact that the topology may be anything from 1:1 in
+ * JVM to m*n with bounce back clients accross many machines. That is, make the test topology orthogonal to the test
+ * case.
+ */
+
+ private static class ExceptionMonitor implements ExceptionListener
+ {
+ List<JMSException> exceptions = new ArrayList<JMSException>();
+
+ public void onException(JMSException e)
+ {
+ log.debug("ExceptionMonitor got JMSException: ", e);
+
+ exceptions.add(e);
+ }
+
+ public boolean assertNoExceptions()
+ {
+ return exceptions.isEmpty();
+ }
+
+ public boolean assertOneJMSException()
+ {
+ return exceptions.size() == 1;
+ }
+
+ public boolean assertOneJMSExceptionWithLinkedCause(Class aClass)
+ {
+ if (exceptions.size() == 1)
+ {
+ JMSException e = exceptions.get(0);
+
+ Exception linkedCause = e.getLinkedException();
+
+ if ((linkedCause != null) && aClass.isInstance(linkedCause))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void reset()
+ {
+ exceptions = new ArrayList();
+ }
+ }
+
+ /**
+ * Establishes a JMS connection using a properties file and qpids built in JNDI implementation. This is a simple
+ * convenience method for code that does anticipate handling connection failures. All exceptions that indicate
+ * that the connection has failed, are wrapped as rutime exceptions, preumably handled by a top level failure
+ * handler.
+ *
+ * @param messagingProps Any additional connection properties.
+ *
+ * @return A JMS conneciton.
+ *
+ * @todo Move this to a Utils library class or base test class. Also move the copy in interop.TestClient too.
+ *
+ * @todo Make in VM broker creation step optional on whether one is to be used or not.
+ */
+ public static Connection createConnection(ParsedProperties messagingProps)
+ {
+ log.debug("public static Connection createConnection(Properties messagingProps = " + messagingProps + "): called");
+
+ try
+ {
+ // Extract the configured connection properties from the test configuration.
+ String conUsername = messagingProps.getProperty(USERNAME_PROPNAME);
+ String conPassword = messagingProps.getProperty(PASSWORD_PROPNAME);
+ String virtualHost = messagingProps.getProperty(VIRTUAL_HOST_PROPNAME);
+ String brokerUrl = messagingProps.getProperty(BROKER_PROPNAME);
+
+ // Set up the broker connection url.
+ String connectionString =
+ "amqp://" + conUsername + ":" + conPassword + "/" + ((virtualHost != null) ? virtualHost : "")
+ + "?brokerlist='" + brokerUrl + "'";
+
+ // messagingProps.setProperty(CONNECTION_PROPNAME, connectionString);
+
+ Context ctx = new InitialContext(messagingProps);
+
+ ConnectionFactory cf = (ConnectionFactory) ctx.lookup(CONNECTION_NAME);
+ Connection connection = cf.createConnection();
+
+ return connection;
+ }
+ catch (NamingException e)
+ {
+ log.debug("Got NamingException: ", e);
+ throw new RuntimeException("Got JNDI NamingException whilst looking up the connection factory.", e);
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException: ", e);
+ throw new RuntimeException("Could not establish connection due to JMSException.", e);
+ }
+ }
+
+ /**
+ * Creates a publisher and a receiver on the same connection, configured according the to specified standard
+ * properties.
+ *
+ * @param messagingProps The connection properties.
+ *
+ * @return A publisher/receiver client pair.
+ */
+ public static PublisherReceiver createPublisherReceiverPairSharedConnection(ParsedProperties messagingProps)
+ {
+ try
+ {
+ int ackMode = messagingProps.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ boolean useTopics = messagingProps.getPropertyAsBoolean(PUBSUB_PROPNAME);
+ String destinationSendRoot = messagingProps.getProperty(SEND_DESTINATION_NAME_ROOT_PROPNAME);
+ String destinationReceiveRoot = messagingProps.getProperty(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME);
+ boolean createPublisherProducer = messagingProps.getPropertyAsBoolean(PUBLISHER_PRODUCER_BIND_PROPNAME);
+ boolean createPublisherConsumer = messagingProps.getPropertyAsBoolean(PUBLISHER_CONSUMER_BIND_PROPNAME);
+ boolean createReceiverProducer = messagingProps.getPropertyAsBoolean(RECEIVER_PRODUCER_BIND_PROPNAME);
+ boolean createReceiverConsumer = messagingProps.getPropertyAsBoolean(RECEIVER_CONSUMER_BIND_PROPNAME);
+ boolean transactional = messagingProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+
+ // Check if any Qpid/AMQP specific flags or options need to be set.
+ boolean immediate = messagingProps.getPropertyAsBoolean(IMMEDIATE_PROPNAME);
+ boolean mandatory = messagingProps.getPropertyAsBoolean(MANDATORY_PROPNAME);
+ boolean needsQpidOptions = immediate | mandatory;
+
+ log.debug("ackMode = " + ackMode);
+ log.debug("useTopics = " + useTopics);
+ log.debug("destinationSendRoot = " + destinationSendRoot);
+ log.debug("destinationReceiveRoot = " + destinationReceiveRoot);
+ log.debug("createPublisherProducer = " + createPublisherProducer);
+ log.debug("createPublisherConsumer = " + createPublisherConsumer);
+ log.debug("createReceiverProducer = " + createReceiverProducer);
+ log.debug("createReceiverConsumer = " + createReceiverConsumer);
+ log.debug("transactional = " + transactional);
+ log.debug("immediate = " + immediate);
+ log.debug("mandatory = " + mandatory);
+ log.debug("needsQpidOptions = " + needsQpidOptions);
+
+ // Create connection, sessions and producer/consumer pairs on each session.
+ Connection connection = createConnection(messagingProps);
+
+ // Add the connection exception listener to assert on exception conditions with.
+ ExceptionMonitor exceptionMonitor = new ExceptionMonitor();
+ connection.setExceptionListener(exceptionMonitor);
+
+ Session publisherSession = connection.createSession(transactional, ackMode);
+ Session receiverSession = connection.createSession(transactional, ackMode);
+
+ Destination publisherProducerDestination =
+ useTopics ? publisherSession.createTopic(destinationSendRoot)
+ : publisherSession.createQueue(destinationSendRoot);
+
+ MessageProducer publisherProducer =
+ createPublisherProducer
+ ? (needsQpidOptions
+ ? ((AMQSession) publisherSession).createProducer(publisherProducerDestination, mandatory, immediate)
+ : publisherSession.createProducer(publisherProducerDestination)) : null;
+
+ MessageConsumer publisherConsumer =
+ createPublisherConsumer
+ ? publisherSession.createConsumer(publisherSession.createQueue(destinationReceiveRoot)) : null;
+
+ MessageProducer receiverProducer =
+ createReceiverProducer ? receiverSession.createProducer(receiverSession.createQueue(destinationReceiveRoot))
+ : null;
+
+ MessageConsumer receiverConsumer =
+ createReceiverConsumer ? receiverSession.createConsumer(receiverSession.createQueue(destinationSendRoot))
+ : null;
+
+ // Start listening for incoming messages.
+ connection.start();
+
+ // Package everything up.
+ ProducerConsumerPair publisher =
+ new ProducerConsumerPairImpl(publisherProducer, publisherConsumer, publisherSession);
+ ProducerConsumerPair receiver =
+ new ProducerConsumerPairImpl(receiverProducer, receiverConsumer, receiverSession);
+
+ PublisherReceiver result = new PublisherReceiverImpl(publisher, receiver, connection, exceptionMonitor);
+
+ return result;
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException: ", e);
+ throw new RuntimeException("Could not create publisher/receiver pair due to a JMSException.", e);
+ }
+ }
+
+ public static Message createTestMessage(ProducerConsumerPair client, ParsedProperties testProps) throws JMSException
+ {
+ return client.getSession().createMessage();
+ }
+
+ /**
+ * A ProducerConsumerPair is a pair consisting of one message producer and one message consumer. It is a standard
+ * unit of connectivity allowing a full-duplex conversation to be held, provided both the consumer and producer
+ * are instantiated and configured.
+ *
+ * In some situations a test, or piece of application code will be written with differing numbers of publishers
+ * and receivers in different roles, where one role produces only and one consumes only. This messaging topology
+ * can still make use of producer/consumer pairs as standard building blocks, combined into publisher/receiver
+ * units to fulfill the different messaging roles, with the publishers consumer uninstantiated and the receivers
+ * producer uninstantiated. Use a {@link PublisherReceiver} for this.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Provide a message producer for sending messages.
+ * <tr><td> Provide a message consumer for receiving messages.
+ * </table>
+ *
+ * @todo Update the {@link org.apache.qpid.util.ConversationFactory} so that it accepts these as the basic
+ * conversation connection units.
+ */
+ public static interface ProducerConsumerPair
+ {
+ public MessageProducer getProducer();
+
+ public MessageConsumer getConsumer();
+
+ public void send(Message message) throws JMSException;
+
+ public Session getSession();
+
+ public void close() throws JMSException;
+ }
+
+ /**
+ * A single producer and consumer.
+ */
+ public static class ProducerConsumerPairImpl implements ProducerConsumerPair
+ {
+ MessageProducer producer;
+
+ MessageConsumer consumer;
+
+ Session session;
+
+ public ProducerConsumerPairImpl(MessageProducer producer, MessageConsumer consumer, Session session)
+ {
+ this.producer = producer;
+ this.consumer = consumer;
+ this.session = session;
+ }
+
+ public MessageProducer getProducer()
+ {
+ return null;
+ }
+
+ public MessageConsumer getConsumer()
+ {
+ return null;
+ }
+
+ public void send(Message message) throws JMSException
+ {
+ producer.send(message);
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ public void close() throws JMSException
+ {
+ if (producer != null)
+ {
+ producer.close();
+ }
+
+ if (consumer != null)
+ {
+ consumer.close();
+ }
+ }
+ }
+
+ /**
+ * Multiple producers and consumers made to look like a single producer and consumer. All methods repeated accross
+ * all producers and consumers.
+ */
+ public static class MultiProducerConsumerPairImpl implements ProducerConsumerPair
+ {
+ public MessageProducer getProducer()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public MessageConsumer getConsumer()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public void send(Message message) throws JMSException
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public Session getSession()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+
+ public void close()
+ {
+ throw new RuntimeException("Not implemented.");
+ }
+ }
+
+ /**
+ * A PublisherReceiver consists of two sets of producer/consumer pairs, one for an 'instigating' publisher
+ * role, and one for a more 'passive' receiver role.
+ *
+ * <p/>A set of publishers and receivers forms a typical test configuration where both roles are to be controlled
+ * from within a single JVM. This is not a particularly usefull arrangement for applications which want to place
+ * these roles on physically seperate machines and pass messages between them. It is a faily normal arrangement for
+ * test code though, either to publish and receive messages through an in-VM message broker in order to test its
+ * expected behaviour, or to publish and receive (possibly bounced back) messages through a seperate broker instance
+ * in order to take performance timings. In the case of performance timings, the co-location of the publisher and
+ * receiver means that the timings are taken on the same machine for accurate timing without the need for clock
+ * synchronization.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Manage an m*n array of publisher and recievers.
+ * </table>
+ */
+ public static interface PublisherReceiver
+ {
+ public ProducerConsumerPair getPublisher();
+
+ public ProducerConsumerPair getReceiver();
+
+ public void start();
+
+ public void send(ParsedProperties testProps, int numMessages);
+
+ public ExceptionMonitor getConnectionExceptionMonitor();
+
+ public ExceptionMonitor getExceptionMonitor();
+
+ public void close();
+ }
+
+ public static class PublisherReceiverImpl implements PublisherReceiver
+ {
+ private ProducerConsumerPair publisher;
+ private ProducerConsumerPair receiver;
+ private Connection connection;
+ private ExceptionMonitor connectionExceptionMonitor;
+ private ExceptionMonitor exceptionMonitor;
+
+ public PublisherReceiverImpl(ProducerConsumerPair publisher, ProducerConsumerPair receiver, Connection connection,
+ ExceptionMonitor connectionExceptionMonitor)
+ {
+ this.publisher = publisher;
+ this.receiver = receiver;
+ this.connection = connection;
+ this.connectionExceptionMonitor = connectionExceptionMonitor;
+ this.exceptionMonitor = new ExceptionMonitor();
+ }
+
+ public ProducerConsumerPair getPublisher()
+ {
+ return publisher;
+ }
+
+ public ProducerConsumerPair getReceiver()
+ {
+ return receiver;
+ }
+
+ public void start()
+ { }
+
+ public void close()
+ {
+ try
+ {
+ publisher.close();
+ receiver.close();
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ throw new RuntimeException("Got JMSException during close.", e);
+ }
+ }
+
+ public ExceptionMonitor getConnectionExceptionMonitor()
+ {
+ return connectionExceptionMonitor;
+ }
+
+ public ExceptionMonitor getExceptionMonitor()
+ {
+ return exceptionMonitor;
+ }
+
+ public void send(ParsedProperties testProps, int numMessages)
+ {
+ boolean transactional = testProps.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+
+ // Send an immediate message through the publisher and ensure that it results in a JMSException.
+ try
+ {
+ getPublisher().send(createTestMessage(getPublisher(), testProps));
+
+ if (transactional)
+ {
+ getPublisher().getSession().commit();
+ }
+ }
+ catch (JMSException e)
+ {
+ log.debug("Got JMSException: ", e);
+ exceptionMonitor.onException(e);
+ }
+ }
+
+ public static void testWithAssertions(ParsedProperties testProps, Class aClass /*, assertions */)
+ {
+ PublisherReceiver testClients;
+
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ testClients = createPublisherReceiverPairSharedConnection(testProps);
+ testClients.start();
+
+ testClients.send(testProps, 1);
+
+ pause(1000L);
+
+ String errors = "";
+
+ if (!testClients.getConnectionExceptionMonitor().assertOneJMSExceptionWithLinkedCause(aClass))
+ {
+ errors += "Was expecting linked exception type " + aClass.getName() + ".\n";
+ }
+
+ // Clean up the publisher/receiver client pair.
+ testClients.close();
+
+ assertEquals(errors, "", errors);
+ }
+
+ /**
+ */
+ public static void testNoExceptions(ParsedProperties testProps)
+ {
+ PublisherReceiver testClients;
+
+ // Create a standard publisher/receiver test client pair on a shared connection, individual sessions.
+ testClients = createPublisherReceiverPairSharedConnection(testProps);
+ testClients.start();
+
+ testClients.send(testProps, 1);
+
+ pause(1000L);
+
+ String errors = "";
+
+ if (!testClients.getConnectionExceptionMonitor().assertNoExceptions())
+ {
+ errors += "There were connection exceptions.\n";
+ }
+
+ if (!testClients.getExceptionMonitor().assertNoExceptions())
+ {
+ errors += "There were exceptions on producer.\n";
+ }
+
+ // Clean up the publisher/receiver client pair.
+ testClients.close();
+
+ assertEquals(errors, "", errors);
+ }
+ }
+
+ /**
+ * Pauses for the specified length of time. In the event of failing to pause for at least that length of time
+ * due to interuption of the thread, a RutimeException is raised to indicate the failure. The interupted status
+ * of the thread is restores in that case. This method should only be used when it is expected that the pause
+ * will be succesfull, for example in test code that relies on inejecting a pause.
+ *
+ * @param t The minimum time to pause for in milliseconds.
+ */
+ public static void pause(long t)
+ {
+ try
+ {
+ Thread.sleep(t);
+ }
+ catch (InterruptedException e)
+ {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException("Failed to generate the requested pause length.", e);
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
new file mode 100644
index 0000000000..f41acca11b
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQNoRouteException;
+import org.apache.qpid.client.transport.TransportConnection;
+import static org.apache.qpid.server.exchange.MessagingTestConfigProperties.*;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+/**
+ * MandatoryMessageTest tests for the desired behaviour of mandatory messages. Mandatory messages are a non-JMS
+ * feature. A message may be marked with a mandatory delivery flag, which means that a valid route for the message
+ * must exist, when it is sent, or when its transaction is committed in the case of transactional messaging. If this
+ * is not the case, the broker should return the message with a NO_CONSUMERS code.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Check that a mandatory message is sent succesfully not using transactions when a consumer is connected.
+ * <tr><td> Check that a mandatory message is committed succesfully in a transaction when a consumer is connected.
+ * <tr><td> Check that a mandatory message results in no route code, not using transactions, when no consumer is
+ * connected.
+ * <tr><td> Check that a mandatory message results in no route code, upon transaction commit, when a consumer is
+ * connected.
+ * </table>
+ */
+public class MandatoryMessageTest extends TestCase
+{
+ /** Used for debugging. */
+ private static final Logger log = LoggerFactory.getLogger(MandatoryMessageTest.class);
+
+ /** Used to read the tests configurable properties through. */
+ ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
+
+ /** All these tests should have the mandatory flag on. */
+ // private boolean mandatoryFlag = testProps.setProperty(IMMEDIATE_PROPNAME, true);
+ private boolean mandatoryFlag = testProps.setProperty(MANDATORY_PROPNAME, true);
+
+ /** Check that an mandatory message is sent succesfully not using transactions when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Send one message with no errors.
+ ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message is committed succesfully in a transaction when a consumer is connected. */
+ public void test_QPID_508_MandatoryOkTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Send one message with no errors.
+ ImmediateMessageTest.PublisherReceiverImpl.testNoExceptions(testProps);
+ }
+
+ /** Check that an mandatory message results in no route code, not using transactions, when no consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteNoTx() throws Exception
+ {
+ // Ensure transactional sessions are off.
+ testProps.setProperty(TRANSACTED_PROPNAME, false);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ /** Check that an mandatory message results in no route code, upon transaction commit, when a consumer is connected. */
+ public void test_QPID_508_MandatoryFailsNoRouteTx() throws Exception
+ {
+ // Ensure transactional sessions are on.
+ testProps.setProperty(TRANSACTED_PROPNAME, true);
+
+ // Set up the messaging topology so that only the publishers producer is bound (do not set up the receiver to
+ // collect its messages).
+ testProps.setProperty(RECEIVER_CONSUMER_BIND_PROPNAME, false);
+
+ // Send one message and get a linked no consumers exception.
+ ImmediateMessageTest.PublisherReceiverImpl.testWithAssertions(testProps, AMQNoRouteException.class);
+ }
+
+ protected void setUp() throws Exception
+ {
+ NDC.push(getName());
+
+ // Ensure that the in-vm broker is created.
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ // Ensure that the in-vm broker is cleaned up so that the next test starts afresh.
+ TransportConnection.killVMBroker(1);
+ ApplicationRegistry.remove(1);
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
new file mode 100644
index 0000000000..9c8cefc492
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java
@@ -0,0 +1,282 @@
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.jms.Session;
+
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * MessagingTestConfigProperties defines a set of property names and default values for specifying a messaging topology,
+ * and test parameters for running a messaging test over that topology. A Properties object holding some of these
+ * properties, superimposed onto the defaults, is used to establish test topologies and control test behaviour.
+ *
+ * <p/>A complete list of the parameters, default values and comments on their usage is provided here:
+ *
+ * <p/><table><caption>Parameters</caption>
+ * <tr><th> Parameter <th> Default <th> Comments
+ * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
+ * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
+ * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
+ * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
+ * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
+ * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
+ * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
+ * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
+ * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
+ * <tr><td> username <td> guest <td> The username to access the broker with.
+ * <tr><td> password <td> guest <td> The password to access the broker with.
+ * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
+ * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
+ * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
+ * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
+ * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
+ * 0 - SESSION_TRANSACTED
+ * 1 - AUTO_ACKNOWLEDGE
+ * 2 - CLIENT_ACKNOWLEDGE
+ * 3 - DUPS_OK_ACKNOWLEDGE
+ * 257 - NO_ACKNOWLEDGE
+ * 258 - PRE_ACKNOWLEDGE
+ * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
+ * Limits the volume of messages currently buffered on the client
+ * or broker. Can help scale test clients by limiting amount of buffered
+ * data to avoid out of memory errors.
+ * </table>
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide the names and defaults of all test parameters.
+ * </table>
+ */
+public class MessagingTestConfigProperties
+{
+ // ====================== Connection Properties ==================================
+
+ /** Holds the name of the default connection configuration. */
+ public static final String CONNECTION_NAME = "broker";
+
+ /** Holds the name of the property to get the initial context factory name from. */
+ public static final String INITIAL_CONTEXT_FACTORY_PROPNAME = "java.naming.factory.initial";
+
+ /** Defines the class to use as the initial context factory by default. */
+ public static final String INITIAL_CONTEXT_FACTORY_DEFAULT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ /** Holds the name of the default connection factory configuration property. */
+ public static final String CONNECTION_PROPNAME = "connectionfactory.broker";
+
+ /** Defeins the default connection configuration. */
+ public static final String CONNECTION_DEFAULT = "amqp://guest:guest@clientid/?brokerlist='vm://:1'";
+
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "qpid.test.broker";
+
+ /** Holds the default broker url for the test. */
+ public static final String BROKER_DEFAULT = "vm://:1";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
+
+ /** Holds the default virtual path for the test. */
+ public static final String VIRTUAL_HOST_DEFAULT = "";
+
+ /** Holds the name of the property to get the broker access username from. */
+ public static final String USERNAME_PROPNAME = "username";
+
+ /** Holds the default broker log on username. */
+ public static final String USERNAME_DEFAULT = "guest";
+
+ /** Holds the name of the property to get the broker access password from. */
+ public static final String PASSWORD_PROPNAME = "password";
+
+ /** Holds the default broker log on password. */
+ public static final String PASSWORD_DEFAULT = "guest";
+
+ // ====================== Messaging Topology Properties ==========================
+
+ /** Holds the name of the property to get the bind publisher procuder flag from. */
+ public static final String PUBLISHER_PRODUCER_BIND_PROPNAME = "publisherProducerBind";
+
+ /** Holds the default value of the publisher producer flag. */
+ public static final boolean PUBLISHER_PRODUCER_BIND_DEFAULT = true;
+
+ /** Holds the name of the property to get the bind publisher procuder flag from. */
+ public static final String PUBLISHER_CONSUMER_BIND_PROPNAME = "publisherConsumerBind";
+
+ /** Holds the default value of the publisher consumer flag. */
+ public static final boolean PUBLISHER_CONSUMER_BIND_DEFAULT = false;
+
+ /** Holds the name of the property to get the bind receiver procuder flag from. */
+ public static final String RECEIVER_PRODUCER_BIND_PROPNAME = "receiverProducerBind";
+
+ /** Holds the default value of the receiver producer flag. */
+ public static final boolean RECEIVER_PRODUCER_BIND_DEFAULT = false;
+
+ /** Holds the name of the property to get the bind receiver procuder flag from. */
+ public static final String RECEIVER_CONSUMER_BIND_PROPNAME = "receiverConsumerBind";
+
+ /** Holds the default value of the receiver consumer flag. */
+ public static final boolean RECEIVER_CONSUMER_BIND_DEFAULT = true;
+
+ /** Holds the name of the property to get the destination name root from. */
+ public static final String SEND_DESTINATION_NAME_ROOT_PROPNAME = "sendDestinationRoot";
+
+ /** Holds the root of the name of the default destination to send to. */
+ public static final String SEND_DESTINATION_NAME_ROOT_DEFAULT = "sendTo";
+
+ /** Holds the name of the property to get the destination name root from. */
+ public static final String RECEIVE_DESTINATION_NAME_ROOT_PROPNAME = "receiveDestinationRoot";
+
+ /** Holds the root of the name of the default destination to send to. */
+ public static final String RECEIVE_DESTINATION_NAME_ROOT_DEFAULT = "receiveFrom";
+
+ /** Holds the name of the proeprty to get the destination count from. */
+ public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
+
+ /** Defines the default number of destinations to ping. */
+ public static final int DESTINATION_COUNT_DEFAULT = 1;
+
+ /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
+ public static final String PUBSUB_PROPNAME = "pubsub";
+
+ /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+ public static final boolean PUBSUB_DEFAULT = false;
+
+ // ====================== JMS Options and Flags =================================
+
+ /** Holds the name of the property to get the test delivery mode from. */
+ public static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+ /** Holds the message delivery mode to use for the test. */
+ public static final boolean PERSISTENT_MODE_DEFAULT = false;
+
+ /** Holds the name of the property to get the test transactional mode from. */
+ public static final String TRANSACTED_PROPNAME = "transacted";
+
+ /** Holds the transactional mode to use for the test. */
+ public static final boolean TRANSACTED_DEFAULT = false;
+
+ /** Holds the name of the property to set the no local flag from. */
+ public static final String NO_LOCAL_PROPNAME = "noLocal";
+
+ /** Defines the default value of the no local flag to use when consuming messages. */
+ public static final boolean NO_LOCAL_DEFAULT = false;
+
+ /** Holds the name of the property to get the message acknowledgement mode from. */
+ public static final String ACK_MODE_PROPNAME = "ackMode";
+
+ /** Defines the default message acknowledgement mode. */
+ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+
+ // ====================== Qpid Options and Flags ================================
+
+ /** Holds the name of the property to set the exclusive flag from. */
+ public static final String EXCLUSIVE_PROPNAME = "exclusive";
+
+ /** Defines the default value of the exclusive flag to use when consuming messages. */
+ public static final boolean EXCLUSIVE_DEFAULT = false;
+
+ /** Holds the name of the property to set the immediate flag from. */
+ public static final String IMMEDIATE_PROPNAME = "immediate";
+
+ /** Defines the default value of the immediate flag to use when sending messages. */
+ public static final boolean IMMEDIATE_DEFAULT = false;
+
+ /** Holds the name of the property to set the mandatory flag from. */
+ public static final String MANDATORY_PROPNAME = "mandatory";
+
+ /** Defines the default value of the mandatory flag to use when sending messages. */
+ public static final boolean MANDATORY_DEFAULT = false;
+
+ /** Holds the name of the property to get the durable destinations flag from. */
+ public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+
+ /** Default value for the durable destinations flag. */
+ public static final boolean DURABLE_DESTS_DEFAULT = false;
+
+ /** Holds the name of the proeprty to set the prefetch size from. */
+ public static final String PREFECTH_PROPNAME = "prefetch";
+
+ /** Defines the default prefetch size to use when consuming messages. */
+ public static final int PREFETCH_DEFAULT = 100;
+
+ // ====================== Common Test Parameters ================================
+
+ /** Holds the name of the property to get the test message size from. */
+ public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+
+ /** Used to set up a default message size. */
+ public static final int MESSAGE_SIZE_DEAFULT = 0;
+
+ /** Holds the name of the property to get the message rate from. */
+ public static final String RATE_PROPNAME = "rate";
+
+ /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+ public static final int RATE_DEFAULT = 0;
+
+ /** Holds the name of the proeprty to get the. */
+ public static final String SELECTOR_PROPNAME = "selector";
+
+ /** Holds the default message selector. */
+ public static final String SELECTOR_DEFAULT = "";
+
+ /** Holds the name of the property to get the waiting timeout for response messages. */
+ public static final String TIMEOUT_PROPNAME = "timeout";
+
+ /** Default time to wait before assuming that a ping has timed out. */
+ public static final long TIMEOUT_DEFAULT = 30000;
+
+ /** Holds the name of the property to get the commit batch size from. */
+ public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";
+
+ /** Defines the default number of pings to send in each transaction when running transactionally. */
+ public static final int TX_BATCH_SIZE_DEFAULT = 1;
+
+ /** Holds the name of the property to set the maximum amount of pending message data for a producer to hold. */
+ public static final String MAX_PENDING_PROPNAME = "maxPending";
+
+ /** Defines the default maximum quantity of pending message data to allow producers to hold. */
+ public static final int MAX_PENDING_DEFAULT = 0;
+
+ /** Holds the name of the property to get the verbose mode proeprty from. */
+ public static final String VERBOSE_PROPNAME = "verbose";
+
+ /** Holds the default verbose mode. */
+ public static final boolean VERBOSE_DEFAULT = false;
+
+ /** Holds the default configuration properties. */
+ public static ParsedProperties defaults = new ParsedProperties();
+
+ static
+ {
+ defaults.setPropertyIfNull(INITIAL_CONTEXT_FACTORY_PROPNAME, INITIAL_CONTEXT_FACTORY_DEFAULT);
+ defaults.setPropertyIfNull(CONNECTION_PROPNAME, CONNECTION_DEFAULT);
+ defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
+ defaults.setPropertyIfNull(PUBLISHER_PRODUCER_BIND_PROPNAME, PUBLISHER_PRODUCER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(PUBLISHER_CONSUMER_BIND_PROPNAME, PUBLISHER_CONSUMER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(RECEIVER_PRODUCER_BIND_PROPNAME, RECEIVER_PRODUCER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(RECEIVER_CONSUMER_BIND_PROPNAME, RECEIVER_CONSUMER_BIND_DEFAULT);
+ defaults.setPropertyIfNull(SEND_DESTINATION_NAME_ROOT_PROPNAME, SEND_DESTINATION_NAME_ROOT_DEFAULT);
+ defaults.setPropertyIfNull(RECEIVE_DESTINATION_NAME_ROOT_PROPNAME, RECEIVE_DESTINATION_NAME_ROOT_DEFAULT);
+ defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
+ defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
+ defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
+ defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
+ defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
+ defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
+ defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
+ defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
+ defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+ defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
+ defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
+ defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+ defaults.setPropertyIfNull(PREFECTH_PROPNAME, PREFETCH_DEFAULT);
+ defaults.setPropertyIfNull(NO_LOCAL_PROPNAME, NO_LOCAL_DEFAULT);
+ defaults.setPropertyIfNull(EXCLUSIVE_PROPNAME, EXCLUSIVE_DEFAULT);
+ defaults.setPropertyIfNull(IMMEDIATE_PROPNAME, IMMEDIATE_DEFAULT);
+ defaults.setPropertyIfNull(MANDATORY_PROPNAME, MANDATORY_DEFAULT);
+ }
+}