summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-05 14:43:14 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-05 14:43:14 +0000
commit93bd9b2405e5c8d5c4493d621297cc8765785f28 (patch)
tree2263f8360596fa66dfc767acdbe74bafa0eda59f
parentc24eccc88801b77b06842aa0686b6582040630a4 (diff)
downloadqpid-python-93bd9b2405e5c8d5c4493d621297cc8765785f28.tar.gz
Revision: 503646
Author: rgreig Date: 11:28:57, 05 February 2007 Message: (Submitted by Rupert Smith) This local repository is no longer needed. JUnit-Toolkit snapshot repository is now hosted on sourceforge: http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/. A release is also in progress to the central maven repository. ---- Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo Revision: 503637 Author: rgreig Date: 11:17:08, 05 February 2007 Message: (Submitted by Rupert Smith) Junit-toolkit has now fully migrated onto sourceforge. Snapshot repository location updated. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/pom.xml Revision: 503609 Author: ritchiem Date: 09:49:59, 05 February 2007 Message: Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Revision: 503604 Author: rgreig Date: 09:40:04, 05 February 2007 Message: QPID-326 : Patch supplied by Rob Godfrey - add oldest message on queue notification, and log notifications in log file ---- Modified : /incubator/qpid/trunk/qpid/java/broker/etc/virtualhosts.xml Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Added : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java Modified : /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Revision: 503593 Author: ritchiem Date: 08:58:30, 05 February 2007 Message: Fixed bug in stop(). If a connection is opened not start()ed then closed a NullPointerException will be thrown as the Dispatcher has not been created. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Revision: 502655 Author: rgreig Date: 16:59:14, 02 February 2007 Message: (Submitted by Rupert Smith) Options moved to top of contructor. Were at bottom and not being used! ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Revision: 502627 Author: rgreig Date: 15:31:30, 02 February 2007 Message: (Submitted by Rupert Smith) Fixed problem with losing message results. Was not passing in self generated message correlation id in the async test, to match up replies with. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Revision: 502620 Author: rgreig Date: 15:09:08, 02 February 2007 Message: (Submitted by Rupert Smith) Perftests improved with better timeout handling. Shared/unique destinations to ping now an option. TestRunner now runs all per-thread setups, synchs all threads, then runs tests, synchas all threads, then runs tear downs. ---- Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.md5 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.jar.sha1 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.md5 Deleted : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070131.112634-1.pom.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 Added : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 Modified : /incubator/qpid/trunk/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java Revision: 502610 Author: bhupendrab Date: 14:26:32, 02 February 2007 Message: QPID-84 tests for FSContextFactory deleted.fscontext.jar is not part of apache svn. ---- Modified : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java Deleted : /incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest Revision: 502576 Author: ritchiem Date: 11:13:13, 02 February 2007 Message: QPID-343 Performance test suite doesn't output missing message count on failure. Updated PingAsyncTestPerf to output missing messsage count. Updated PingPongProducer so it doesn't use AMQShortStringx. ---- Modified : /incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Modified : /incubator/qpid/trunk/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@503703 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/AppliedPatches.txt15
-rw-r--r--qpid/java/broker/etc/log4j.xml4
-rw-r--r--qpid/java/broker/etc/virtualhosts.xml77
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java197
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java119
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java50
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java357
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java135
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java23
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java264
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java250
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java104
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java167
-rw-r--r--qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java173
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java2
-rw-r--r--qpid/java/mvn-repo/README.txt5
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jarbin9768 -> 0 bytes
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom91
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml12
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml12
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jarbin73370 -> 0 bytes
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom111
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml12
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha11
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml9
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md51
-rw-r--r--qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha11
-rw-r--r--qpid/java/perftests/pom.xml168
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java17
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java190
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java16
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java15
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java2
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java6
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java1
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java54
57 files changed, 675 insertions, 2130 deletions
diff --git a/qpid/java/AppliedPatches.txt b/qpid/java/AppliedPatches.txt
index d60c8b1c75..6967a594a9 100644
--- a/qpid/java/AppliedPatches.txt
+++ b/qpid/java/AppliedPatches.txt
@@ -3,7 +3,7 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100
- Hard coded to return 0
-Latest Revision-
-502576.499979
+503646,502576.499979
-Not Done-
502610 QPID-84 UPdate PropertiesFileInitialContextFactoryTest Remove referencetest
@@ -18,7 +18,6 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100
501008 QPID-324 change sending of destination to byte representing T/Q/Other
501007 QPID-322 Test may habg isntead of fail of messages doesn't get through
501005 QPID-324 change sending of destination to byte representing T/Q/Other
-501003 QPID-320 Imporve performance by remembering protocol version
500310 Patch to TransactedTest to clean up messages
500264 Change to Transacted Test
499874 QPID-319 management console update (QPID-50)
@@ -64,8 +63,19 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100
-PARTIAL-
494650 QPID-268 Improvements to performance of generated code
+501003 QPID-320 Imporve performance by remembering protocol version
+ * DeliveryManager Queue data size counting.
-Done-
+503646 local repository removed
+503637 Junit-toolkit update
+503609 Updated to performacen test to allow the use of shared destinations.
+503604 QPID-326 add oldest Message on queue Notification and log notification in log file
+503593 Fixed bug in session . stop() where it would NPE on close if not start()ed
+502655 Moved option sets to top of constructor
+502627 Fixed problem with losing message results.
+502620 Improved timeout handling
+502576 QPID-343 Performance test suite doesn't output missing message count on failure
502627 Fixed problem with losing message results
502620 Performance test improvement with better timeout handing
502576 QPID-343 performacne test suite doesn't output missing message count
@@ -89,7 +99,6 @@ Update includes local fix for AMQQueueMBeanTest.getQueueDepth resolved by r50100
501010 QPID-322 Message reference count not incremented when added to unackMap.
501004 QPID-320 Simplify MessageListener setup
500284 Updated script details and added guard for traffic light being null
--
499979 QPID-315 Test classes to reproduce missing correlation id
499975 QPID-315 Moved message converstion to MessageConverter.
499781 Fixed race that caused duplicate date in log file
diff --git a/qpid/java/broker/etc/log4j.xml b/qpid/java/broker/etc/log4j.xml
index a14ac8459b..28a572eac9 100644
--- a/qpid/java/broker/etc/log4j.xml
+++ b/qpid/java/broker/etc/log4j.xml
@@ -42,6 +42,10 @@
<priority value="info"/>
</category>
+ <category name="org.apache.qpid.server.queue.AMQQueueMBean">
+ <priority value="info"/>
+ </category>
+
<category name="org.apache.qpid">
<priority value="warn"/>
</category>
diff --git a/qpid/java/broker/etc/virtualhosts.xml b/qpid/java/broker/etc/virtualhosts.xml
index de6a8c0682..2a573535de 100644
--- a/qpid/java/broker/etc/virtualhosts.xml
+++ b/qpid/java/broker/etc/virtualhosts.xml
@@ -21,8 +21,79 @@
-->
<virtualhosts>
<virtualhost>
- <path>/development</path>
- <bind>direct://amq.direct//queue</bind>
- <bind>direct://amq.direct//ping</bind>
+ <name>localhost</name>
+
+ <localhost>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </localhost>
+ </virtualhost>
+ <virtualhost>
+ <name>development</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <development>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <test>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </test>
</virtualhost>
</virtualhosts>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 9ecbf3d31a..a433351509 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -32,9 +32,13 @@ import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.CompositeConfiguration;
import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
public class VirtualHostConfiguration
{
@@ -42,11 +46,7 @@ public class VirtualHostConfiguration
XMLConfiguration _config;
- private static final String XML_VIRTUALHOST = "virtualhost";
- private static final String XML_PATH = "path";
- private static final String XML_BIND = "bind";
- private static final String XML_VIRTUALHOST_PATH = "virtualhost.path";
- private static final String XML_VIRTUALHOST_BIND = "virtualhost.bind";
+ private static final String VIRTUALHOST_PROPERTY_BASE = "virtualhost.";
public VirtualHostConfiguration(String configFile) throws ConfigurationException
@@ -55,135 +55,58 @@ public class VirtualHostConfiguration
_config = new XMLConfiguration(configFile);
- if (_config.getProperty(XML_VIRTUALHOST_PATH) == null)
- {
- throw new ConfigurationException(
- "Virtualhost Configuration document does not contain a valid virtualhost.");
- }
}
- public void performBindings() throws AMQException, ConfigurationException, URLSyntaxException
- {
- Object prop = _config.getProperty(XML_VIRTUALHOST_PATH);
- if (prop instanceof Collection)
- {
- _logger.debug("Number of VirtualHosts: " + ((Collection) prop).size());
- int virtualhosts = ((Collection) prop).size();
- for (int vhost = 0; vhost < virtualhosts; vhost++)
- {
- loadVirtualHost(vhost);
- }
- }
- else
- {
- loadVirtualHost(-1);
- }
- }
-
- private void loadVirtualHost(int index) throws AMQException, ConfigurationException, URLSyntaxException
+ private void configureVirtualHost(String virtualHostName, Configuration configuration) throws ConfigurationException, AMQException
{
- String path = XML_VIRTUALHOST;
+ _logger.debug("Loding configuration for virtaulhost: "+virtualHostName);
- if (index != -1)
+ if(virtualHostName == null)
{
- path = path + "(" + index + ")";
- }
-
- Object prop = _config.getProperty(path + "." + XML_PATH);
-
- if (prop == null)
- {
- prop = _config.getProperty(path + "." + XML_BIND);
- String error = "Virtual Host not defined for binding";
-
- if (prop != null)
- {
- if (prop instanceof Collection)
- {
- error += "s";
- }
-
- error += ": " + prop;
- }
-
- throw new ConfigurationException(error);
+ throw new ConfigurationException("Unknown virtual host: " + virtualHostName);
}
- _logger.info("VirtualHost:'" + prop + "'");
+ List queueNames = configuration.getList("queue.name");
- prop = _config.getProperty(path + "." + XML_BIND);
- if (prop instanceof Collection)
+ for(Object queueNameObj : queueNames)
{
- int bindings = ((Collection) prop).size();
- _logger.debug("Number of Bindings: " + bindings);
- for (int dest = 0; dest < bindings; dest++)
- {
- loadBinding(path, dest);
- }
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(queueName, configuration);
}
- else
- {
- loadBinding(path, -1);
- }
- }
- private void loadBinding(String rootpath, int index) throws AMQException, ConfigurationException, URLSyntaxException
- {
- String path = rootpath + "." + XML_BIND;
- if (index != -1)
- {
- path = path + "(" + index + ")";
- }
-
- String bindingString = _config.getString(path);
-
- AMQBindingURL binding = new AMQBindingURL(bindingString);
-
- _logger.debug("Loaded Binding:" + binding);
-
- try
- {
- bind(binding);
- }
- catch (AMQException amqe)
- {
- _logger.info("Unable to bind url: " + binding);
- throw amqe;
- }
}
- private void bind(AMQBindingURL binding) throws AMQException, ConfigurationException
+ private void configureQueue(String queueName, Configuration configuration) throws AMQException, ConfigurationException
{
+ CompositeConfiguration queueConfiguration = new CompositeConfiguration();
- String queueName = binding.getQueueName();
-
- // This will occur if the URL is a Topic
- if (queueName == null)
- {
- //todo register valid topic
- ///queueName = binding.getDestinationName();
- throw new AMQException("Topics cannot be bound. TODO Register valid topic");
- }
+ queueConfiguration.addConfiguration(configuration.subset("queue."+ queueName));
+ queueConfiguration.addConfiguration(configuration);
- //Get references to Broker Registries
QueueRegistry queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
MessageStore messageStore = ApplicationRegistry.getInstance().getMessageStore();
ExchangeRegistry exchangeRegistry = ApplicationRegistry.getInstance().getExchangeRegistry();
+ AMQQueue queue;
+
synchronized (queueRegistry)
{
- AMQQueue queue = queueRegistry.getQueue(queueName);
+ queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- _logger.info("Queue '" + binding.getQueueName() + "' does not exists. Creating.");
+ _logger.info("Creating queue '" + queueName + "' [on virtual host ]" );
+
+ boolean durable = queueConfiguration.getBoolean("durable" ,false);
+ boolean autodelete = queueConfiguration.getBoolean("autodelete", false);
+ String owner = queueConfiguration.getString("owner", null);
queue = new AMQQueue(queueName,
- Boolean.parseBoolean(binding.getOption(AMQBindingURL.OPTION_DURABLE)),
- null /* These queues will have no owner */,
- false /* Therefore autodelete makes no sence */, queueRegistry);
+ durable,
+ owner == null ? null : owner /* These queues will have no owner */,
+ autodelete /* Therefore autodelete makes no sence */, queueRegistry);
if (queue.isDurable())
{
@@ -194,27 +117,67 @@ public class VirtualHostConfiguration
}
else
{
- _logger.info("Queue '" + binding.getQueueName() + "' already exists not creating.");
+ _logger.info("Queue '" + queueName + "' already exists [on virtual host ], not creating.");
}
- Exchange defaultExchange = exchangeRegistry.getExchange(binding.getExchangeName());
- synchronized (defaultExchange)
+ String exchangeName = queueConfiguration.getString("exchange", null);
+
+ Exchange exchange = exchangeRegistry.getExchange(exchangeName == null ? null : exchangeName);
+
+ if(exchange == null)
{
- if (defaultExchange == null)
- {
- throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + binding);
- }
+ exchange = exchangeRegistry.getDefaultExchange();
+ }
- defaultExchange.registerQueue(queue.getName(), queue, null);
+ if (exchange == null)
+ {
+ throw new ConfigurationException("Attempt to bind queue to unknown exchange:" + exchangeName);
+ }
- if (binding.getRoutingKey() == null || binding.getRoutingKey().equals(""))
+ synchronized (exchange)
+ {
+ List routingKeys = queueConfiguration.getList("routingKey");
+ if(routingKeys == null || routingKeys.isEmpty())
{
- throw new ConfigurationException("Unknown binding not specified on url:" + binding);
+ routingKeys = Collections.singletonList(queue.getName());
}
- queue.bind(binding.getRoutingKey(), defaultExchange);
+ for(Object routingKey : routingKeys)
+ {
+ exchange.registerQueue((String)routingKey, queue, null);
+
+ queue.bind((String)routingKey, exchange);
+
+ _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + exchangeName + " RK:'" + routingKey + "'");
+ }
}
- _logger.info("Queue '" + queue.getName() + "' bound to exchange:" + binding.getExchangeName() + " RK:'" + binding.getRoutingKey() + "'");
+
}
+
+
+ Configurator.configure(queue);//, queueConfiguration);
}
+
+
+ public void performBindings() throws AMQException, ConfigurationException
+ {
+ List virtualHostNames = _config.getList(VIRTUALHOST_PROPERTY_BASE + "name");
+
+ _logger.info("Configuring " + virtualHostNames == null ? 0 : virtualHostNames.size() + " virtual hosts: " + virtualHostNames);
+
+ for(Object nameObject : virtualHostNames)
+ {
+ String name = String.valueOf(nameObject);
+ configureVirtualHost(name, _config.subset(VIRTUALHOST_PROPERTY_BASE + name));
+ }
+
+ if (virtualHostNames == null || virtualHostNames.isEmpty())
+ {
+ throw new ConfigurationException(
+ "Virtualhost Configuration document does not contain a valid virtualhost.");
+ }
+ }
+
+
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index eb9d1acb59..92dc0fa43e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.exchange;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -37,6 +37,8 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
+ private Exchange _defaultExchange;
+
public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
@@ -52,9 +54,23 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void registerExchange(Exchange exchange)
{
+ if(_defaultExchange == null)
+ {
+ setDefaultExchange(exchange);
+ }
_exchangeMap.put(exchange.getName(), exchange);
}
+ public void setDefaultExchange(Exchange exchange)
+ {
+ _defaultExchange = exchange;
+ }
+
+ public Exchange getDefaultExchange()
+ {
+ return _defaultExchange;
+ }
+
public void unregisterExchange(String name, boolean inUse) throws AMQException
{
// TODO: check inUse argument
@@ -71,7 +87,16 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public Exchange getExchange(String name)
{
- return _exchangeMap.get(name);
+
+ if(name == null || name.length() == 0)
+ {
+ return _defaultExchange;
+ }
+ else
+ {
+ return _exchangeMap.get(name);
+ }
+
}
/**
@@ -82,14 +107,15 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
public void routeContent(AMQMessage payload) throws AMQException
{
final String exchange = payload.getPublishBody().exchange;
- final Exchange exch = _exchangeMap.get(exchange);
+ final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be deleted in between
- // the JmsPublish being received (where the exchange is validated) and the final
+ // the BasicPublish being received (where the exchange is validated) and the final
// content body being received (which triggers this method)
+ // TODO: check where the exchange is validated
if (exch == null)
{
throw new AMQException("Exchange '" + exchange + "' does not exist");
}
exch.route(payload);
}
-}
+} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index 4a0a6a0ee1..1d32ee17a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -37,4 +37,6 @@ public interface ExchangeRegistry extends MessageRouter
void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException;
Exchange getExchange(String name);
+
+ Exchange getDefaultExchange();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afe4ea95b9..8603113c11 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.txn.TxnBuffer;
import org.apache.qpid.server.message.MessageDecorator;
import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class AMQMessage
{
+ private static final Logger _log = Logger.getLogger(AMQMessage.class);
+
public static final String JMS_MESSAGE = "jms.message";
private final Set<Object> _tokens = new HashSet<Object>();
@@ -61,6 +64,8 @@ public class AMQMessage
private final AtomicInteger _referenceCount = new AtomicInteger(1);
+ private long _arrivalTime;
+
/**
* Keeps a track of how many bytes we have received in body frames
*/
@@ -157,20 +162,20 @@ public class AMQMessage
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
-
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
- (byte)8, (byte)0, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- getExchangeName(), // exchange
- _redelivered, // redelivered
- getRoutingKey() // routingKey
- );
+ (byte) 8, (byte) 0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
@@ -201,6 +206,8 @@ public class AMQMessage
public void setContentHeaderBody(ContentHeaderBody contentHeaderBody) throws AMQException
{
_contentHeaderBody = contentHeaderBody;
+ _arrivalTime = System.currentTimeMillis();
+
if (_storeWhenComplete && isAllContentReceived())
{
storeMessage();
@@ -223,6 +230,7 @@ public class AMQMessage
_bodyLengthReceived += contentBody.getSize();
if (_storeWhenComplete && isAllContentReceived())
{
+ _arrivalTime = System.currentTimeMillis();
storeMessage();
}
}
@@ -263,6 +271,12 @@ public class AMQMessage
_redelivered = redelivered;
}
+
+ public long getArrivalTime()
+ {
+ return _arrivalTime;
+ }
+
public long getMessageId()
{
return _messageId;
@@ -299,6 +313,7 @@ public class AMQMessage
throw new MessageCleanupException(_messageId, e);
}
}
+
}
public void setPublisher(AMQProtocolSession publisher)
@@ -367,11 +382,17 @@ public class AMQMessage
return _txnBuffer;
}
+ public long getSize()
+ {
+ return getContentHeaderBody().bodySize;
+ }
+
/**
* Called to enforce the 'immediate' flag.
+ *
* @throws NoConsumersException if the message is marked for
- * immediate delivery but has not been marked as delivered to a
- * consumer
+ * immediate delivery but has not been marked as delivered to a
+ * consumer
*/
public void checkDeliveredToConsumer() throws NoConsumersException
{
@@ -393,7 +414,8 @@ public class AMQMessage
/**
* Called selectors to determin if the message has already been sent
- * @return _deliveredToConsumer
+ *
+ * @return _deliveredToConsumer
*/
public boolean getDeliveredToConsumer()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 101a2833a0..d8bacc8c7d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
@@ -90,22 +91,36 @@ public class AMQQueue implements Managable, Comparable
/**
* max allowed size(KB) of a single message
*/
- private long _maxMessageSize = 10000;
+ private long _maximumMessageSize = 10000;
/**
* max allowed number of messages on a queue.
*/
- private Integer _maxMessageCount = 10000;
+ @Configured(path = "maximumMessageCount", defaultValue = "0")
+ public int _maximumMessageCount;
/**
- * max queue depth(KB) for the queue
+ * max queue depth for the queue
*/
- private long _maxQueueDepth = 10000000;
+ @Configured(path = "maximumQueueDepth", defaultValue = "0")
+ public long _maximumQueueDepth = 10000000;
+
+ /*
+ * maximum message age before alerts occur
+ */
+ @Configured(path = "maximumMessageAge", defaultValue = "0")
+ public long _maximumMessageAge = 30000; //0
+
+ /*
+ * the minimum interval between sending out consequetive alerts of the same type
+ */
+ @Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
+ public long _minimumAlertRepeatGap = 30000;
/**
* total messages received by the queue since startup.
*/
- private long _totalMessagesReceived = 0;
+ public long _totalMessagesReceived = 0;
public int compareTo(Object o)
{
@@ -183,35 +198,13 @@ public class AMQQueue implements Managable, Comparable
_autoDelete = autoDelete;
_queueRegistry = queueRegistry;
_asyncDelivery = asyncDelivery;
+
_managedObject = createMBean();
_managedObject.register();
+
_subscribers = subscribers;
_subscriptionFactory = subscriptionFactory;
-
- //fixme - Make this configurable via the broker config.xml
- if (System.getProperties().getProperty("deliverymanager") != null)
- {
- if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
- {
- _logger.info("Using ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
- else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
- {
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
- }
- else
- {
- _logger.info("Using SynchronizedDeliveryManager");
- _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
- }
- }
- else
- {
- _logger.info("Using Default DeliveryManager: ConcurrentSelectorDeliveryManager");
- _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
- }
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
}
private AMQQueueMBean createMBean() throws AMQException
@@ -267,6 +260,11 @@ public class AMQQueue implements Managable, Comparable
return _deliveryMgr.getMessages();
}
+ public long getQueueDepth()
+ {
+ return _deliveryMgr.getTotalMessageSize();
+ }
+
/**
* @param messageId
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
@@ -295,50 +293,55 @@ public class AMQQueue implements Managable, Comparable
return _managedObject;
}
- public Long getMaximumMessageSize()
+ public long getMaximumMessageSize()
{
- return _maxMessageSize;
+ return _maximumMessageSize;
}
- public void setMaximumMessageSize(Long value)
+ public void setMaximumMessageSize(long value)
{
- _maxMessageSize = value;
+ _maximumMessageSize = value;
}
- public Integer getConsumerCount()
+ public int getConsumerCount()
{
return _subscribers.size();
}
- public Integer getActiveConsumerCount()
+ public int getActiveConsumerCount()
{
return _subscribers.getWeight();
}
- public Long getReceivedMessageCount()
+ public long getReceivedMessageCount()
{
return _totalMessagesReceived;
}
- public Integer getMaximumMessageCount()
+ public int getMaximumMessageCount()
{
- return _maxMessageCount;
+ return _maximumMessageCount;
}
- public void setMaximumMessageCount(Integer value)
+ public void setMaximumMessageCount(int value)
{
- _maxMessageCount = value;
+ _maximumMessageCount = value;
}
- public Long getMaximumQueueDepth()
+ public long getMaximumQueueDepth()
{
- return _maxQueueDepth;
+ return _maximumQueueDepth;
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(Long value)
+ public void setMaximumQueueDepth(long value)
{
- _maxQueueDepth = value;
+ _maximumQueueDepth = value;
+ }
+
+ public long getOldestMessageArrivalTime()
+ {
+ return _deliveryMgr.getOldestMessageArrival();
}
/**
@@ -374,11 +377,11 @@ public class AMQQueue implements Managable, Comparable
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
- if(subscription.hasFilters())
+ if (subscription.hasFilters())
{
if (_deliveryMgr.hasQueuedMessages())
{
- _deliveryMgr.populatePreDeliveryQueue(subscription);
+ _deliveryMgr.populatePreDeliveryQueue(subscription);
}
}
@@ -551,6 +554,27 @@ public class AMQQueue implements Managable, Comparable
}
}
+ public long getMinimumAlertRepeatGap()
+ {
+ return _minimumAlertRepeatGap;
+ }
+
+ public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ {
+ _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ }
+
+ public long getMaximumMessageAge()
+ {
+ return _maximumMessageAge;
+ }
+
+ public void setMaximumMessageAge(long maximumMessageAge)
+ {
+ _maximumMessageAge = maximumMessageAge;
+ }
+
+
private class Deliver implements TxnOp
{
private final AMQMessage _msg;
@@ -591,4 +615,5 @@ public class AMQQueue implements Managable, Comparable
}
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index a914975e00..f5ecf6ba55 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
import javax.management.openmbean.*;
import javax.management.JMException;
@@ -41,8 +42,11 @@ import java.util.ArrayList;
* for an AMQQueue.
*/
@MBeanDescription("Management Interface for AMQQueue")
-public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
+public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
+
+ private static final Logger _logger = Logger.getLogger(AMQQueueMBean.class);
+
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -51,12 +55,14 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
private static OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
private static CompositeType _messageDataType = null; // Composite type for representing AMQ Message data.
private static TabularType _messagelistDataType = null; // Datatype for representing AMQ messages list.
-
+
// OpenMBean data types for viewMessageContent method
private static CompositeType _msgContentType = null;
private final static String[] _msgContentAttributes = {"AMQ MessageId", "MimeType", "Encoding", "Content"};
private static OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
+
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
@MBeanConstructor("Creates an MBean exposing an AMQQueue")
public AMQQueueMBean(AMQQueue queue) throws JMException
{
@@ -71,7 +77,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
{
init();
}
- catch(JMException ex)
+ catch (JMException ex)
{
// It should never occur
System.out.println(ex.getMessage());
@@ -88,7 +94,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
_msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
- _msgContentAttributes, _msgContentAttributeTypes);
+ _msgContentAttributes, _msgContentAttributeTypes);
_msgAttributeTypes[0] = SimpleType.LONG; // For message id
_msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING); // For header attributes
@@ -215,35 +221,31 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
*/
public void checkForNotification(AMQMessage msg)
{
- // Check for threshold message count
- Integer msgCount = getMessageCount();
- if (msgCount >= getMaximumMessageCount())
- {
- notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
- }
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap();
- // Check for threshold message size
- long messageSize = getMessageSize(msg);
- if (messageSize >= _queue.getMaximumMessageSize())
+ for (NotificationCheck check : NotificationCheck.values())
{
- notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
- }
-
- // Check for threshold queue depth in bytes
- long queueDepth = getQueueDepth();
- if (queueDepth >= _queue.getMaximumQueueDepth())
- {
- notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
+ if (check.isMessageSpecific() || _lastNotificationTimes[check.ordinal()] < thresholdTime)
+ {
+ if (check.notifyIfNecessary(msg, _queue, this))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
}
}
/**
* Sends the notification to the listeners
*/
- private void notifyClients(String notificationMsg)
+ public void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg)
{
+ // important : add log to the log file - monitoring tools may be looking for this
+ _logger.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+
Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
- ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
+ ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
_broadcaster.sendNotification(n);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
deleted file mode 100644
index 022d3b9635..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentDeliveryManager.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.server.configuration.Configurator;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-/**
- * Manages delivery of messages on behalf of a queue
- */
-public class ConcurrentDeliveryManager implements DeliveryManager
-{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
-
- @Configured(path = "advanced.compressBufferOnQueue",
- defaultValue = "false")
- public boolean compressBufferOnQueue;
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
- //private int _messageCount;
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
- private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
- private final SubscriptionManager _subscriptions;
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
- */
- private final AMQQueue _queue;
-
-
- /**
- * Lock used to ensure that an channel that becomes unsuspended during the start of the queueing process is forced
- * to wait till the first message is added to the queue. This will ensure that the _queue has messages to be delivered
- * via the async thread.
- * <p/>
- * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
- */
- private ReentrantLock _lock = new ReentrantLock();
-
-
- ConcurrentDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
- {
-
- //Set values from configuration
- Configurator.configure(this);
-
- if (compressBufferOnQueue)
- {
- _log.info("Compressing Buffers on queue.");
- }
-
- _subscriptions = subscriptions;
- _queue = queue;
- }
-
- /**
- * @return boolean if we are queueing
- */
- private boolean queueing()
- {
- return hasQueuedMessages();
- }
-
-
- /**
- * @param msg to enqueue
- * @return true if we are queue this message
- */
- private boolean enqueue(AMQMessage msg)
- {
- if (msg.isImmediate())
- {
- return false;
- }
- else
- {
- _lock.lock();
- try
- {
- if (queueing())
- {
- return addMessageToQueue(msg);
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
-
- private void startQueueing(AMQMessage msg)
- {
- if (!msg.isImmediate())
- {
- addMessageToQueue(msg);
- }
- }
-
- private boolean addMessageToQueue(AMQMessage msg)
- {
- // Shrink the ContentBodies to their actual size to save memory.
- if (compressBufferOnQueue)
- {
- Iterator it = msg.getContentBodies().iterator();
- while (it.hasNext())
- {
- ContentBody cb = (ContentBody) it.next();
- cb.reduceBufferToFit();
- }
- }
-
- _messages.offer(msg);
-
- return true;
- }
-
-
- public boolean hasQueuedMessages()
- {
-
- _lock.lock();
- try
- {
- return !_messages.isEmpty();
- }
- finally
- {
- _lock.unlock();
- }
-
-
- }
-
- public int getQueueMessageCount()
- {
- return getMessageCount();
- }
-
- /**
- * This is an EXPENSIVE opperation to perform with a ConcurrentLinkedQueue as it must run the queue to determine size.
- * The ConcurrentLinkedQueueAtomicSize uses an AtomicInteger to record the number of elements on the queue.
- *
- * @return int the number of messages in the delivery queue.
- */
- private int getMessageCount()
- {
- return _messages.size();
- }
-
-
- public synchronized List<AMQMessage> getMessages()
- {
- return new ArrayList<AMQMessage>(_messages);
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- //no-op . This DM has no PreDeliveryQueues
- }
-
- public synchronized void removeAMessageFromTop() throws AMQException
- {
- AMQMessage msg = poll();
- if (msg != null)
- {
- msg.dequeue(_queue);
- }
- }
-
- public synchronized void clearAllMessages() throws AMQException
- {
- AMQMessage msg = poll();
- while (msg != null)
- {
- msg.dequeue(_queue);
- msg = poll();
- }
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
- */
- private void processQueue()
- {
- try
- {
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- AMQMessage message = peek();
-
- //While we have messages to send and subscribers to send them to.
- while (message != null && hasSubscribers)
- {
- // _log.debug("Have messages(" + _messages.size() + ") and subscribers");
- Subscription next = _subscriptions.nextSubscriber(message);
- //FIXME Is there still not the chance that this subscribe could be suspended between here and the send?
-
- //We don't synchronize access to subscribers so need to re-check
- if (next != null)
- {
- next.send(message, _queue);
- poll();
- message = peek();
- }
- else
- {
- hasSubscribers = false;
- }
- }
- }
- catch (FailedDequeueException e)
- {
- _log.error("Unable to deliver message as dequeue failed: " + e, e);
- }
- finally
- {
- _log.debug("End of processQueue: (" + getQueueMessageCount() + ")" + " subscribers:" + _subscriptions.hasActiveSubscribers());
- }
- }
-
- private AMQMessage peek()
- {
- return _messages.peek();
- }
-
- private AMQMessage poll()
- {
- return _messages.poll();
- }
-
- Runner asyncDelivery = new Runner();
-
- public void processAsync(Executor executor)
- {
- _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
-
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- // Do we need this?
- // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
- //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
- {
- executor.execute(asyncDelivery);
- }
- }
- }
- }
-
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
- {
- // first check whether we are queueing, and enqueue if we are
- if (!enqueue(msg))
- {
- // not queueing so deliver message to 'next' subscriber
- _lock.lock();
- try
- {
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- if (!msg.isImmediate())
- {
- // no subscribers yet so enter 'queueing' mode and queue this message
- startQueueing(msg);
- }
- }
- else
- {
- s.send(msg, _queue);
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
-
- private class Runner implements Runnable
- {
- public void run()
- {
- boolean running = true;
- while (running)
- {
- processQueue();
-
- //Check that messages have not been added since we did our last peek();
- // Synchronize with the thread that adds to the queue.
- // If the queue is still empty then we can exit
- _lock.lock();
- try
- {
- if (!(hasQueuedMessages() && _subscriptions.hasActiveSubscribers()))
- {
- running = false;
- _processing.set(false);
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
- }
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index f09e8213b1..9efeb8351c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -34,6 +34,7 @@ import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
@@ -76,7 +77,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
* Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
*/
private ReentrantLock _lock = new ReentrantLock();
-
+ private AtomicLong _totalMessageSize = new AtomicLong();
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
{
@@ -109,6 +110,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_messages.offer(msg);
+ _totalMessageSize.addAndGet(msg.getSize());
+
return true;
}
@@ -142,6 +145,17 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return _messages.size();
}
+ public long getTotalMessageSize()
+ {
+ return _totalMessageSize.get();
+ }
+
+ public long getOldestMessageArrival()
+ {
+ AMQMessage msg = _messages.peek();
+ return msg == null ? Long.MAX_VALUE : msg.getArrivalTime();
+ }
+
public synchronized List<AMQMessage> getMessages()
{
@@ -173,6 +187,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.addAndGet(-msg.getSize());
}
}
@@ -182,6 +197,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
while (msg != null)
{
msg.dequeue(_queue);
+ _totalMessageSize.set(0L);
msg = poll();
}
}
@@ -222,6 +238,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//remove sent message from our queue.
messageQueue.poll();
+ _totalMessageSize.addAndGet(-message.getSize());
}
catch (FailedDequeueException e)
{
@@ -308,7 +325,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//Pre Deliver to all subscriptions
if (_log.isDebugEnabled())
{
- _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
" subscribers to give the message to.");
}
for (Subscription sub : _subscriptions.getSubscriptions())
@@ -330,7 +347,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
{
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
}
sub.enqueueForPreDelivery(msg);
@@ -345,7 +362,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (_log.isDebugEnabled())
{
- _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
System.identityHashCode(s) + ") :" + s);
}
//Deliver the message
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index cac499587f..28386dfa45 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -64,7 +64,8 @@ interface DeliveryManager
*
* @param name the name of the entity on whose behalf we are delivering the message
* @param msg the message to deliver
- * @throws org.apache.qpid.server.queue.FailedDequeueException if the message could not be dequeued
+ * @throws org.apache.qpid.server.queue.FailedDequeueException
+ * if the message could not be dequeued
*/
void deliver(String name, AMQMessage msg) throws FailedDequeueException;
@@ -75,4 +76,8 @@ interface DeliveryManager
List<AMQMessage> getMessages();
void populatePreDeliveryQueue(Subscription subscription);
+
+ long getTotalMessageSize();
+
+ long getOldestMessageArrival();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
new file mode 100644
index 0000000000..8e9b3804f2
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -0,0 +1,135 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount = queue.getMessageCount();
+ final int maximumMessageCount = queue.getMaximumMessageCount();
+ if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount)
+ {
+ listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getMaximumMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+// try
+// {
+ messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+// }
+// catch (AMQException e)
+// {
+// messageSize = 0;
+// }
+
+
+ if (messageSize >= maximumMessageSize)
+ {
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getMaximumQueueDepth();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepth();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getMaximumMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
new file mode 100644
index 0000000000..bd9d7f6b11
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueNotificationListener.java
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public interface QueueNotificationListener
+{
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
deleted file mode 100644
index c967ea2cde..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.log4j.Logger;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Manages delivery of messages on behalf of a queue
- */
-class SynchronizedDeliveryManager implements DeliveryManager
-{
- private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
-
- /**
- * Holds any queued messages
- */
- private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
- /**
- * Ensures that only one asynchronous task is running for this manager at
- * any time.
- */
- private final AtomicBoolean _processing = new AtomicBoolean();
- /**
- * The subscriptions on the queue to whom messages are delivered
- */
- private final SubscriptionManager _subscriptions;
-
- /**
- * An indication of the mode we are in. If this is true then messages are
- * being queued up in _messages for asynchronous delivery. If it is false
- * then messages can be delivered directly as they come in.
- */
- private volatile boolean _queueing;
-
- /**
- * A reference to the queue we are delivering messages for. We need this to be able
- * to pass the code that handles acknowledgements a handle on the queue.
- */
- private final AMQQueue _queue;
-
- SynchronizedDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
- {
- _subscriptions = subscriptions;
- _queue = queue;
- }
-
- private synchronized boolean enqueue(AMQMessage msg)
- {
- if (msg.isImmediate())
- {
- return false;
- }
- else
- {
- if (_queueing)
- {
- _messages.offer(msg);
- return true;
- }
- else
- {
- return false;
- }
- }
- }
-
- private synchronized void startQueueing(AMQMessage msg)
- {
- _queueing = true;
- enqueue(msg);
- }
-
- /**
- * Determines whether there are queued messages. Sets _queueing to false if
- * there are no queued messages. This needs to be atomic.
- *
- * @return true if there are queued messages
- */
- public synchronized boolean hasQueuedMessages()
- {
- boolean empty = _messages.isEmpty();
- if (empty)
- {
- _queueing = false;
- }
- return !empty;
- }
-
- public synchronized int getQueueMessageCount()
- {
- return _messages.size();
- }
-
- public synchronized List<AMQMessage> getMessages()
- {
- return new ArrayList<AMQMessage>(_messages);
- }
-
- public void populatePreDeliveryQueue(Subscription subscription)
- {
- //no-op . This DM has no PreDeliveryQueues
- }
-
- public synchronized void removeAMessageFromTop() throws AMQException
- {
- AMQMessage msg = poll();
- if (msg != null)
- {
- msg.dequeue(_queue);
- }
- }
-
- public synchronized void clearAllMessages() throws AMQException
- {
- AMQMessage msg = poll();
- while (msg != null)
- {
- msg.dequeue(_queue);
- msg = poll();
- }
- }
-
- /**
- * Only one thread should ever execute this method concurrently, but
- * it can do so while other threads invoke deliver().
- */
- private void processQueue()
- {
- try
- {
- boolean hasSubscribers = _subscriptions.hasActiveSubscribers();
- while (hasQueuedMessages() && hasSubscribers)
- {
- Subscription next = _subscriptions.nextSubscriber(peek());
- //We don't synchronize access to subscribers so need to re-check
- if (next != null)
- {
- try
- {
- next.send(poll(), _queue);
- }
- catch (AMQException e)
- {
- _log.error("Unable to deliver message: " + e, e);
- }
- }
- else
- {
- hasSubscribers = false;
- }
- }
- }
- finally
- {
- _processing.set(false);
- }
- }
-
- private synchronized AMQMessage peek()
- {
- return _messages.peek();
- }
-
- private synchronized AMQMessage poll()
- {
- return _messages.poll();
- }
-
- /**
- * Requests that the delivery manager start processing the queue asynchronously
- * if there is work that can be done (i.e. there are messages queued up and
- * subscribers that can receive them.
- * <p/>
- * This should be called when subscribers are added, but only after the consume-ok
- * message has been returned as message delivery may start immediately. It should also
- * be called after unsuspending a client.
- * <p/>
- *
- * @param executor the executor on which the delivery should take place
- */
- public void processAsync(Executor executor)
- {
- if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
- {
- //are we already running? if so, don't re-run
- if (_processing.compareAndSet(false, true))
- {
- // Do we need this?
- // This executor is created via Executors in AsyncDeliveryConfig which only returns a TPE so cast is ok.
- //if (executor != null && !((ThreadPoolExecutor) executor).isShutdown())
- {
- executor.execute(new Runner());
- }
- }
- }
- }
-
- /**
- * Handles message delivery. The delivery manager is always in one of two modes;
- * it is either queueing messages for asynchronous delivery or delivering
- * directly.
- *
- * @param name the name of the entity on whose behalf we are delivering the message
- * @param msg the message to deliver
- * @throws NoConsumersException if there are no active subscribers to deliver
- * the message to
- */
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
- {
- // first check whether we are queueing, and enqueue if we are
- if (!enqueue(msg))
- {
- synchronized(this)
- {
- // not queueing so deliver message to 'next' subscriber
- Subscription s = _subscriptions.nextSubscriber(msg);
- if (s == null)
- {
- // no subscribers yet so enter 'queueing' mode and queue this message
- startQueueing(msg);
- }
- else
- {
- s.send(msg, _queue);
- }
- }
- }
-
- }
-
- private class Runner implements Runnable
- {
- public void run()
- {
- processQueue();
- }
- }
-}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java
deleted file mode 100644
index db871281bf..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Bind.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameAlreadyBoundException;
-import javax.naming.NamingException;
-import javax.naming.NoInitialContextException;
-import java.io.File;
-import java.util.Hashtable;
-
-import junit.framework.TestCase;
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-class Bind extends TestCase
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis();
- public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
- public String PROVIDER_URL = DEFAULT_PROVIDER_URL;
-
- String _connectionFactoryString = "";
-
- String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'";
- Topic _topic = null;
-
- boolean _bound = false;
-
- public Bind() throws NameAlreadyBoundException, NoInitialContextException
- {
- this(false, DEFAULT_PROVIDER_URL);
- }
-
- public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException
- {
- this(output, DEFAULT_PROVIDER_URL);
- }
-
- public Bind(boolean output, String providerURL) throws NameAlreadyBoundException, NoInitialContextException
- {
- PROVIDER_URL = providerURL;
-
- // Set up the environment for creating the initial context
- Hashtable env = new Hashtable(11);
- env.put(Context.INITIAL_CONTEXT_FACTORY,
- "com.sun.jndi.fscontext.RefFSContextFactory");
-
-
- env.put(Context.PROVIDER_URL, PROVIDER_URL);
-
-
- File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
-
- if (file.exists() && !file.isDirectory())
- {
- System.out.println("Couldn't make directory file already exists");
- return;
- }
- else
- {
- if (!file.exists())
- {
- if (!file.mkdirs())
- {
- System.out.println("Couldn't make directory");
- return;
- }
- }
- }
-
- Connection connection = null;
- try
- {
- // Create the initial context
- Context ctx = new InitialContext(env);
-
- // Create the connection factory to be bound
- ConnectionFactory connectionFactory = null;
- // Create the Connection to be bound
-
-
- try
- {
- connectionFactory = new AMQConnectionFactory(_connectionString);
- connection = connectionFactory.createConnection();
-
- _connectionFactoryString = ((AMQConnectionFactory) connectionFactory).getConnectionURL().getURL();
- }
- catch (JMSException jmsqe)
- {
- fail("Unable to create Connection:" + jmsqe);
- }
- catch (URLSyntaxException urlse)
- {
- fail("Unable to create Connection:" + urlse);
- }
-
- try
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- _topic = session.createTopic("Fruity");
- }
- catch (JMSException jmse)
- {
-
- }
- // Perform the binds
- ctx.bind("ConnectionFactory", connectionFactory);
- if (output)
- {
- System.out.println("Bound factory\n" + ((AMQConnectionFactory) connectionFactory).getConnectionURL());
- }
- ctx.bind("Connection", connection);
- if (output)
- {
- System.out.println("Bound Connection\n" + ((AMQConnection) connection).toURL());
- }
- ctx.bind("Topic", _topic);
- if (output)
- {
- System.out.println("Bound Topic:\n" + ((AMQTopic) _topic).toURL());
- }
- _bound = true;
-
- // Check that it is bound
- //Object obj = ctx.lookup("Connection");
- //System.out.println(((AMQConnection)obj).toURL());
-
- // Close the context when we're done
- ctx.close();
- }
- catch (NamingException e)
- {
- System.out.println("Operation failed: " + e);
- if (e instanceof NameAlreadyBoundException)
- {
- throw(NameAlreadyBoundException) e;
- }
-
- if (e instanceof NoInitialContextException)
- {
- throw(NoInitialContextException) e;
- }
- }
- finally
- {
- try
- {
- if (connection != null)
- {
- connection.close();
- }
- }
- catch (JMSException e)
- {
- //ignore just want it closed
- }
- }
- }
-
- public String connectionFactoryValue()
- {
- if (_connectionFactoryString != null)
- {
- return _connectionFactoryString;
- }
- else
- {
- return "";
- }
- }
-
- public String connectionValue()
- {
- if (_connectionString != null)
- {
- return _connectionString;
- }
- else
- {
- return "";
- }
- }
-
- public String topicValue()
- {
- if (_topic != null)
- {
- return ((AMQTopic) _topic).toURL();
- }
- else
- {
- return "";
- }
-
- }
-
- public boolean bound()
- {
- return _bound;
- }
-
- public String getProviderURL()
- {
- return PROVIDER_URL;
- }
-
- public static void main(String[] args) throws NameAlreadyBoundException, NoInitialContextException
- {
- new Bind(true);
- }
-}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java
deleted file mode 100644
index 4731caca98..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.test.VMBrokerSetup;
-
-import javax.naming.NameAlreadyBoundException;
-import javax.naming.NoInitialContextException;
-
-import junit.framework.TestCase;
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-public class JNDIReferenceableTest extends TestCase
-{
- // FIXME FSContext has been removed from repository. This needs redone with the PropertiesFileInitialContextFactory. QPID-84
- public void testReferenceable()
- {
- Bind b = null;
- try
- {
- try
- {
- b = new Bind();
- }
- catch (NameAlreadyBoundException e)
- {
- if (new Unbind().unbound())
- {
- try
- {
- b = new Bind();
- }
- catch (NameAlreadyBoundException ee)
- {
- fail("Unable to clear bound objects for test.");
- }
- }
- else
- {
- fail("Unable to clear bound objects for test.");
- }
- }
- }
- catch (NoInitialContextException e)
- {
- fail("You don't have the File System SPI on you class path.\n" +
- "This can be downloaded from sun here:\n" +
- "http://java.sun.com/products/jndi/downloads/index.html\n" +
- "Click : Download JNDI 1.2.1 & More button\n" +
- "Download: File System Service Provider, 1.2 Beta 3\n" +
- "and add the two jars in the lib dir to your class path.");
- }
-
- assertTrue(b.bound());
-
- Lookup l = new Lookup(b.getProviderURL());
-
- assertTrue(l.connectionFactoryValue().equals(b.connectionFactoryValue()));
-
- assertTrue(l.connectionValue().equals(b.connectionValue()));
-
- assertTrue(l.topicValue().equals(b.topicValue()));
-
-
- Unbind u = new Unbind();
-
- assertTrue(u.unbound());
-
- }
-
- public static junit.framework.Test suite()
- {
- return new VMBrokerSetup(new junit.framework.TestSuite(JNDIReferenceableTest.class));
- }
-}
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java
deleted file mode 100644
index b804ccb30c..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Lookup.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQTopic;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.jms.JMSException;
-import java.io.File;
-import java.util.Hashtable;
-
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-class Lookup
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest";
- public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
- public String PROVIDER_URL = DEFAULT_PROVIDER_URL;
-
- AMQTopic _topic = null;
- AMQConnection _connection = null;
- AMQConnectionFactory _connectionFactory = null;
- private String _connectionURL;
-
-
- public Lookup()
- {
- this(DEFAULT_PROVIDER_URL);
- }
-
- public Lookup(String providerURL)
- {
-
- PROVIDER_URL = providerURL;
-
- // Set up the environment for creating the initial context
- Hashtable env = new Hashtable(11);
- env.put(Context.INITIAL_CONTEXT_FACTORY,
- "com.sun.jndi.fscontext.RefFSContextFactory");
-
- env.put(Context.PROVIDER_URL, PROVIDER_URL);
-
- File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
-
- if (file.exists() && !file.isDirectory())
- {
- System.out.println("Couldn't make directory file already exists");
- return;
- }
- else
- {
- if (!file.exists())
- {
- if (!file.mkdirs())
- {
- System.out.println("Couldn't make directory");
- return;
- }
- }
- }
-
- try
- {
- // Create the initial context
- Context ctx = new InitialContext(env);
-
- _topic = (AMQTopic) ctx.lookup("Topic");
-
- _connection = (AMQConnection) ctx.lookup("Connection");
-
- _connectionURL = _connection.toURL();
-
- _connectionFactory = (AMQConnectionFactory) ctx.lookup("ConnectionFactory");
- //System.out.println(topic);
-
- // Close the context when we're done
- ctx.close();
- }
- catch (NamingException e)
- {
- System.out.println("Operation failed: " + e);
- }
- finally
- {
- try
- {
- if (_connection != null)
- {
- _connection.close();
- }
- }
- catch (JMSException e)
- {
- //ignore just need to close
- }
- }
- }
-
- public String connectionFactoryValue()
- {
- if (_connectionFactory != null)
- {
- return _connectionFactory.getConnectionURL().toString();
- }
- return "";
- }
-
- public String connectionValue()
- {
- if (_connectionURL != null)
- {
- return _connectionURL;
- }
- return "";
- }
-
- public String topicValue()
- {
- if (_topic != null)
- {
- return _topic.toURL();
- }
- return "";
- }
-
- public String getProviderURL()
- {
- return PROVIDER_URL;
- }
-
- public static void main(String[] args)
- {
- new Lookup();
- }
-}
-
diff --git a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java b/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java
deleted file mode 100644
index 869bc55d8f..0000000000
--- a/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/Unbind.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.unit.jndi.referenceabletest;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NameNotFoundException;
-import javax.naming.NamingException;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import java.io.File;
-import java.util.Hashtable;
-
-/**
- * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
- * This can be downloaded from sun here:
- * http://java.sun.com/products/jndi/downloads/index.html
- * Click : Download JNDI 1.2.1 & More button
- * Download: File System Service Provider, 1.2 Beta 3
- * and add the two jars in the lib dir to your class path.
- * <p/>
- * Also you need to create the directory /temp/qpid-jndi-test
- */
-class Unbind
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest" + System.currentTimeMillis();
- public static final String DEFAULT_PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
- public String PROVIDER_URL = DEFAULT_PROVIDER_URL;
-
- boolean _unbound = false;
-
- public Unbind()
- {
- this(false, DEFAULT_PROVIDER_URL);
- }
-
- public Unbind(Boolean output)
- {
- this(output, DEFAULT_PROVIDER_URL);
- }
-
- public Unbind(String provider)
- {
- this(false, provider);
- }
-
- public Unbind(boolean output, String providerURL)
- {
- PROVIDER_URL = providerURL;
- // Set up the environment for creating the initial context
- Hashtable env = new Hashtable(11);
- env.put(Context.INITIAL_CONTEXT_FACTORY,
- "com.sun.jndi.fscontext.RefFSContextFactory");
- env.put(Context.PROVIDER_URL, PROVIDER_URL);
-
- File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
-
- if (file.exists() && !file.isDirectory())
- {
- System.out.println("Couldn't make directory file already exists");
- return;
- }
- else
- {
- if (!file.exists())
- {
- if (!file.mkdirs())
- {
- System.out.println("Couldn't make directory");
- return;
- }
- }
- }
-
- try
- {
- // Create the initial context
- Context ctx = new InitialContext(env);
-
- // Remove the binding
- ctx.unbind("ConnectionFactory");
- ctx.unbind("Connection");
- ctx.unbind("Topic");
-
- // Check that it is gone
- Object obj = null;
- try
- {
- obj = ctx.lookup("ConnectionFactory");
- }
- catch (NameNotFoundException ne)
- {
- if (output)
- {
- System.out.println("unbind ConnectionFactory successful");
- }
- try
- {
- obj = ctx.lookup("Connection");
- try
- {
- ((Connection) obj).close();
- }
- catch (JMSException e)
- {
- //ignore just need to close
- }
- }
- catch (NameNotFoundException ne2)
- {
- if (output)
- {
- System.out.println("unbind Connection successful");
- }
-
- try
- {
- obj = ctx.lookup("Topic");
- }
- catch (NameNotFoundException ne3)
- {
- if (output)
- {
- System.out.println("unbind Topic successful");
- }
- _unbound = true;
- }
- }
- }
-
- //System.out.println("unbind failed; object still there: " + obj);
-
- // Close the context when we're done
-
- ctx.close();
-
- }
- catch (NamingException e)
- {
- System.out.println("Operation failed: " + e);
- }
- }
-
- public boolean unbound()
- {
- return _unbound;
- }
-
- public static void main(String[] args)
- {
-
- new Unbind(true);
- }
-}
-
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
index ecc8f1d1e9..8c90bd772c 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/DispatcherTest.java
@@ -197,7 +197,7 @@ public class DispatcherTest extends TestCase
try
{
- _logger.error("Send additional messages");
+ _logger.info("Send additional messages");
for (int msg = 0; msg < MSG_COUNT; msg++)
{
diff --git a/qpid/java/mvn-repo/README.txt b/qpid/java/mvn-repo/README.txt
deleted file mode 100644
index c78cda6938..0000000000
--- a/qpid/java/mvn-repo/README.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-Temporary local repository for jars that are not available in the central repository yet.
-This was created because junit-toolkit is undergoing some development to use it with perf tests. Its an Apache licensed performance testing utility.
-It takes 2-4 days to get something added to the central repo but want to keep up with changed to junit-toolkit. This repository holds a snapshot
-of it, so that the build can get it from somewhere without breaking. It is anticipated that it will reach a stable 0.5 version within two weeks from
-30/01/2007, at which time it will be placed in the central repo and this local repository will be deleted.
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar
deleted file mode 100644
index 43c678f547..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar
+++ /dev/null
Binary files differ
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5
deleted file mode 100644
index 87820b3b0a..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.md5
+++ /dev/null
@@ -1 +0,0 @@
-8aff63861edb0a6bb47b5fad955a6ba5 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1
deleted file mode 100644
index 5a72a41b79..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-600209771b236268f1b939e4a924899875ee8562 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom
deleted file mode 100644
index 65587eb683..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom
+++ /dev/null
@@ -1,91 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><project>
- <modelVersion>4.0.0</modelVersion>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit-maven-plugin</artifactId>
- <packaging>maven-plugin</packaging>
- <name>junit-toolkit-maven-plugin</name>
- <version>0.5-20070130.111904-1</version>
- <description>Maven plugin for the JUnit Toolkit to run performance tests with TKTestRunner.</description>
- <url>http://www.thebadgerset.co.uk/projects/junit-toolkit-maven-plugin</url>
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
- <scm>
- <connection>scm:${scm.setup}/junit-toolkit-maven-plugin</connection>
- </scm>
- <organization>
- <name>The Badger Set trading as Liberty Bishop (1151) ltd.</name>
- <url>http://www.thebadgerset.co.uk/</url>
- </organization>
- <build>
- <sourceDirectory>src/main</sourceDirectory>
- <pluginManagement>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.1</version>
- <configuration>
- <source>1.5</source>
- <target>1.5</target>
- <fork>false</fork>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- <dependencies>
- <dependency>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit</artifactId>
- <version>0.5-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.maven</groupId>
- <artifactId>maven-plugin-api</artifactId>
- <version>2.0.4</version>
- </dependency>
- </dependencies>
- <reporting>
- <plugins>
- <plugin>
- <artifactId>maven-pmd-plugin</artifactId>
- <configuration>
- <linkXref>true</linkXref>
- <sourceEncoding>utf-8</sourceEncoding>
- <minimumTokens>20</minimumTokens>
- <targetJdk>1.5</targetJdk>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-jxr-plugin</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-javadoc-plugin</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <configLocation>../mavenbuild/coding_standards.xml</configLocation>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
- <distributionManagement>
- <repository>
- <uniqueVersion>false</uniqueVersion>
- <id>release-repo</id>
- <name>The Badger Set Maven2 Repository</name>
- <url>file://c:/temp</url>
- </repository>
- <snapshotRepository>
- <id>snapshot-repo</id>
- <name>The Badger Set Maven2 Snapshot Repository</name>
- <url>file://c:/temp</url>
- </snapshotRepository>
- <status>deployed</status>
- </distributionManagement>
-</project> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5
deleted file mode 100644
index adf20d93ad..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.md5
+++ /dev/null
@@ -1 +0,0 @@
-4ab65f208ffa4400551233321b90933a \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1
deleted file mode 100644
index aeb3966048..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/junit-toolkit-maven-plugin-0.5-20070130.111904-1.pom.sha1
+++ /dev/null
@@ -1 +0,0 @@
-84f491024bd60142781ef9035f4394cb1379902d \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml
deleted file mode 100644
index 0a46c1d79a..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml
+++ /dev/null
@@ -1,12 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><metadata>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit-maven-plugin</artifactId>
- <version>0.5-SNAPSHOT</version>
- <versioning>
- <snapshot>
- <timestamp>20070130.111904</timestamp>
- <buildNumber>1</buildNumber>
- </snapshot>
- <lastUpdated>20070130111904</lastUpdated>
- </versioning>
-</metadata> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5
deleted file mode 100644
index 4e7eab390b..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.md5
+++ /dev/null
@@ -1 +0,0 @@
-c619b7ac915b2eba622d556b2d2e0c25 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1
deleted file mode 100644
index 83a5267307..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/0.5-SNAPSHOT/maven-metadata.xml.sha1
+++ /dev/null
@@ -1 +0,0 @@
-db7b5d51a53a5018611391ecc3346032a6c20dda \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml
deleted file mode 100644
index a3bff0dde2..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml
+++ /dev/null
@@ -1,12 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><metadata>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit-maven-plugin</artifactId>
- <version>0.5-SNAPSHOT</version>
- <versioning>
- <latest>0.5-SNAPSHOT</latest>
- <versions>
- <version>0.5-SNAPSHOT</version>
- </versions>
- <lastUpdated>20070130111904</lastUpdated>
- </versioning>
-</metadata> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5
deleted file mode 100644
index 395a968533..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.md5
+++ /dev/null
@@ -1 +0,0 @@
-da47ce66de64d4ba056d0a9c901c5676 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1
deleted file mode 100644
index b396785e6c..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit-maven-plugin/maven-metadata.xml.sha1
+++ /dev/null
@@ -1 +0,0 @@
-70adf93da1c1757152e954750ceb2477a8659a99 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
deleted file mode 100644
index b5c0129ea9..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar
+++ /dev/null
Binary files differ
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
deleted file mode 100644
index 2f9b7922bd..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.md5
+++ /dev/null
@@ -1 +0,0 @@
-ce27581a94a89b664830f3c355dd7bf5 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
deleted file mode 100644
index 3c17b5a8e4..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-ffecddfd23345c7fb4177ab1d89cf73a4fe7adc9 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
deleted file mode 100644
index a2f72deff6..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom
+++ /dev/null
@@ -1,111 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><project>
- <modelVersion>4.0.0</modelVersion>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit</artifactId>
- <name>junit-toolkit</name>
- <version>0.5-20070202.132554-1</version>
- <description>JUnit Toolkit enhances JUnit with performance testing, asymptotic behaviour analysis, and concurrency testing.</description>
- <url>http://www.thebadgerset.co.uk/junit-toolkit</url>
- <developers>
- <developer>
- <id>rupert</id>
- <name>Rupert Smith</name>
- <email>rupertlssmith (contactable on g-m-a-i-l)</email>
- <organization></organization>
- </developer>
- </developers>
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- </license>
- </licenses>
- <scm>
- <connection>scm:svn:http://www.thebadgerset.co.uk/svn/junit-toolkit</connection>
- </scm>
- <organization>
- <name>The Badger Set trading as Liberty Bishop (1151) ltd.</name>
- <url>http://www.thebadgerset.co.uk/</url>
- </organization>
- <build>
- <sourceDirectory>src/main</sourceDirectory>
- <pluginManagement>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.1</version>
- <configuration>
- <source>1.5</source>
- <target>1.5</target>
- <fork>false</fork>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- <pluginRepositories>
- <pluginRepository>
- <snapshots />
- <id>apache.snapshots</id>
- <name>Apache SNAPSHOT Repository</name>
- <url>http://people.apache.org/repo/m2-snapshot-repository</url>
- </pluginRepository>
- </pluginRepositories>
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>3.8.1</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.8</version>
- </dependency>
- <dependency>
- <groupId>regexp</groupId>
- <artifactId>regexp</artifactId>
- <version>1.3</version>
- </dependency>
- </dependencies>
- <reporting>
- <plugins>
- <plugin>
- <artifactId>maven-pmd-plugin</artifactId>
- <configuration>
- <linkXref>true</linkXref>
- <sourceEncoding>utf-8</sourceEncoding>
- <minimumTokens>20</minimumTokens>
- <targetJdk>1.5</targetJdk>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-jxr-plugin</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-javadoc-plugin</artifactId>
- </plugin>
- <plugin>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <configLocation>../mavenbuild/coding_standards.xml</configLocation>
- </configuration>
- </plugin>
- </plugins>
- </reporting>
- <distributionManagement>
- <repository>
- <uniqueVersion>false</uniqueVersion>
- <id>release-repo</id>
- <name>The Badger Set Maven2 Repository</name>
- <url>file://c:/temp</url>
- </repository>
- <snapshotRepository>
- <id>snapshot-repo</id>
- <name>The Badger Set Maven2 Snapshot Repository</name>
- <url>file://c:/temp</url>
- </snapshotRepository>
- <status>deployed</status>
- </distributionManagement>
-</project> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
deleted file mode 100644
index ec5938d266..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.md5
+++ /dev/null
@@ -1 +0,0 @@
-36d35e778356cef8a984a021d9bc0fe4 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
deleted file mode 100644
index f889d5fda6..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/junit-toolkit-0.5-20070202.132554-1.pom.sha1
+++ /dev/null
@@ -1 +0,0 @@
-9383e1d89168d83973f47595063a35b733a3854d \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
deleted file mode 100644
index 42bb1afc36..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml
+++ /dev/null
@@ -1,12 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><metadata>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit</artifactId>
- <version>0.5-SNAPSHOT</version>
- <versioning>
- <snapshot>
- <timestamp>20070202.132554</timestamp>
- <buildNumber>1</buildNumber>
- </snapshot>
- <lastUpdated>20070202132554</lastUpdated>
- </versioning>
-</metadata> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
deleted file mode 100644
index 2d29a790e5..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.md5
+++ /dev/null
@@ -1 +0,0 @@
-277e07c561ec6eebda9d1a470a2ce6a4 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
deleted file mode 100644
index 0b047b645a..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/0.5-SNAPSHOT/maven-metadata.xml.sha1
+++ /dev/null
@@ -1 +0,0 @@
-155e5e47c236cefb5668414e81cc485867bdb93e \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
deleted file mode 100644
index 76b0c44645..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><metadata>
- <groupId>uk.co.thebadgerset</groupId>
- <artifactId>junit-toolkit</artifactId>
- <version>0.5-SNAPSHOT</version>
- <versioning>
- <versions>
- <version>0.5-SNAPSHOT</version>
- </versions>
- <lastUpdated>20070202132554</lastUpdated>
- </versioning>
-</metadata> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
deleted file mode 100644
index a57795d6c0..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.md5
+++ /dev/null
@@ -1 +0,0 @@
-77e2a30606515c4f52bcb466c3966f63 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
deleted file mode 100644
index 6b612f5d27..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/junit-toolkit/maven-metadata.xml.sha1
+++ /dev/null
@@ -1 +0,0 @@
-111f4320859e398337dbf4e396ec2fad1e5aa8dd \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml b/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml
deleted file mode 100644
index 4c367f55d3..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?><metadata>
- <plugins>
- <plugin>
- <name>junit-toolkit-maven-plugin</name>
- <prefix>junit-toolkit</prefix>
- <artifactId>junit-toolkit-maven-plugin</artifactId>
- </plugin>
- </plugins>
-</metadata> \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5 b/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5
deleted file mode 100644
index da122e37da..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.md5
+++ /dev/null
@@ -1 +0,0 @@
-8eee1c76f27e4d20ffcd48d87897b923 \ No newline at end of file
diff --git a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1 b/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1
deleted file mode 100644
index fd57ffc943..0000000000
--- a/qpid/java/mvn-repo/uk/co/thebadgerset/maven-metadata.xml.sha1
+++ /dev/null
@@ -1 +0,0 @@
-300ebe0bfa8ae2b56d26a9fae9073ef9f902b0e7 \ No newline at end of file
diff --git a/qpid/java/perftests/pom.xml b/qpid/java/perftests/pom.xml
index 02f55b43ee..3bcda0f359 100644
--- a/qpid/java/perftests/pom.xml
+++ b/qpid/java/perftests/pom.xml
@@ -44,7 +44,7 @@
<repository>
<id>junit-toolkit.snapshots</id>
<name>JUnit Toolkit SNAPSHOT Repository</name>
- <url>file://${basedir}/../mvn-repo</url>
+ <url>http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -56,7 +56,7 @@
<pluginRepository>
<id>junit-toolkit-plugin.snapshots</id>
<name>JUnit Toolkit SNAPSHOT Repository</name>
- <url>file://${basedir}/../mvn-repo</url>
+ <url>http://junit-toolkit.svn.sourceforge.net/svnroot/junit-toolkit/snapshots/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -148,19 +148,7 @@
<name>amqj.test.logging.level</name>
<value>info</value>
</property>
- <property>
- <name>logdir</name>
- <value>$QPID_WORK/results</value>
- </property>
- <property>
- <name>-Xms</name>
- <value>256m</value>
- </property>
- <property>
- <name>-Xmx</name>
- <value>512m</value>
- </property>
- </systemproperties>
+ </systemproperties>
<commands>
<!-- Single pings. These can be scaled up by overriding the parameters when calling the test script. -->
@@ -200,147 +188,19 @@
-n Ping-Failover-After-Commit -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf CommitBatchSize=10 FailAfterCommit=true
</Ping-Failover-After-Commit>
-
- <!-- P2P Volume Tests. -->
- <VT-Qpid-1>-n VT-Qpid-1 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true</VT-Qpid-1>
- <VT-Qpid-2>-n VT-Qpid-2 -s [15000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000</VT-Qpid-2>
- <!-- Setting sample to 3,000,000 will result in a log entry every 10 minutes so should have 144 data points for the run. -->
- <VT-Qpid-3>-n VT-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000 transacted=true</VT-Qpid-3>
- <VT-Qpid-4>-n VT-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true rate=10000 BatchSize=3000000</VT-Qpid-4>
-
- <!-- P2P Scalability Tests. -->
- <!-- 250,000 Total, 1P-1T-1C -->
- <PT-Qpid-1>-n PT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true</PT-Qpid-1>
- <PT-Qpid-2>-n PT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true </PT-Qpid-2>
-
- <!-- 25000 Msgs * 10 Brokers = 250,000 Total, 10P-1Q-10C -->
- <PT-Qpid-3>-n PT-Qpid-3 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true transacted=true</PT-Qpid-3>
- <PT-Qpid-4>-n PT-Qpid-4 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true </PT-Qpid-4>
-
- <!-- 25000 Msgs * 10 Brokers = 250,000 Tota,l 10P-10T-10C 10*(1P-1Q-1C) -->
- <PT-Qpid-5>-n PT-Qpid-5 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1 transacted=true</PT-Qpid-5>
- <PT-Qpid-6>-n PT-Qpid-6 -s [25000] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=1</PT-Qpid-6>
-
- <!-- 2500 Msgs * 10 Brokers * 10 Topics/Clients = 250,000 Total, 10P-100T-10C 10*(1P-10T-1C) -->
- <PT-Qpid-7>-n PT-Qpid-7 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10 transacted=true</PT-Qpid-7>
- <PT-Qpid-8>-n PT-Qpid-8 -s [2500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true destinationcount=10</PT-Qpid-8>
-
- <!-- 2500 Msgs * 100 Brokers = 250,000 Total, 100P-100T-100C 100*(1P-1T-1C) -->
- <PT-Qpid-9>-n PT-Qpid-9 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=500</PT-Qpid-9>
- <PT-Qpid-10>-n PT-Qpid-10 -s [2500] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1</PT-Qpid-10>
+ </commands>
+ </configuration>
- <!-- 250 Msgs * 100 Brokers * 10 Clients = 250,000 Total, 100P-1000T-100C 100*(1P-10T-1C) -->
- <PT-Qpid-11>-n PT-Qpid-11 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10 transacted=true CommitBatchSize=50</PT-Qpid-11>
- <PT-Qpid-12>-n PT-Qpid-12 -s [250] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=10</PT-Qpid-12>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <!--<goals>
+ <goal>tktest</goal>
+ </goals>-->
+ </execution>
+ </executions>
+ </plugin>
- <!-- 250 Msgs * 1000 Brokers = 250,000 Total, 1000P-1000T-1000C 1000*(1P-1T-1C) -->
- <PT-Qpid-13>-n PT-Qpid-13 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1 transacted=true CommitBatchSize=50</PT-Qpid-13>
- <PT-Qpid-14>-n PT-Qpid-14 -s [250] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=250 destinationcount=1</PT-Qpid-14>
-
- <!-- P2P Volume Tests. -->
- <VQ-Qpid-1>-n VQ-Qpid-1 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true</VQ-Qpid-1>
- <VQ-Qpid-2>-n VQ-Qpid-2 -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000</VQ-Qpid-2>
- <!-- Setting sample to 3,000,000 will result in a log entry every 10 minutes so should have 144 data points for the run. -->
- <VQ-Qpid-3>-n VQ-Qpid-3 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 transacted=true</VQ-Qpid-3>
- <VQ-Qpid-4>-n VQ-Qpid-4 -s [3000000] -d 24H -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf rate=10000 BatchSize=3000000 </VQ-Qpid-4>
-
- <!-- P2P Scalability Tests. -->
- <!-- 15,000 Total, 1P-1Q-1C -->
- <PQ-Qpid-1>-n PQ-Qpid-1 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true</PQ-Qpid-1>
- <PQ-Qpid-2>-n PQ-Qpid-2 -s [15000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf</PQ-Qpid-2>
-
- <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-1Q-10C -->
- <PQ-Qpid-3>-n PQ-Qpid-3 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping transacted=true CommitBatchSize=500</PQ-Qpid-3>
- <PQ-Qpid-4>-n PQ-Qpid-4 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationname=ping</PQ-Qpid-4>
-
- <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-10Q-10C 10*(1P-1Q-1C) -->
- <PQ-Qpid-5>-n PQ-Qpid-5 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1 transacted=true CommitBatchSize=500</PQ-Qpid-5>
- <PQ-Qpid-6>-n PQ-Qpid-6 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=1</PQ-Qpid-6>
-
- <!-- 1500 Messages * 10 Brokers = 15,000 Total, 10P-100Q-10C 10*(1P-10Q-1C) -->
- <PQ-Qpid-7>-n PQ-Qpid-7 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=500</PQ-Qpid-7>
- <PQ-Qpid-8>-n PQ-Qpid-8 -s [1500] -c[10] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10</PQ-Qpid-8>
-
- <!-- 150 Messages * 100 Brokers = 15,000 Total, 100P-100Q-100C 100*(1P-1Q-1C) -->
- <PQ-Qpid-9>-n PQ-Qpid-9 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 transacted=true CommitBatchSize=50</PQ-Qpid-9>
- <PQ-Qpid-10>-n PQ-Qpid-10 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=50 destinationcount=1 </PQ-Qpid-10>
-
- <!-- 150 Messages * 100 Brokers = 15,000 Total, 100P-1000Q-100C 100*(1P-10Q-1C) -->
- <PQ-Qpid-11>-n PQ-Qpid-11 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10 transacted=true CommitBatchSize=50</PQ-Qpid-11>
- <PQ-Qpid-12>-n PQ-Qpid-12 -s [150] -c[100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=100 destinationcount=10</PQ-Qpid-12>
-
- <!-- 15 Messages * 1000 Brokers = 15,000 Total, 1000P-1000Q-1000C 1000*(1P-1Q-1C) -->
- <PQ-Qpid-13>-n PQ-Qpid-13 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 transacted=true CommitBatchSize=15</PQ-Qpid-13>
- <PQ-Qpid-14>-n PQ-Qpid-14 -s [15] -c[1000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=15 </PQ-Qpid-14>
-
- <!-- Increasing Message Payload Tests. -->
- <!-- Topic Testing -->
- <LT-Qpid-1-512b>-n LT-Qpid-1-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512 transacted=true</LT-Qpid-1-512b>
- <LT-Qpid-2-512b>-n LT-Qpid-2-512b -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=512</LT-Qpid-2-512b>
-
- <LT-Qpid-1-1K>-n LT-Qpid-1-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 transacted=true</LT-Qpid-1-1K>
- <LT-Qpid-2-1K>-n LT-Qpid-2-1K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000</LT-Qpid-2-1K>
-
- <LT-Qpid-1-5K>-n LT-Qpid-1-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120 transacted=true</LT-Qpid-1-5K>
- <LT-Qpid-2-5K>-n LT-Qpid-2-5K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=5120</LT-Qpid-2-5K>
-
- <LT-Qpid-1-10K>-n LT-Qpid-1-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 transacted=true</LT-Qpid-1-10K>
- <LT-Qpid-2-10K>-n LT-Qpid-2-10K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=10240 </LT-Qpid-2-10K>
-
- <LT-Qpid-1-50K>-n LT-Qpid-1-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200 transacted=true</LT-Qpid-1-50K>
- <LT-Qpid-2-50K>-n LT-Qpid-2-50K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=51200</LT-Qpid-2-50K>
-
- <LT-Qpid-1-100K>-n LT-Qpid-1-100K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=102400 transacted=true</LT-Qpid-1-100K>
- <LT-Qpid-2-100K>-n LT-Qpid-2-100K -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=102400</LT-Qpid-2-100K>
-
- <LT-Qpid-1-1M>-n LT-Qpid-1-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048576 transacted=true</LT-Qpid-1-1M>
- <LT-Qpid-2-1M>-n LT-Qpid-2-1M -s [1000000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf pubsub=true BatchSize=10000 messagesize=1048476</LT-Qpid-2-1M>
-
- <!-- Queue Testing -->
- <LT-Qpid-3-512b>-n LT-Qpid-3-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512 transacted=true</LT-Qpid-3-512b>
- <LT-Qpid-4-512b>-n LT-Qpid-4-512b -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=512</LT-Qpid-4-512b>
-
- <LT-Qpid-3-1K>-n LT-Qpid-3-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true</LT-Qpid-3-1K>
- <LT-Qpid-4-1K>-n LT-Qpid-4-1K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000</LT-Qpid-4-1K>
-
- <LT-Qpid-3-5K>-n LT-Qpid-3-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120 transacted=true</LT-Qpid-3-5K>
- <LT-Qpid-4-5K>-n LT-Qpid-4-5K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=5120</LT-Qpid-4-5K>
-
- <LT-Qpid-3-10K>-n LT-Qpid-3-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240 transacted=true</LT-Qpid-3-10K>
- <LT-Qpid-4-10K>-n LT-Qpid-4-10K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=10240</LT-Qpid-4-10K>
-
- <LT-Qpid-3-50K>-n LT-Qpid-3-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200 transacted=true</LT-Qpid-3-50K>
- <LT-Qpid-4-50K>-n LT-Qpid-4-50K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=51200</LT-Qpid-4-50K>
-
- <LT-Qpid-3-100K>-n LT-Qpid-3-100K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=102400 transacted=true</LT-Qpid-3-100K>
- <LT-Qpid-4-100K>-n LT-Qpid-4-100K -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=102400</LT-Qpid-4-100K>
-
- <LT-Qpid-3-1M>-n LT-Qpid-3-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 transacted=true</LT-Qpid-3-1M>
- <LT-Qpid-4-1M>-n LT-Qpid-4-1M -s [900000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 messagesize=1048576 </LT-Qpid-4-1M>
-
- <!-- Failover Tests. -->
- <!-- Transactional -->
- <FT-Qpid-1>-n FT-Qpid-1 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailBeforeSend=true</FT-Qpid-1>
- <FT-Qpid-2>-n FT-Qpid-2 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailAfterSend=true</FT-Qpid-2>
- <FT-Qpid-3>-n FT-Qpid-3 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 transacted=true broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" FailAfterCommit=true</FT-Qpid-3>
-
- <!-- Non Transactional -->
- <FT-Qpid-4>-n FT-Qpid-4 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false FailBeforeSend=true</FT-Qpid-4>
- <FT-Qpid-5>-n FT-Qpid-5 -s [250000] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf BatchSize=10000 broker="tcp://10.0.0.1:5001;tcp://10.0.0.2:5002" transacted=false FailAfterSend=true</FT-Qpid-5>
-
-
- </commands>
- </configuration>
-
- <executions>
- <execution>
- <phase>test</phase>
- <!--<goals>
- <goal>tktest</goal>
- </goals>-->
- </execution>
- </executions>
- </plugin>
<!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests.
Usefull when bundling system, integration or performance tests into a convenient
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index a295919565..a52b5a0c49 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -38,6 +38,8 @@ import org.apache.qpid.requestreply.PingPongProducer;
*/
public class PingClient extends PingPongProducer
{
+ private static int _pingClientCount;
+
/**
* Creates a ping producer with the specified parameters, of which there are many. See their individual comments
* for details. This constructor creates ping pong producer but de-registers its reply-to destination message
@@ -76,6 +78,8 @@ public class PingClient extends PingPongProducer
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
pubsub, unique);
+
+ _pingClientCount++;
}
/**
@@ -88,4 +92,17 @@ public class PingClient extends PingPongProducer
{
return _pingDestinations;
}
+
+ public int getConsumersPerTopic()
+ {
+ if (_isUnique)
+ {
+ return 1;
+ }
+ else
+ {
+ return _pingClientCount;
+ }
+ }
+
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 027bb04a12..6f444bd290 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -50,21 +50,21 @@ import uk.co.thebadgerset.junit.extensions.Throttle;
/**
* PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
* client (see {@link PingPongBouncer} for the bounce back client).
- *
+ * <p/>
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
* This means that this class has to do some work to correlate pings with pongs; it expectes the original message
* correlation id in the ping to be bounced back in the reply correlation id.
- *
+ * <p/>
* <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
* It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
* within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
* failover testing.
- *
+ * <p/>
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping-pong loop cleanly.
- *
+ * <p/>
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping and wait for all responses cycle.
@@ -72,25 +72,21 @@ import uk.co.thebadgerset.junit.extensions.Throttle;
* </table>
*
* @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
- * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
- * be created and configured by the test runner from the -f command line option and made available through
- * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
- * tests.
- *
+ * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
+ * be created and configured by the test runner from the -f command line option and made available through
+ * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
+ * tests.
* @todo Make acknowledege mode a test option.
- *
* @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than
- * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
- * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
- * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
- * be picked up by the PPP that it is atteched to.
- *
+ * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
+ * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
+ * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
+ * be picked up by the PPP that it is atteched to.
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock
- * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
- * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
- * that the last message waits until all other messages have been handled before releasing producers but allows
- * messages to be processed concurrently, unlike the current synchronized block.
- *
+ * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
+ * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
+ * that the last message waits until all other messages have been handled before releasing producers but allows
+ * messages to be processed concurrently, unlike the current synchronized block.
* @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
*/
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
@@ -228,9 +224,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
- /** A source for providing unique ids to PingPongProducer. */
- private static AtomicInteger _pingProducerIdGenerator;
-
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
* multiple ping producers on the same JVM.
@@ -238,7 +231,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/*private static Map<String, CountDownLatch> trafficLights =
Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/
private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -273,6 +266,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Flag used to indicate if this is a point to point or pub/sub ping client. */
protected boolean _isPubSub = false;
+ /** Flag used to indicate if the destinations should be unique client. */
+ protected static boolean _isUnique = false;
+
/**
* This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
* on the same JVM using this id generator will allow them to ping on the same queues.
@@ -313,6 +309,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
protected int _txBatchSize = 1;
/**
+ * Holds the number of consumers that will be attached to each topic.
+ * Each pings will result in a reply from each of the attached clients
+ */
+ static int _consumersPerTopic = 1;
+
+ /**
* Creates a ping producer with the specified parameters, of which there are many. See their individual comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
* to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
@@ -339,7 +341,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* possible, with no rate restriction.
* @param pubsub True to ping topics, false to ping queues.
* @param unique True to use unique destinations for each ping pong producer, false to share.
- *
* @throws Exception Any exceptions are allowed to fall through.
*/
public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
@@ -358,6 +359,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
+ txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
+ ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
+ // Keep all the relevant options.
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _failOnce = failOnce;
+ _txBatchSize = txBatchSize;
+ _isPubSub = pubsub;
+ _isUnique = unique;
+
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
{
@@ -388,18 +402,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
createProducer();
createPingDestinations(noOfDestinations, selector, destinationName, unique);
createReplyConsumers(getReplyDestinations(), selector);
-
- // Keep all the remaining options.
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = txBatchSize;
- _isPubSub = pubsub;
}
/**
@@ -407,6 +409,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* to be started to bounce the pings back again.
*
* @param args The command line arguments.
+ * @throws Exception When something went wrong with the test
*/
public static void main(String[] args) throws Exception
{
@@ -421,7 +424,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
+ String virtualpath = "/test";
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -479,9 +482,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Create a ping producer to handle the request/wait/reply cycle.
PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
- transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+ new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+ transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
pingProducer.getConnection().start();
@@ -511,7 +514,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- { }
+ {
+ //ignore
+ }
}
}
@@ -555,11 +560,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @param rootName The root of the name, or actual name if only one is being created.
* @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
* the numbering with all pingers on the same JVM.
- *
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ throws JMSException
{
_logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+ ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
@@ -568,28 +572,32 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Create the desired number of ping destinations and consumers for them.
for (int i = 0; i < noOfDestinations; i++)
{
- AMQDestination destination = null;
+ AMQDestination destination;
int id;
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
+ _logger.debug("Creating unique destinations.");
id = _queueJVMSequenceID.incrementAndGet();
}
else
{
+ _logger.debug("Creating shared destinations.");
id = _queueSharedId.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
+ _logger.debug("Creating topics.");
destination = new AMQTopic(rootName + id);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
+ _logger.debug("Creating queues.");
destination = new AMQQueue(rootName + id);
}
@@ -697,11 +705,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @param message The message to send.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws InterruptedException When interrupted by a timeout.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
@@ -723,14 +730,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
_logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
@@ -743,7 +749,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// chained message listener must be called before this sender can be unblocked, but that decrementing the
// countdown needs to be done before the chained listener can be called.
PerCorrelationId perCorrelationId = new PerCorrelationId();
- perCorrelationId.trafficLight = new CountDownLatch(numPings + 1);
+
+ perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
@@ -763,11 +770,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
- numReplies = numPings - (int) perCorrelationId.trafficLight.getCount();
- allMessagesReceived = numReplies >= numPings;
+ numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+
+ allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- _logger.debug("numReplies = "+ numReplies);
- _logger.debug("allMessagesReceived = "+ allMessagesReceived);
+ _logger.debug("numReplies = " + numReplies);
+ _logger.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
@@ -779,7 +787,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
while (!timedOut && !allMessagesReceived);
- if ((numReplies < numPings) && _verbose)
+ if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
_logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
@@ -808,7 +816,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @param message The message to send.
* @param numPings The number of pings to send.
* @param messageCorrelationId A correlation id to place on all messages sent.
- *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
@@ -864,9 +871,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
}
- /**
- * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
- */
+ /** The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. */
public void pingLoop()
{
try
@@ -909,9 +914,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_chainedMessageListener = messageListener;
}
- /**
- * Removes any chained message listeners from this pinger.
- */
+ /** Removes any chained message listeners from this pinger. */
public void removeChainedMessageListener()
{
_chainedMessageListener = null;
@@ -923,9 +926,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
* @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
- *
* @return A freshly generated test message.
- *
* @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
*/
public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
@@ -947,9 +948,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_publish = false;
}
- /**
- * Implements a ping loop that repeatedly pings until the publish flag becomes false.
- */
+ /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
public void run()
{
// Keep running until the publish flag is cleared.
@@ -980,12 +979,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public Thread getShutdownHook()
{
return new Thread(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- stop();
- }
- });
+ stop();
+ }
+ });
}
/**
@@ -1003,7 +1002,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*
* @param destinations The destinations to listen to.
* @param selector A selector to filter the messages with.
- *
* @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
@@ -1015,8 +1013,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
// Create a consumer for the destination and set this pinger to listen to its messages.
MessageConsumer consumer =
- _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
- selector);
+ _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+ selector);
consumer.setMessageListener(this);
}
}
@@ -1039,19 +1037,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/**
* Convenience method to commit the transaction on the specified session. If the session to commit on is not
* a transactional session, this method does nothing (unless the failover after send flag is set).
- *
+ * <p/>
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
* is applied. This flag applies whether the pinger is transactional or not.
- *
+ * <p/>
* <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
* commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
* after the commit is applied. These flags will only apply if using a transactional pinger.
*
+ * @param session The session to commit
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- *
- * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
- * method, because commits only apply to transactional pingers, but fail after send applied to transactional
- * and non-transactional alike.
+ * <p/>
+ * //todo @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional
+ * and non-transactional alike.
*/
protected void commitTx(Session session) throws JMSException
{
@@ -1132,7 +1131,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*
* @param destination The destination to send to.
* @param message The message to send.
- *
* @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
*/
protected void sendMessage(Destination destination, Message message) throws JMSException
@@ -1170,17 +1168,35 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
System.in.read();
}
catch (IOException e)
- { }
+ {
+ //ignore
+ }
System.out.println("Continuing.");
}
/**
+ * This value will be changed by PingClient to represent the number of clients connected to each topic.
+ *
+ * @return int The number of consumers subscribing to each topic.
+ */
+ public int getConsumersPerTopic()
+ {
+ return _consumersPerTopic;
+ }
+
+ public int getExpectedNumPings(int numpings)
+ {
+ return numpings * getConsumersPerTopic();
+ }
+
+
+ /**
* Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
* {@link PingPongProducer#onMessage} method is called, the chained listener set through the
* {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
* count of messages with that correlation id.
- *
+ * <p/>
* Provided only one pinger is producing messages with that correlation id, the chained listener will always be
* given unique message counts. It will always be called while the producer waiting for all messages to arrive is
* still blocked.
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index c01987cfc0..347031ff51 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -70,7 +70,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
/** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
private Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** Holds the batched results listener, that does logging on batch boundaries. */
private BatchedResultsListener batchedResultsListener = null;
@@ -91,6 +91,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
/**
* Compile all the tests into a test suite.
+ * @return The test suite to run. Should only contain testAsyncPingOk method.
*/
public static Test suite()
{
@@ -128,6 +129,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
* all replies have been received or a time out occurs before exiting this method.
*
* @param numPings The number of pings to send.
+ * @throws Exception pass all errors out to the test harness
*/
public void testAsyncPingOk(int numPings) throws Exception
{
@@ -151,7 +153,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
PerCorrelationId perCorrelationId = new PerCorrelationId();
TimingController tc = getTimingController().getControllerForCurrentThread();
perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = numPings;
+ perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
@@ -160,18 +162,18 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// Generate a sample message of the specified size.
ObjectMessage msg =
- pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId);
// Check that all the replies were received and log a fail if they were not.
- if (numReplies < numPings)
+ if (numReplies < perCorrelationId._expectedCount)
{
- tc.completeTest(false, numPings - numReplies);
+ tc.completeTest(false, numPings - perCorrelationId._expectedCount);
}
// Remove the chained message listener from the ping producer.
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index f81ff531da..6d024a189d 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -110,6 +110,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
/**
* Compile all the tests into a test suite.
+ * @return The test method testPingOk.
*/
public static Test suite()
{
@@ -139,18 +140,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
// Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
// Fail the test if the timeout was exceeded.
- if (numReplies != numPings)
+ if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
{
Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+ numReplies);
@@ -191,7 +192,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
// Extract the test set up paramaeters.
int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
index fe8960c872..1a2817a7b2 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrencyTest.java
@@ -52,7 +52,7 @@ public class ConcurrencyTest extends MessageTestHelper
public ConcurrencyTest() throws Exception
{
- _deliveryMgr = new ConcurrentDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscriptionMgr, new AMQQueue("myQ", false, "guest", false,
new DefaultQueueRegistry()));
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
index 3072d44f48..b3d1eb1325 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/ConcurrentDeliveryManagerTest.java
@@ -21,10 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
{
@@ -33,7 +29,7 @@ public class ConcurrentDeliveryManagerTest extends DeliveryManagerTest
try
{
System.setProperty("concurrentdeliverymanager","true");
- _mgr = new ConcurrentDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
+ _mgr = new ConcurrentSelectorDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
new DefaultQueueRegistry()));
}
catch (Throwable t)
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
index 3631264e5a..858547a05d 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/DeliveryManagerTest.java
@@ -169,7 +169,6 @@ abstract public class DeliveryManagerTest extends MessageTestHelper
{
TestSuite suite = new TestSuite();
suite.addTestSuite(ConcurrentDeliveryManagerTest.class);
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
return suite;
}
}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
deleted file mode 100644
index ebe8e192a0..0000000000
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.DeliveryManagerTest;
-import org.apache.qpid.AMQException;
-
-import junit.framework.TestSuite;
-
-public class SynchronizedDeliveryManagerTest extends DeliveryManagerTest
-{
- public SynchronizedDeliveryManagerTest() throws Exception
- {
- try
- {
- System.setProperty("concurrentdeliverymanager","false");
- _mgr = new SynchronizedDeliveryManager(_subscriptions, new AMQQueue("myQ", false, "guest", false,
- new DefaultQueueRegistry()));
- }
- catch (Throwable t)
- {
- t.printStackTrace();
- throw new AMQException("Could not initialise delivery manager", t);
- }
- }
-
- public static junit.framework.Test suite()
- {
- TestSuite suite = new TestSuite();
- suite.addTestSuite(SynchronizedDeliveryManagerTest.class);
- return suite;
- }
-}